New Upstream Snapshot - golang-goleveldb
Ready changes
Summary
Merged new upstream version: 0.0~git20220721.126854a (was: 0.0~git20200815.5c35d60).
Resulting package
Built on 2022-09-28T11:45 (took 4m39s)
The resulting binary packages can be installed (if you have the apt repository enabled) by running one of:
apt install -t fresh-snapshots golang-github-syndtr-goleveldb-dev
Lintian Result
Diff
diff --git a/.golangci.yml b/.golangci.yml
new file mode 100644
index 0000000..23aaf43
--- /dev/null
+++ b/.golangci.yml
@@ -0,0 +1,11 @@
+linters-settings:
+ gocritic:
+ disabled-checks:
+ - ifElseChain
+ - elseif
+
+linters:
+ enable:
+ - gofmt
+ - gocritic
+ - unconvert
diff --git a/.travis.yml b/.travis.yml
index 66c3078..a92ab7f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,12 +1,15 @@
language: go
+before_install:
+ - curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.46.2
+
go:
- - 1.9.x
- - 1.10.x
- - 1.11.x
+ - 1.14.x
+ - 1.18.x
- tip
script:
- go vet ./...
- - go test -timeout 1h ./...
+ - golangci-lint run
+ - go test -short -timeout 1h ./...
- go test -timeout 30m -race -run "TestDB_(Concurrent|GoleveldbIssue74)" ./leveldb
diff --git a/README.md b/README.md
index 41a4761..113bb1b 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
-This is an implementation of the [LevelDB key/value database](http:code.google.com/p/leveldb) in the [Go programming language](http:golang.org).
+This is an implementation of the [LevelDB key/value database](https://github.com/google/leveldb) in the [Go programming language](https://go.dev).
-[![Build Status](https://travis-ci.org/syndtr/goleveldb.png?branch=master)](https://travis-ci.org/syndtr/goleveldb)
+[![Build Status](https://app.travis-ci.com/syndtr/goleveldb.svg?branch=master)](https://app.travis-ci.com/syndtr/goleveldb)
Installation
-----------
@@ -10,7 +10,7 @@ Installation
Requirements
-----------
-* Need at least `go1.5` or newer.
+* Need at least `go1.14` or newer.
Usage
-----------
@@ -104,4 +104,4 @@ defer db.Close()
Documentation
-----------
-You can read package documentation [here](http:godoc.org/github.com/syndtr/goleveldb).
+You can read package documentation [here](https://pkg.go.dev/github.com/syndtr/goleveldb).
diff --git a/debian/changelog b/debian/changelog
index e22abe5..fcd4c93 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,4 +1,4 @@
-golang-goleveldb (0.0~git20200815.5c35d60-2) UNRELEASED; urgency=medium
+golang-goleveldb (0.0~git20220721.126854a-1) UNRELEASED; urgency=medium
* Remove constraints unnecessary since stretch:
+ Build-Depends: Drop versioned constraint on dh-golang.
@@ -10,8 +10,9 @@ golang-goleveldb (0.0~git20200815.5c35d60-2) UNRELEASED; urgency=medium
* Bump debhelper from old 12 to 13.
* Fix field name case in debian/control (Rules-requires-root =>
Rules-Requires-Root).
+ * New upstream snapshot.
- -- Debian Janitor <janitor@jelmer.uk> Sun, 13 Jun 2021 16:59:18 -0000
+ -- Debian Janitor <janitor@jelmer.uk> Wed, 28 Sep 2022 11:42:39 -0000
golang-goleveldb (0.0~git20200815.5c35d60-1) unstable; urgency=medium
diff --git a/go.mod b/go.mod
index 8d0a69c..e0ba9fe 100644
--- a/go.mod
+++ b/go.mod
@@ -3,11 +3,11 @@ module github.com/syndtr/goleveldb
go 1.14
require (
- github.com/golang/snappy v0.0.1
- github.com/onsi/ginkgo v1.14.0
- github.com/onsi/gomega v1.10.1
- golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc // indirect
- golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed // indirect
- golang.org/x/text v0.3.3 // indirect
- golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
+ github.com/fsnotify/fsnotify v1.5.4 // indirect
+ github.com/golang/snappy v0.0.4
+ github.com/onsi/ginkgo v1.16.5
+ github.com/onsi/gomega v1.19.0
+ github.com/stretchr/testify v1.7.2
+ golang.org/x/net v0.0.0-20220607020251-c690dde0001d // indirect
+ golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect
)
diff --git a/go.sum b/go.sum
index 4ded307..89f1075 100644
--- a/go.sum
+++ b/go.sum
@@ -1,6 +1,14 @@
+github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
+github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
+github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
-github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
+github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
+github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
+github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
@@ -8,69 +16,109 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
-github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
-github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
+github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
-github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
+github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
+github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
-github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
-github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
-github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA=
-github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
-github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
-github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
+github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
+github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
+github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
+github.com/onsi/ginkgo/v2 v2.1.3 h1:e/3Cwtogj0HA+25nMP1jCMDIf8RtRYbGwGGuBIFztkc=
+github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
-github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
+github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
+github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
+github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
+github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
-golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
+golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
-golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc h1:zK/HqS5bZxDptfPJNq8v7vJfXtkU7r9TLIoSr1bXaP4=
-golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
+golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
+golang.org/x/net v0.0.0-20220607020251-c690dde0001d h1:4SFsTMi4UahlKoloni7L4eYzhFRifURQLw+yv0QDCx8=
+golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed h1:J22ig1FUekjjkmZUM7pTKixYm8DvrYsvrBZdunYeIuQ=
-golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
-golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
-golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
-golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e h1:4nW4NLDYnU28ojHaHO8OVxFHk/aQ33U01a9cjED+pzE=
+golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df h1:5Pf6pFKu98ODmgnpvkJ3kFUOQGGLIzLIkbzUHp47618=
+golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
-gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
-gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/leveldb/batch.go b/leveldb/batch.go
index 823be93..d5ecf72 100644
--- a/leveldb/batch.go
+++ b/leveldb/batch.go
@@ -32,8 +32,7 @@ func newErrBatchCorrupted(reason string) error {
const (
batchHeaderLen = 8 + 4
- batchGrowRec = 3000
- batchBufioSize = 16
+ batchGrowLimit = 3000
)
// BatchReplay wraps basic batch operations.
@@ -59,10 +58,6 @@ func (index batchIndex) v(data []byte) []byte {
return nil
}
-func (index batchIndex) kv(data []byte) (key, value []byte) {
- return index.k(data), index.v(data)
-}
-
// Batch is a write batch.
type Batch struct {
data []byte
@@ -70,14 +65,24 @@ type Batch struct {
// internalLen is sums of key/value pair length plus 8-bytes internal key.
internalLen int
+
+ // growLimit is the threshold in order to slow down the memory allocation
+ // for batch when the number of accumulated entries exceeds value.
+ //
+ // batchGrowLimit is used as the default threshold if it's not configured.
+ growLimit int
}
func (b *Batch) grow(n int) {
o := len(b.data)
if cap(b.data)-o < n {
+ limit := batchGrowLimit
+ if b.growLimit > 0 {
+ limit = b.growLimit
+ }
div := 1
- if len(b.index) > batchGrowRec {
- div = len(b.index) / batchGrowRec
+ if len(b.index) > limit {
+ div = len(b.index) / limit
}
ndata := make([]byte, o, o+n+o/div)
copy(ndata, b.data)
@@ -223,17 +228,6 @@ func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error {
return nil
}
-func (b *Batch) revertMem(seq uint64, mdb *memdb.DB) error {
- var ik []byte
- for i, index := range b.index {
- ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
- if err := mdb.Delete(ik); err != nil {
- return err
- }
- }
- return nil
-}
-
func newBatch() interface{} {
return &Batch{}
}
@@ -243,6 +237,42 @@ func MakeBatch(n int) *Batch {
return &Batch{data: make([]byte, 0, n)}
}
+// BatchConfig contains the config options for batch.
+type BatchConfig struct {
+ // InitialCapacity is the batch initial capacity to preallocate.
+ //
+ // The default value is 0.
+ InitialCapacity int
+
+ // GrowLimit is the limit (in terms of entry) of how much buffer
+ // can grow each cycle.
+ //
+ // Initially the buffer will grow twice its current size until
+ // GrowLimit threshold is reached, after that the buffer will grow
+ // up to GrowLimit each cycle. This buffer grow size in bytes is
+ // loosely calculated from average entry size multiplied by GrowLimit.
+ //
+ // Generally, the memory allocation step is larger if this value
+ // is configured large, vice versa.
+ //
+ // The default value is 3000.
+ GrowLimit int
+}
+
+// MakeBatchWithConfig initializes a batch object with the given configs.
+func MakeBatchWithConfig(config *BatchConfig) *Batch {
+ var batch = new(Batch)
+ if config != nil {
+ if config.InitialCapacity > 0 {
+ batch.data = make([]byte, 0, config.InitialCapacity)
+ }
+ if config.GrowLimit > 0 {
+ batch.growLimit = config.GrowLimit
+ }
+ }
+ return batch
+}
+
func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error {
var index batchIndex
for i, o := 0, 0; o < len(data); i++ {
diff --git a/leveldb/batch_test.go b/leveldb/batch_test.go
index 62775f7..9a9a82c 100644
--- a/leveldb/batch_test.go
+++ b/leveldb/batch_test.go
@@ -9,6 +9,7 @@ package leveldb
import (
"bytes"
"fmt"
+ "math/rand"
"testing"
"testing/quick"
@@ -145,3 +146,33 @@ func TestBatch(t *testing.T) {
}
t.Logf("length=%d internalLen=%d", len(kvs), internalLen)
}
+
+func BenchmarkDefaultBatchWrite(b *testing.B) {
+ benchmarkBatchWrite(b, nil)
+}
+
+func BenchmarkFastAllocationBatchWrite(b *testing.B) {
+ benchmarkBatchWrite(b, &BatchConfig{
+ GrowLimit: 10 * batchGrowLimit,
+ })
+}
+
+func benchmarkBatchWrite(b *testing.B, config *BatchConfig) {
+ var (
+ keys [][]byte
+ vals [][]byte
+ r = rand.New(rand.NewSource(1337))
+ )
+ for i := 0; i < 50000; i++ {
+ keys = append(keys, randomString(r, 32))
+ vals = append(vals, randomString(r, 100))
+ }
+ b.ResetTimer()
+ for round := 0; round < b.N; round++ {
+ batch := MakeBatchWithConfig(config)
+ for i := 0; i < len(keys); i++ {
+ batch.Put(keys[i], vals[i])
+ }
+ }
+ b.ReportAllocs()
+}
diff --git a/leveldb/cache/bench_test.go b/leveldb/cache/bench_test.go
index 89aef69..18fb657 100644
--- a/leveldb/cache/bench_test.go
+++ b/leveldb/cache/bench_test.go
@@ -7,23 +7,195 @@
package cache
import (
- "math/rand"
+ "sync/atomic"
"testing"
- "time"
)
-func BenchmarkLRUCache(b *testing.B) {
- c := NewCache(NewLRU(10000))
+func BenchmarkCache_InsertRemove(b *testing.B) {
+ b.StopTimer()
+ c := NewCache(nil)
- b.SetParallelism(10)
+ b.StartTimer()
+ for i := 0; i < b.N; i++ {
+ c.Get(0, uint64(i), func() (int, Value) {
+ return 1, uint64(i)
+ }).Release()
+ }
+ b.ReportMetric(float64(c.Nodes()), "nodes")
+ b.Logf("STATS: %#v", c.GetStats())
+}
+
+func BenchmarkCache_Insert(b *testing.B) {
+ b.StopTimer()
+ c := NewCache(nil)
+
+ b.StartTimer()
+ for i := 0; i < b.N; i++ {
+ c.Get(0, uint64(i), func() (int, Value) {
+ return 1, uint64(i)
+ })
+ }
+ b.ReportMetric(float64(c.Nodes()), "nodes")
+ b.Logf("STATS: %#v", c.GetStats())
+}
+
+func BenchmarkCache_Lookup(b *testing.B) {
+ b.StopTimer()
+ c := NewCache(nil)
+ for i := 0; i < b.N; i++ {
+ c.Get(0, uint64(i), func() (int, Value) {
+ return 1, uint64(i)
+ })
+ }
+
+ b.StartTimer()
+ for i := 0; i < b.N; i++ {
+ c.Get(0, uint64(i), nil).Release()
+ }
+ b.ReportMetric(float64(c.Nodes()), "nodes")
+ b.Logf("STATS: %#v", c.GetStats())
+}
+
+func BenchmarkCache_AppendRemove(b *testing.B) {
+ b.StopTimer()
+ c := NewCache(nil)
+ for i := 0; i < b.N; i++ {
+ c.Get(0, uint64(i), func() (int, Value) {
+ return 1, uint64(i)
+ })
+ }
+
+ b.StartTimer()
+ for i := 0; i < b.N; i++ {
+ c.Get(1, uint64(i), func() (int, Value) {
+ return 1, uint64(i)
+ }).Release()
+ }
+ b.ReportMetric(float64(c.Nodes()), "nodes")
+ b.Logf("STATS: %#v", c.GetStats())
+}
+
+func BenchmarkCache_Append(b *testing.B) {
+ b.StopTimer()
+ c := NewCache(nil)
+ for i := 0; i < b.N; i++ {
+ c.Get(0, uint64(i), func() (int, Value) {
+ return 1, uint64(i)
+ })
+ }
+
+ b.StartTimer()
+ for i := 0; i < b.N; i++ {
+ c.Get(1, uint64(i), func() (int, Value) {
+ return 1, uint64(i)
+ })
+ }
+ b.ReportMetric(float64(c.Nodes()), "nodes")
+ b.Logf("STATS: %#v", c.GetStats())
+}
+
+func BenchmarkCache_Delete(b *testing.B) {
+ b.StopTimer()
+ c := NewCache(nil)
+ handles := make([]*Handle, b.N)
+ for i := 0; i < b.N; i++ {
+ handles[i] = c.Get(0, uint64(i), func() (int, Value) {
+ return 1, uint64(i)
+ })
+ }
+
+ b.StartTimer()
+ for i := 0; i < b.N; i++ {
+ handles[i].Release()
+ }
+ b.ReportMetric(float64(c.Nodes()), "nodes")
+ b.Logf("STATS: %#v", c.GetStats())
+}
+
+func BenchmarkCacheParallel_Insert(b *testing.B) {
+ b.StopTimer()
+ c := NewCache(nil)
+
+ var ns uint64
+ b.StartTimer()
+ b.RunParallel(func(pb *testing.PB) {
+ ns := atomic.AddUint64(&ns, 1)
+ i := uint64(0)
+ for pb.Next() {
+ c.Get(ns, i, func() (int, Value) {
+ return 1, i
+ })
+ i++
+ }
+ })
+ b.ReportMetric(float64(c.Nodes()), "nodes")
+ b.Logf("STATS: %#v", c.GetStats())
+}
+
+func BenchmarkCacheParallel_Lookup(b *testing.B) {
+ b.StopTimer()
+ c := NewCache(nil)
+ for i := 0; i < b.N; i++ {
+ c.Get(0, uint64(i), func() (int, Value) {
+ return 1, uint64(i)
+ })
+ }
+
+ var counter uint64
+ b.StartTimer()
b.RunParallel(func(pb *testing.PB) {
- r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ for pb.Next() {
+ i := atomic.AddUint64(&counter, 1) - 1
+ c.Get(0, i, nil).Release()
+ }
+ })
+ b.ReportMetric(float64(c.Nodes()), "nodes")
+ b.Logf("STATS: %#v", c.GetStats())
+}
+func BenchmarkCacheParallel_Append(b *testing.B) {
+ b.StopTimer()
+ c := NewCache(nil)
+ for i := 0; i < b.N; i++ {
+ c.Get(0, uint64(i), func() (int, Value) {
+ return 1, uint64(i)
+ })
+ }
+
+ var ns uint64
+ b.StartTimer()
+ b.RunParallel(func(pb *testing.PB) {
+ ns := atomic.AddUint64(&ns, 1)
+ i := uint64(0)
+ for pb.Next() {
+ c.Get(ns, i, func() (int, Value) {
+ return 1, i
+ })
+ i++
+ }
+ })
+ b.ReportMetric(float64(c.Nodes()), "nodes")
+ b.Logf("STATS: %#v", c.GetStats())
+}
+
+func BenchmarkCacheParallel_Delete(b *testing.B) {
+ b.StopTimer()
+ c := NewCache(nil)
+ handles := make([]*Handle, b.N)
+ for i := 0; i < b.N; i++ {
+ handles[i] = c.Get(0, uint64(i), func() (int, Value) {
+ return 1, uint64(i)
+ })
+ }
+
+ var counter int64
+ b.StartTimer()
+ b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
- key := uint64(r.Intn(1000000))
- c.Get(0, key, func() (int, Value) {
- return 1, key
- }).Release()
+ i := atomic.AddInt64(&counter, 1) - 1
+ handles[i].Release()
}
})
+ b.ReportMetric(float64(c.Nodes()), "nodes")
+ b.Logf("STATS: %#v", c.GetStats())
}
diff --git a/leveldb/cache/cache.go b/leveldb/cache/cache.go
index c36ad32..8e4f397 100644
--- a/leveldb/cache/cache.go
+++ b/leveldb/cache/cache.go
@@ -8,6 +8,7 @@
package cache
import (
+ "sort"
"sync"
"sync/atomic"
"unsafe"
@@ -32,18 +33,9 @@ type Cacher interface {
// Evict evicts the 'cache node'.
Evict(n *Node)
-
- // EvictNS evicts 'cache node' with the given namespace.
- EvictNS(ns uint64)
-
- // EvictAll evicts all 'cache node'.
- EvictAll()
-
- // Close closes the 'cache tree'
- Close() error
}
-// Value is a 'cacheable object'. It may implements util.Releaser, if
+// Value is a 'cache-able object'. It may implements util.Releaser, if
// so the the Release method will be called once object is released.
type Value interface{}
@@ -69,32 +61,76 @@ const (
mOverflowGrowThreshold = 1 << 7
)
+const (
+ bucketUninitialized = iota
+ bucketInitialized
+ bucketFrozen
+)
+
+type mNodes []*Node
+
+func (x mNodes) Len() int { return len(x) }
+func (x mNodes) Less(i, j int) bool {
+ a, b := x[i].ns, x[j].ns
+ if a == b {
+ return x[i].key < x[j].key
+ }
+ return a < b
+}
+func (x mNodes) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
+
+func (x mNodes) sort() { sort.Sort(x) }
+
+func (x mNodes) search(ns, key uint64) int {
+ return sort.Search(len(x), func(i int) bool {
+ a := x[i].ns
+ if a == ns {
+ return x[i].key >= key
+ }
+ return a > ns
+ })
+}
+
type mBucket struct {
- mu sync.Mutex
- node []*Node
- frozen bool
+ mu sync.Mutex
+ nodes mNodes
+ state int8
}
-func (b *mBucket) freeze() []*Node {
+func (b *mBucket) freeze() mNodes {
b.mu.Lock()
defer b.mu.Unlock()
- if !b.frozen {
- b.frozen = true
+ if b.state == bucketInitialized {
+ b.state = bucketFrozen
+ } else if b.state == bucketUninitialized {
+ panic("BUG: freeze uninitialized bucket")
}
- return b.node
+ return b.nodes
}
-func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset bool) (done, added bool, n *Node) {
+func (b *mBucket) frozen() bool {
+ if b.state == bucketFrozen {
+ return true
+ }
+ if b.state == bucketUninitialized {
+ panic("BUG: accessing uninitialized bucket")
+ }
+ return false
+}
+
+func (b *mBucket) get(r *Cache, h *mHead, hash uint32, ns, key uint64, getOnly bool) (done, created bool, n *Node) {
b.mu.Lock()
- if b.frozen {
+ if b.frozen() {
b.mu.Unlock()
return
}
- // Scan the node.
- for _, n := range b.node {
- if n.hash == hash && n.ns == ns && n.key == key {
+ // Find the node.
+ i := b.nodes.search(ns, key)
+ if i < len(b.nodes) {
+ n = b.nodes[i]
+ if n.ns == ns && n.key == key {
atomic.AddInt32(&n.ref, 1)
b.mu.Unlock()
return true, false, n
@@ -102,7 +138,7 @@ func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset boo
}
// Get only.
- if noset {
+ if getOnly {
b.mu.Unlock()
return true, false, nil
}
@@ -116,99 +152,106 @@ func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset boo
ref: 1,
}
// Add node to bucket.
- b.node = append(b.node, n)
- bLen := len(b.node)
+ if i == len(b.nodes) {
+ b.nodes = append(b.nodes, n)
+ } else {
+ b.nodes = append(b.nodes[:i+1], b.nodes[i:]...)
+ b.nodes[i] = n
+ }
+ bLen := len(b.nodes)
b.mu.Unlock()
// Update counter.
- grow := atomic.AddInt32(&r.nodes, 1) >= h.growThreshold
+ grow := atomic.AddInt64(&r.statNodes, 1) >= h.growThreshold
if bLen > mOverflowThreshold {
grow = grow || atomic.AddInt32(&h.overflow, 1) >= mOverflowGrowThreshold
}
// Grow.
- if grow && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
+ if grow && atomic.CompareAndSwapInt32(&h.resizeInProgress, 0, 1) {
nhLen := len(h.buckets) << 1
- nh := &mNode{
- buckets: make([]unsafe.Pointer, nhLen),
+ nh := &mHead{
+ buckets: make([]mBucket, nhLen),
mask: uint32(nhLen) - 1,
- pred: unsafe.Pointer(h),
- growThreshold: int32(nhLen * mOverflowThreshold),
- shrinkThreshold: int32(nhLen >> 1),
+ predecessor: unsafe.Pointer(h),
+ growThreshold: int64(nhLen * mOverflowThreshold),
+ shrinkThreshold: int64(nhLen >> 1),
}
ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
if !ok {
panic("BUG: failed swapping head")
}
+ atomic.AddInt32(&r.statGrow, 1)
go nh.initBuckets()
}
return true, true, n
}
-func (b *mBucket) delete(r *Cache, h *mNode, hash uint32, ns, key uint64) (done, deleted bool) {
+func (b *mBucket) delete(r *Cache, h *mHead, hash uint32, ns, key uint64) (done, deleted bool) {
b.mu.Lock()
- if b.frozen {
+ if b.frozen() {
b.mu.Unlock()
return
}
- // Scan the node.
- var (
- n *Node
- bLen int
- )
- for i := range b.node {
- n = b.node[i]
- if n.ns == ns && n.key == key {
- if atomic.LoadInt32(&n.ref) == 0 {
- deleted = true
+ // Find the node.
+ i := b.nodes.search(ns, key)
+ if i == len(b.nodes) {
+ b.mu.Unlock()
+ return true, false
+ }
+ n := b.nodes[i]
+ var bLen int
+ if n.ns == ns && n.key == key {
+ if atomic.LoadInt32(&n.ref) == 0 {
+ deleted = true
+ // Save and clear value.
+ if n.value != nil {
// Call releaser.
- if n.value != nil {
- if r, ok := n.value.(util.Releaser); ok {
- r.Release()
- }
- n.value = nil
+ if r, ok := n.value.(util.Releaser); ok {
+ r.Release()
}
-
- // Remove node from bucket.
- b.node = append(b.node[:i], b.node[i+1:]...)
- bLen = len(b.node)
+ n.value = nil
}
- break
+
+ // Remove node from bucket.
+ b.nodes = append(b.nodes[:i], b.nodes[i+1:]...)
+ bLen = len(b.nodes)
}
}
b.mu.Unlock()
if deleted {
- // Call OnDel.
- for _, f := range n.onDel {
+ // Call delete funcs.
+ for _, f := range n.delFuncs {
f()
}
// Update counter.
- atomic.AddInt32(&r.size, int32(n.size)*-1)
- shrink := atomic.AddInt32(&r.nodes, -1) < h.shrinkThreshold
+ atomic.AddInt64(&r.statSize, int64(n.size)*-1)
+ shrink := atomic.AddInt64(&r.statNodes, -1) < h.shrinkThreshold
if bLen >= mOverflowThreshold {
atomic.AddInt32(&h.overflow, -1)
}
// Shrink.
- if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
+ if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgress, 0, 1) {
nhLen := len(h.buckets) >> 1
- nh := &mNode{
- buckets: make([]unsafe.Pointer, nhLen),
+ nh := &mHead{
+ buckets: make([]mBucket, nhLen),
mask: uint32(nhLen) - 1,
- pred: unsafe.Pointer(h),
- growThreshold: int32(nhLen * mOverflowThreshold),
- shrinkThreshold: int32(nhLen >> 1),
+ predecessor: unsafe.Pointer(h),
+ growThreshold: int64(nhLen * mOverflowThreshold),
+ shrinkThreshold: int64(nhLen >> 1),
}
ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
if !ok {
panic("BUG: failed swapping head")
}
+ atomic.AddInt32(&r.statShrink, 1)
go nh.initBuckets()
}
}
@@ -216,95 +259,134 @@ func (b *mBucket) delete(r *Cache, h *mNode, hash uint32, ns, key uint64) (done,
return true, deleted
}
-type mNode struct {
- buckets []unsafe.Pointer // []*mBucket
- mask uint32
- pred unsafe.Pointer // *mNode
- resizeInProgess int32
+type mHead struct {
+ buckets []mBucket
+ mask uint32
+ predecessor unsafe.Pointer // *mNode
+ resizeInProgress int32
overflow int32
- growThreshold int32
- shrinkThreshold int32
+ growThreshold int64
+ shrinkThreshold int64
}
-func (n *mNode) initBucket(i uint32) *mBucket {
- if b := (*mBucket)(atomic.LoadPointer(&n.buckets[i])); b != nil {
+func (h *mHead) initBucket(i uint32) *mBucket {
+ b := &h.buckets[i]
+ b.mu.Lock()
+ if b.state >= bucketInitialized {
+ b.mu.Unlock()
return b
}
- p := (*mNode)(atomic.LoadPointer(&n.pred))
- if p != nil {
- var node []*Node
- if n.mask > p.mask {
- // Grow.
- pb := (*mBucket)(atomic.LoadPointer(&p.buckets[i&p.mask]))
- if pb == nil {
- pb = p.initBucket(i & p.mask)
- }
- m := pb.freeze()
- // Split nodes.
- for _, x := range m {
- if x.hash&n.mask == i {
- node = append(node, x)
- }
- }
- } else {
- // Shrink.
- pb0 := (*mBucket)(atomic.LoadPointer(&p.buckets[i]))
- if pb0 == nil {
- pb0 = p.initBucket(i)
- }
- pb1 := (*mBucket)(atomic.LoadPointer(&p.buckets[i+uint32(len(n.buckets))]))
- if pb1 == nil {
- pb1 = p.initBucket(i + uint32(len(n.buckets)))
- }
- m0 := pb0.freeze()
- m1 := pb1.freeze()
- // Merge nodes.
- node = make([]*Node, 0, len(m0)+len(m1))
- node = append(node, m0...)
- node = append(node, m1...)
- }
- b := &mBucket{node: node}
- if atomic.CompareAndSwapPointer(&n.buckets[i], nil, unsafe.Pointer(b)) {
- if len(node) > mOverflowThreshold {
- atomic.AddInt32(&n.overflow, int32(len(node)-mOverflowThreshold))
+ p := (*mHead)(atomic.LoadPointer(&h.predecessor))
+ if p == nil {
+ panic("BUG: uninitialized bucket doesn't have predecessor")
+ }
+
+ var nodes mNodes
+ if h.mask > p.mask {
+ // Grow.
+ m := p.initBucket(i & p.mask).freeze()
+ // Split nodes.
+ for _, x := range m {
+ if x.hash&h.mask == i {
+ nodes = append(nodes, x)
}
- return b
}
+ } else {
+ // Shrink.
+ m0 := p.initBucket(i).freeze()
+ m1 := p.initBucket(i + uint32(len(h.buckets))).freeze()
+ // Merge nodes.
+ nodes = make(mNodes, 0, len(m0)+len(m1))
+ nodes = append(nodes, m0...)
+ nodes = append(nodes, m1...)
+ nodes.sort()
+ }
+ b.nodes = nodes
+ b.state = bucketInitialized
+ b.mu.Unlock()
+ return b
+}
+
+func (h *mHead) initBuckets() {
+ for i := range h.buckets {
+ h.initBucket(uint32(i))
}
+ atomic.StorePointer(&h.predecessor, nil)
+}
- return (*mBucket)(atomic.LoadPointer(&n.buckets[i]))
+func (h *mHead) enumerateNodesWithCB(f func([]*Node)) {
+ var nodes []*Node
+ for x := range h.buckets {
+ b := h.initBucket(uint32(x))
+
+ b.mu.Lock()
+ nodes = append(nodes, b.nodes...)
+ b.mu.Unlock()
+ f(nodes)
+ }
}
-func (n *mNode) initBuckets() {
- for i := range n.buckets {
- n.initBucket(uint32(i))
+func (h *mHead) enumerateNodesByNS(ns uint64) []*Node {
+ var nodes []*Node
+ for x := range h.buckets {
+ b := h.initBucket(uint32(x))
+
+ b.mu.Lock()
+ i := b.nodes.search(ns, 0)
+ for ; i < len(b.nodes); i++ {
+ n := b.nodes[i]
+ if n.ns != ns {
+ break
+ }
+ nodes = append(nodes, n)
+ }
+ b.mu.Unlock()
}
- atomic.StorePointer(&n.pred, nil)
+ return nodes
+}
+
+type Stats struct {
+ Buckets int
+ Nodes int64
+ Size int64
+ GrowCount int32
+ ShrinkCount int32
+ HitCount int64
+ MissCount int64
+ SetCount int64
+ DelCount int64
}
// Cache is a 'cache map'.
type Cache struct {
mu sync.RWMutex
mHead unsafe.Pointer // *mNode
- nodes int32
- size int32
cacher Cacher
closed bool
+
+ statNodes int64
+ statSize int64
+ statGrow int32
+ statShrink int32
+ statHit int64
+ statMiss int64
+ statSet int64
+ statDel int64
}
// NewCache creates a new 'cache map'. The cacher is optional and
// may be nil.
func NewCache(cacher Cacher) *Cache {
- h := &mNode{
- buckets: make([]unsafe.Pointer, mInitialSize),
+ h := &mHead{
+ buckets: make([]mBucket, mInitialSize),
mask: mInitialSize - 1,
- growThreshold: int32(mInitialSize * mOverflowThreshold),
+ growThreshold: int64(mInitialSize * mOverflowThreshold),
shrinkThreshold: 0,
}
for i := range h.buckets {
- h.buckets[i] = unsafe.Pointer(&mBucket{})
+ h.buckets[i].state = bucketInitialized
}
r := &Cache{
mHead: unsafe.Pointer(h),
@@ -313,14 +395,20 @@ func NewCache(cacher Cacher) *Cache {
return r
}
-func (r *Cache) getBucket(hash uint32) (*mNode, *mBucket) {
- h := (*mNode)(atomic.LoadPointer(&r.mHead))
+func (r *Cache) getBucket(hash uint32) (*mHead, *mBucket) {
+ h := (*mHead)(atomic.LoadPointer(&r.mHead))
i := hash & h.mask
- b := (*mBucket)(atomic.LoadPointer(&h.buckets[i]))
- if b == nil {
- b = h.initBucket(i)
- }
- return h, b
+ return h, h.initBucket(i)
+}
+
+func (r *Cache) enumerateNodesWithCB(f func([]*Node)) {
+ h := (*mHead)(atomic.LoadPointer(&r.mHead))
+ h.enumerateNodesWithCB(f)
+}
+
+func (r *Cache) enumerateNodesByNS(ns uint64) []*Node {
+ h := (*mHead)(atomic.LoadPointer(&r.mHead))
+ return h.enumerateNodesByNS(ns)
}
func (r *Cache) delete(n *Node) bool {
@@ -333,14 +421,29 @@ func (r *Cache) delete(n *Node) bool {
}
}
+// GetStats returns cache statistics.
+func (r *Cache) GetStats() Stats {
+ return Stats{
+ Buckets: len((*mHead)(atomic.LoadPointer(&r.mHead)).buckets),
+ Nodes: atomic.LoadInt64(&r.statNodes),
+ Size: atomic.LoadInt64(&r.statSize),
+ GrowCount: atomic.LoadInt32(&r.statGrow),
+ ShrinkCount: atomic.LoadInt32(&r.statShrink),
+ HitCount: atomic.LoadInt64(&r.statHit),
+ MissCount: atomic.LoadInt64(&r.statMiss),
+ SetCount: atomic.LoadInt64(&r.statSet),
+ DelCount: atomic.LoadInt64(&r.statDel),
+ }
+}
+
// Nodes returns number of 'cache node' in the map.
func (r *Cache) Nodes() int {
- return int(atomic.LoadInt32(&r.nodes))
+ return int(atomic.LoadInt64(&r.statNodes))
}
// Size returns sums of 'cache node' size in the map.
func (r *Cache) Size() int {
- return int(atomic.LoadInt32(&r.size))
+ return int(atomic.LoadInt64(&r.statSize))
}
// Capacity returns cache capacity.
@@ -374,14 +477,20 @@ func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Han
hash := murmur32(ns, key, 0xf00)
for {
h, b := r.getBucket(hash)
- done, _, n := b.get(r, h, hash, ns, key, setFunc == nil)
+ done, created, n := b.get(r, h, hash, ns, key, setFunc == nil)
if done {
+ if created || n == nil {
+ atomic.AddInt64(&r.statMiss, 1)
+ } else {
+ atomic.AddInt64(&r.statHit, 1)
+ }
+
if n != nil {
n.mu.Lock()
if n.value == nil {
if setFunc == nil {
n.mu.Unlock()
- n.unref()
+ n.unRefInternal(false)
return nil
}
@@ -389,10 +498,11 @@ func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Han
if n.value == nil {
n.size = 0
n.mu.Unlock()
- n.unref()
+ n.unRefInternal(false)
return nil
}
- atomic.AddInt32(&r.size, int32(n.size))
+ atomic.AddInt64(&r.statSet, 1)
+ atomic.AddInt64(&r.statSize, int64(n.size))
}
n.mu.Unlock()
if r.cacher != nil {
@@ -412,11 +522,11 @@ func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Han
// only attributed to the particular 'cache node', so when a 'cache node'
// is recreated it will not be banned.
//
-// If onDel is not nil, then it will be executed if such 'cache node'
+// If delFunc is not nil, then it will be executed if such 'cache node'
// doesn't exist or once the 'cache node' is released.
//
// Delete return true is such 'cache node' exist.
-func (r *Cache) Delete(ns, key uint64, onDel func()) bool {
+func (r *Cache) Delete(ns, key uint64, delFunc func()) bool {
r.mu.RLock()
defer r.mu.RUnlock()
if r.closed {
@@ -429,15 +539,15 @@ func (r *Cache) Delete(ns, key uint64, onDel func()) bool {
done, _, n := b.get(r, h, hash, ns, key, true)
if done {
if n != nil {
- if onDel != nil {
+ if delFunc != nil {
n.mu.Lock()
- n.onDel = append(n.onDel, onDel)
+ n.delFuncs = append(n.delFuncs, delFunc)
n.mu.Unlock()
}
if r.cacher != nil {
r.cacher.Ban(n)
}
- n.unref()
+ n.unRefInternal(true)
return true
}
@@ -445,8 +555,8 @@ func (r *Cache) Delete(ns, key uint64, onDel func()) bool {
}
}
- if onDel != nil {
- onDel()
+ if delFunc != nil {
+ delFunc()
}
return false
@@ -472,7 +582,7 @@ func (r *Cache) Evict(ns, key uint64) bool {
if r.cacher != nil {
r.cacher.Evict(n)
}
- n.unref()
+ n.unRefInternal(true)
return true
}
@@ -484,7 +594,7 @@ func (r *Cache) Evict(ns, key uint64) bool {
}
// EvictNS evicts 'cache node' with the given namespace. This will
-// simply call Cacher.EvictNS.
+// simply call Cacher.Evict on all nodes with the given namespace.
func (r *Cache) EvictNS(ns uint64) {
r.mu.RLock()
defer r.mu.RUnlock()
@@ -493,10 +603,21 @@ func (r *Cache) EvictNS(ns uint64) {
}
if r.cacher != nil {
- r.cacher.EvictNS(ns)
+ nodes := r.enumerateNodesByNS(ns)
+ for _, n := range nodes {
+ r.cacher.Evict(n)
+ }
}
}
+func (r *Cache) evictAll() {
+ r.enumerateNodesWithCB(func(nodes []*Node) {
+ for _, n := range nodes {
+ r.cacher.Evict(n)
+ }
+ })
+}
+
// EvictAll evicts all 'cache node'. This will simply call Cacher.EvictAll.
func (r *Cache) EvictAll() {
r.mu.RLock()
@@ -506,66 +627,46 @@ func (r *Cache) EvictAll() {
}
if r.cacher != nil {
- r.cacher.EvictAll()
+ r.evictAll()
}
}
-// Close closes the 'cache map' and forcefully releases all 'cache node'.
-func (r *Cache) Close() error {
+// Close closes the 'cache map'.
+// All 'Cache' method is no-op after 'cache map' is closed.
+// All 'cache node' will be evicted from 'cacher'.
+//
+// If 'force' is true then all 'cache node' will be forcefully released
+// even if the 'node ref' is not zero.
+func (r *Cache) Close(force bool) {
+ var head *mHead
+ // Hold RW-lock to make sure no more in-flight operations.
r.mu.Lock()
if !r.closed {
r.closed = true
+ head = (*mHead)(atomic.LoadPointer(&r.mHead))
+ atomic.StorePointer(&r.mHead, nil)
+ }
+ r.mu.Unlock()
- h := (*mNode)(r.mHead)
- h.initBuckets()
+ if head != nil {
+ head.enumerateNodesWithCB(func(nodes []*Node) {
+ for _, n := range nodes {
+ // Zeroing ref. Prevent unRefExternal to call finalizer.
+ if force {
+ atomic.StoreInt32(&n.ref, 0)
+ }
- for i := range h.buckets {
- b := (*mBucket)(h.buckets[i])
- for _, n := range b.node {
- // Call releaser.
- if n.value != nil {
- if r, ok := n.value.(util.Releaser); ok {
- r.Release()
- }
- n.value = nil
+ // Evict from cacher.
+ if r.cacher != nil {
+ r.cacher.Evict(n)
}
- // Call OnDel.
- for _, f := range n.onDel {
- f()
+ if force {
+ n.callFinalizer()
}
- n.onDel = nil
}
- }
+ })
}
- r.mu.Unlock()
-
- // Avoid deadlock.
- if r.cacher != nil {
- if err := r.cacher.Close(); err != nil {
- return err
- }
- }
- return nil
-}
-
-// CloseWeak closes the 'cache map' and evict all 'cache node' from cacher, but
-// unlike Close it doesn't forcefully releases 'cache node'.
-func (r *Cache) CloseWeak() error {
- r.mu.Lock()
- if !r.closed {
- r.closed = true
- }
- r.mu.Unlock()
-
- // Avoid deadlock.
- if r.cacher != nil {
- r.cacher.EvictAll()
- if err := r.cacher.Close(); err != nil {
- return err
- }
- }
- return nil
}
// Node is a 'cache node'.
@@ -579,8 +680,8 @@ type Node struct {
size int
value Value
- ref int32
- onDel []func()
+ ref int32
+ delFuncs []func()
CacheData unsafe.Pointer
}
@@ -618,17 +719,39 @@ func (n *Node) GetHandle() *Handle {
return &Handle{unsafe.Pointer(n)}
}
-func (n *Node) unref() {
+func (n *Node) callFinalizer() {
+ // Call releaser.
+ if n.value != nil {
+ if r, ok := n.value.(util.Releaser); ok {
+ r.Release()
+ }
+ n.value = nil
+ }
+
+ // Call delete funcs.
+ for _, f := range n.delFuncs {
+ f()
+ }
+ n.delFuncs = nil
+}
+
+func (n *Node) unRefInternal(updateStat bool) {
if atomic.AddInt32(&n.ref, -1) == 0 {
n.r.delete(n)
+ if updateStat {
+ atomic.AddInt64(&n.r.statDel, 1)
+ }
}
}
-func (n *Node) unrefLocked() {
+func (n *Node) unRefExternal() {
if atomic.AddInt32(&n.ref, -1) == 0 {
n.r.mu.RLock()
- if !n.r.closed {
+ if n.r.closed {
+ n.callFinalizer()
+ } else {
n.r.delete(n)
+ atomic.AddInt64(&n.r.statDel, 1)
}
n.r.mu.RUnlock()
}
@@ -654,7 +777,7 @@ func (h *Handle) Release() {
nPtr := atomic.LoadPointer(&h.n)
if nPtr != nil && atomic.CompareAndSwapPointer(&h.n, nPtr, nil) {
n := (*Node)(nPtr)
- n.unrefLocked()
+ n.unRefExternal()
}
}
diff --git a/leveldb/cache/cache_test.go b/leveldb/cache/cache_test.go
index 6b017bd..e94b06c 100644
--- a/leveldb/cache/cache_test.go
+++ b/leveldb/cache/cache_test.go
@@ -14,6 +14,9 @@ import (
"testing"
"time"
"unsafe"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
type int32o int32
@@ -50,13 +53,74 @@ func set(c *Cache, ns, key uint64, value Value, charge int, relf func()) *Handle
})
}
-type cacheMapTestParams struct {
- nobjects, nhandles, concurrent, repeat int
+func shuffleNodes(nodes mNodes) mNodes {
+ shuffled := append(mNodes(nil), nodes...)
+ rand.Shuffle(len(shuffled), func(i, j int) {
+ shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
+ })
+ return shuffled
+}
+
+func generateSortedNodes(nNS, minNKey, maxNKey int) mNodes {
+ var generated mNodes
+ for i := 0; i < nNS; i++ {
+ nKey := minNKey
+ if maxNKey-minNKey > 0 {
+ nKey += rand.Intn(maxNKey - minNKey)
+ }
+ for j := 0; j < nKey; j++ {
+ generated = append(generated, &Node{ns: uint64(i), key: uint64(j)})
+ }
+ }
+ return generated
+}
+
+func TestNodesSort(t *testing.T) {
+ testFunc := func(nNS, minNKey, maxNKey int) func(t *testing.T) {
+ return func(t *testing.T) {
+ sorted := generateSortedNodes(nNS, minNKey, maxNKey)
+ for i := 0; i < 3; i++ {
+ shuffled := shuffleNodes(sorted)
+ require.NotEqual(t, sorted, shuffled)
+ shuffled.sort()
+ require.Equal(t, sorted, shuffled)
+ }
+ for i, x := range sorted {
+ r := sorted.search(x.ns, x.key)
+ require.Equal(t, i, r)
+ }
+ }
+ }
+
+ t.Run("SingleNS", testFunc(1, 100, 100))
+ t.Run("MultipleNS", testFunc(10, 1, 10))
+
+ t.Run("SearchInexact", func(t *testing.T) {
+ data := mNodes{
+ &Node{ns: 0, key: 2},
+ &Node{ns: 0, key: 3},
+ &Node{ns: 0, key: 4},
+ &Node{ns: 2, key: 1},
+ &Node{ns: 2, key: 2},
+ &Node{ns: 2, key: 3},
+ }
+ require.Equal(t, 0, data.search(0, 1))
+ require.Equal(t, 0, data.search(0, 2))
+ require.Equal(t, 3, data.search(0, 5))
+ require.Equal(t, 3, data.search(1, 1001000))
+ require.Equal(t, 5, data.search(2, 3))
+ require.Equal(t, 6, data.search(2, 4))
+ require.Equal(t, 6, data.search(10, 10))
+ })
}
func TestCacheMap(t *testing.T) {
runtime.GOMAXPROCS(runtime.NumCPU())
+ type cacheMapTestParams struct {
+ nObjects, nHandles, concurrent, repeat int
+ }
+
var params []cacheMapTestParams
if testing.Short() {
params = []cacheMapTestParams{
@@ -76,8 +140,8 @@ func TestCacheMap(t *testing.T) {
)
for _, x := range params {
- objects = append(objects, make([]int32o, x.nobjects))
- handles = append(handles, make([]unsafe.Pointer, x.nhandles))
+ objects = append(objects, make([]int32o, x.nObjects))
+ handles = append(handles, make([]unsafe.Pointer, x.nHandles))
}
c := NewCache(nil)
@@ -85,34 +149,43 @@ func TestCacheMap(t *testing.T) {
wg := new(sync.WaitGroup)
var done int32
- for ns, x := range params {
- for i := 0; i < x.concurrent; i++ {
+ for id, param := range params {
+ id := id
+ param := param
+ objects := objects[id]
+ handles := handles[id]
+ for job := 0; job < param.concurrent; job++ {
wg.Add(1)
- go func(ns, i, repeat int, objects []int32o, handles []unsafe.Pointer) {
+ go func() {
defer wg.Done()
+
r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ for j := len(objects) * param.repeat; j >= 0; j-- {
+ if t.Failed() {
+ return
+ }
- for j := len(objects) * repeat; j >= 0; j-- {
- key := uint64(r.Intn(len(objects)))
- h := c.Get(uint64(ns), key, func() (int, Value) {
- o := &objects[key]
+ i := r.Intn(len(objects))
+ h := c.Get(uint64(id), uint64(i), func() (int, Value) {
+ o := &objects[i]
o.acquire()
return 1, o
})
- if v := h.Value().(*int32o); v != &objects[key] {
- t.Fatalf("#%d invalid value: want=%p got=%p", ns, &objects[key], v)
+ if !assert.True(t, h.Value().(*int32o) == &objects[i]) {
+ return
}
- if objects[key] != 1 {
- t.Fatalf("#%d invalid object %d: %d", ns, key, objects[key])
+ if !assert.True(t, objects[i] == 1) {
+ return
}
if !atomic.CompareAndSwapPointer(&handles[r.Intn(len(handles))], nil, unsafe.Pointer(h)) {
h.Release()
}
}
- }(ns, i, x.repeat, objects[ns], handles[ns])
+ }()
}
- go func(handles []unsafe.Pointer) {
+ // Randomly release handles at interval.
+ go func() {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for atomic.LoadInt32(&done) == 0 {
@@ -123,9 +196,11 @@ func TestCacheMap(t *testing.T) {
}
time.Sleep(time.Millisecond)
}
- }(handles[ns])
+ }()
}
+ // Emulate constant grow-shrink.
+ growShrinkStop := make(chan bool, 1)
go func() {
handles := make([]*Handle, 100000)
for atomic.LoadInt32(&done) == 0 {
@@ -138,55 +213,55 @@ func TestCacheMap(t *testing.T) {
h.Release()
}
}
+ growShrinkStop <- true
}()
wg.Wait()
-
atomic.StoreInt32(&done, 1)
- for _, handles0 := range handles {
- for i := range handles0 {
- h := (*Handle)(atomic.LoadPointer(&handles0[i]))
- if h != nil && atomic.CompareAndSwapPointer(&handles0[i], unsafe.Pointer(h), nil) {
+ // Releasing handles.
+ activeCount := 0
+ for _, handle := range handles {
+ for i := range handle {
+ h := (*Handle)(atomic.LoadPointer(&handle[i]))
+ if h != nil && atomic.CompareAndSwapPointer(&handle[i], unsafe.Pointer(h), nil) {
+ activeCount++
h.Release()
}
}
}
+ t.Logf("active_handles=%d", activeCount)
- for ns, objects0 := range objects {
- for i, o := range objects0 {
- if o != 0 {
- t.Fatalf("invalid object #%d.%d: ref=%d", ns, i, o)
- }
+ // Checking object refs
+ for id, object := range objects {
+ for i, o := range object {
+ require.EqualValues(t, 0, o, "invalid object ref: %d-%03d", id, i)
}
}
+
+ <-growShrinkStop
+
+ require.Zero(t, c.Nodes())
+ require.Zero(t, c.Size())
+ t.Logf("STATS: %#v", c.GetStats())
}
func TestCacheMap_NodesAndSize(t *testing.T) {
c := NewCache(nil)
- if c.Nodes() != 0 {
- t.Errorf("invalid nodes counter: want=%d got=%d", 0, c.Nodes())
- }
- if c.Size() != 0 {
- t.Errorf("invalid size counter: want=%d got=%d", 0, c.Size())
- }
+ require.Zero(t, c.Capacity())
+ require.Zero(t, c.Nodes())
+ require.Zero(t, c.Size())
set(c, 0, 1, 1, 1, nil)
set(c, 0, 2, 2, 2, nil)
set(c, 1, 1, 3, 3, nil)
set(c, 2, 1, 4, 1, nil)
- if c.Nodes() != 4 {
- t.Errorf("invalid nodes counter: want=%d got=%d", 4, c.Nodes())
- }
- if c.Size() != 7 {
- t.Errorf("invalid size counter: want=%d got=%d", 4, c.Size())
- }
+ require.Equal(t, 4, c.Nodes())
+ require.Equal(t, 7, c.Size())
}
func TestLRUCache_Capacity(t *testing.T) {
c := NewCache(NewLRU(10))
- if c.Capacity() != 10 {
- t.Errorf("invalid capacity: want=%d got=%d", 10, c.Capacity())
- }
+ require.Equal(t, 10, c.Capacity())
set(c, 0, 1, 1, 1, nil).Release()
set(c, 0, 2, 2, 2, nil).Release()
set(c, 1, 1, 3, 3, nil).Release()
@@ -195,22 +270,12 @@ func TestLRUCache_Capacity(t *testing.T) {
set(c, 2, 3, 6, 1, nil).Release()
set(c, 2, 4, 7, 1, nil).Release()
set(c, 2, 5, 8, 1, nil).Release()
- if c.Nodes() != 7 {
- t.Errorf("invalid nodes counter: want=%d got=%d", 7, c.Nodes())
- }
- if c.Size() != 10 {
- t.Errorf("invalid size counter: want=%d got=%d", 10, c.Size())
- }
+ require.Equal(t, 7, c.Nodes())
+ require.Equal(t, 10, c.Size())
c.SetCapacity(9)
- if c.Capacity() != 9 {
- t.Errorf("invalid capacity: want=%d got=%d", 9, c.Capacity())
- }
- if c.Nodes() != 6 {
- t.Errorf("invalid nodes counter: want=%d got=%d", 6, c.Nodes())
- }
- if c.Size() != 8 {
- t.Errorf("invalid size counter: want=%d got=%d", 8, c.Size())
- }
+ require.Equal(t, 9, c.Capacity())
+ require.Equal(t, 6, c.Nodes())
+ require.Equal(t, 8, c.Size())
}
func TestCacheMap_NilValue(t *testing.T) {
@@ -218,15 +283,9 @@ func TestCacheMap_NilValue(t *testing.T) {
h := c.Get(0, 0, func() (size int, value Value) {
return 1, nil
})
- if h != nil {
- t.Error("cache handle is non-nil")
- }
- if c.Nodes() != 0 {
- t.Errorf("invalid nodes counter: want=%d got=%d", 0, c.Nodes())
- }
- if c.Size() != 0 {
- t.Errorf("invalid size counter: want=%d got=%d", 0, c.Size())
- }
+ require.Nil(t, h)
+ require.Zero(t, c.Nodes())
+ require.Zero(t, c.Size())
}
func TestLRUCache_GetLatency(t *testing.T) {
@@ -237,7 +296,7 @@ func TestLRUCache_GetLatency(t *testing.T) {
concurrentGet = 3
duration = 3 * time.Second
delay = 3 * time.Millisecond
- maxkey = 100000
+ maxKey = 100000
)
var (
@@ -254,7 +313,7 @@ func TestLRUCache_GetLatency(t *testing.T) {
defer wg.Done()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for time.Now().Before(until) {
- c.Get(0, uint64(r.Intn(maxkey)), func() (int, Value) {
+ c.Get(0, uint64(r.Intn(maxKey)), func() (int, Value) {
time.Sleep(delay)
atomic.AddInt32(&set, 1)
return 1, 1
@@ -270,8 +329,8 @@ func TestLRUCache_GetLatency(t *testing.T) {
for {
mark := time.Now()
if mark.Before(until) {
- h := c.Get(0, uint64(r.Intn(maxkey)), nil)
- latency := int64(time.Now().Sub(mark))
+ h := c.Get(0, uint64(r.Intn(maxKey)), nil)
+ latency := int64(time.Since(mark))
m := atomic.LoadInt64(&getMaxLatency)
if latency > m {
atomic.CompareAndSwapInt64(&getMaxLatency, m, latency)
@@ -290,13 +349,12 @@ func TestLRUCache_GetLatency(t *testing.T) {
}
wg.Wait()
- getAvglatency := time.Duration(getDuration) / time.Duration(getAll)
+ getAvgLatency := time.Duration(getDuration) / time.Duration(getAll)
t.Logf("set=%d getHit=%d getAll=%d getMaxLatency=%v getAvgLatency=%v",
- set, getHit, getAll, time.Duration(getMaxLatency), getAvglatency)
+ set, getHit, getAll, time.Duration(getMaxLatency), getAvgLatency)
- if getAvglatency > delay/3 {
- t.Errorf("get avg latency > %v: got=%v", delay/3, getAvglatency)
- }
+ require.LessOrEqual(t, getAvgLatency, delay/3)
+ t.Logf("STATS: %#v", c.GetStats())
}
func TestLRUCache_HitMiss(t *testing.T) {
@@ -316,28 +374,21 @@ func TestLRUCache_HitMiss(t *testing.T) {
{9, "v9"},
}
- setfin := 0
+ setFin := 0
c := NewCache(NewLRU(1000))
for i, x := range cases {
set(c, 0, x.key, x.value, len(x.value), func() {
- setfin++
+ setFin++
}).Release()
for j, y := range cases {
h := c.Get(0, y.key, nil)
if j <= i {
// should hit
- if h == nil {
- t.Errorf("case '%d' iteration '%d' is miss", i, j)
- } else {
- if x := h.Value().(releaserFunc).value.(string); x != y.value {
- t.Errorf("case '%d' iteration '%d' has invalid value got '%s', want '%s'", i, j, x, y.value)
- }
- }
+ require.NotNilf(t, h, "case '%d' iteration '%d' should hit", i, j)
+ require.Equalf(t, y.value, h.Value().(releaserFunc).value, "case '%d' iteration '%d' should have valid value", i, j)
} else {
// should miss
- if h != nil {
- t.Errorf("case '%d' iteration '%d' is hit , value '%s'", i, j, h.Value().(releaserFunc).value.(string))
- }
+ require.Nilf(t, h, "case '%d' iteration '%d' should miss", i, j)
}
if h != nil {
h.Release()
@@ -351,26 +402,17 @@ func TestLRUCache_HitMiss(t *testing.T) {
finalizerOk = true
})
- if !finalizerOk {
- t.Errorf("case %d delete finalizer not executed", i)
- }
+ require.True(t, finalizerOk)
for j, y := range cases {
h := c.Get(0, y.key, nil)
if j > i {
// should hit
- if h == nil {
- t.Errorf("case '%d' iteration '%d' is miss", i, j)
- } else {
- if x := h.Value().(releaserFunc).value.(string); x != y.value {
- t.Errorf("case '%d' iteration '%d' has invalid value got '%s', want '%s'", i, j, x, y.value)
- }
- }
+ require.NotNilf(t, h, "case '%d' iteration '%d' should hit", i, j)
+ require.Equalf(t, y.value, h.Value().(releaserFunc).value, "case '%d' iteration '%d' should have valid value", i, j)
} else {
// should miss
- if h != nil {
- t.Errorf("case '%d' iteration '%d' is hit, value '%s'", i, j, h.Value().(releaserFunc).value.(string))
- }
+ require.Nilf(t, h, "case '%d' iteration '%d' should miss", i, j)
}
if h != nil {
h.Release()
@@ -378,9 +420,7 @@ func TestLRUCache_HitMiss(t *testing.T) {
}
}
- if setfin != len(cases) {
- t.Errorf("some set finalizer may not be executed, want=%d got=%d", len(cases), setfin)
- }
+ require.Equal(t, len(cases), setFin, "some set finalizer may not be executed")
}
func TestLRUCache_Eviction(t *testing.T) {
@@ -397,84 +437,64 @@ func TestLRUCache_Eviction(t *testing.T) {
for _, key := range []uint64{9, 2, 5, 1} {
h := c.Get(0, key, nil)
- if h == nil {
- t.Errorf("miss for key '%d'", key)
- } else {
- if x := h.Value().(int); x != int(key) {
- t.Errorf("invalid value for key '%d' want '%d', got '%d'", key, key, x)
- }
- h.Release()
- }
+ require.NotNilf(t, h, "miss for key '%d'", key)
+ require.Equalf(t, int(key), h.Value(), "invalid value for key '%d'", key)
+ h.Release()
}
o1.Release()
for _, key := range []uint64{1, 2, 5} {
h := c.Get(0, key, nil)
- if h == nil {
- t.Errorf("miss for key '%d'", key)
- } else {
- if x := h.Value().(int); x != int(key) {
- t.Errorf("invalid value for key '%d' want '%d', got '%d'", key, key, x)
- }
- h.Release()
- }
+ require.NotNilf(t, h, "miss for key '%d'", key)
+ require.Equalf(t, int(key), h.Value(), "invalid value for key '%d'", key)
+ h.Release()
}
for _, key := range []uint64{3, 4, 9} {
h := c.Get(0, key, nil)
- if h != nil {
- t.Errorf("hit for key '%d'", key)
- if x := h.Value().(int); x != int(key) {
- t.Errorf("invalid value for key '%d' want '%d', got '%d'", key, key, x)
- }
+ if !assert.Nilf(t, h, "hit for key '%d'", key) {
+ require.Equalf(t, int(key), h.Value(), "invalid value for key '%d'", key)
h.Release()
}
}
}
func TestLRUCache_Evict(t *testing.T) {
- c := NewCache(NewLRU(6))
+ lru := NewLRU(6).(*lru)
+ c := NewCache(lru)
set(c, 0, 1, 1, 1, nil).Release()
set(c, 0, 2, 2, 1, nil).Release()
- set(c, 1, 1, 4, 1, nil).Release()
- set(c, 1, 2, 5, 1, nil).Release()
- set(c, 2, 1, 6, 1, nil).Release()
- set(c, 2, 2, 7, 1, nil).Release()
+ set(c, 1, 1, 3, 1, nil).Release()
+ set(c, 1, 2, 4, 1, nil).Release()
+ set(c, 2, 1, 5, 1, nil).Release()
+ set(c, 2, 2, 6, 1, nil).Release()
+ v := 1
for ns := 0; ns < 3; ns++ {
for key := 1; key < 3; key++ {
- if h := c.Get(uint64(ns), uint64(key), nil); h != nil {
- h.Release()
- } else {
- t.Errorf("Cache.Get on #%d.%d return nil", ns, key)
- }
+ h := c.Get(uint64(ns), uint64(key), nil)
+ require.NotNilf(t, h, "NS=%d key=%d", ns, key)
+ require.Equal(t, v, h.Value())
+ h.Release()
+ v++
}
}
- if ok := c.Evict(0, 1); !ok {
- t.Error("first Cache.Evict on #0.1 return false")
- }
- if ok := c.Evict(0, 1); ok {
- t.Error("second Cache.Evict on #0.1 return true")
- }
- if h := c.Get(0, 1, nil); h != nil {
- t.Errorf("Cache.Get on #0.1 return non-nil: %v", h.Value())
- }
+ require.True(t, c.Evict(0, 1))
+ require.Equal(t, 5, lru.used)
+ require.False(t, c.Evict(0, 1))
c.EvictNS(1)
- if h := c.Get(1, 1, nil); h != nil {
- t.Errorf("Cache.Get on #1.1 return non-nil: %v", h.Value())
- }
- if h := c.Get(1, 2, nil); h != nil {
- t.Errorf("Cache.Get on #1.2 return non-nil: %v", h.Value())
- }
+ require.Equal(t, 3, lru.used)
+ require.Nil(t, c.Get(1, 1, nil))
+ require.Nil(t, c.Get(1, 2, nil))
c.EvictAll()
- for ns := 0; ns < 3; ns++ {
- for key := 1; key < 3; key++ {
- if h := c.Get(uint64(ns), uint64(key), nil); h != nil {
- t.Errorf("Cache.Get on #%d.%d return non-nil: %v", ns, key, h.Value())
- }
- }
- }
+ require.Zero(t, lru.used)
+ require.Nil(t, c.Get(0, 1, nil))
+ require.Nil(t, c.Get(0, 2, nil))
+ require.Nil(t, c.Get(1, 1, nil))
+ require.Nil(t, c.Get(1, 2, nil))
+ require.Nil(t, c.Get(2, 1, nil))
+ require.Nil(t, c.Get(2, 2, nil))
}
func TestLRUCache_Delete(t *testing.T) {
@@ -487,47 +507,28 @@ func TestLRUCache_Delete(t *testing.T) {
set(c, 0, 1, 1, 1, nil).Release()
set(c, 0, 2, 2, 1, nil).Release()
- if ok := c.Delete(0, 1, delFunc); !ok {
- t.Error("Cache.Delete on #1 return false")
- }
- if h := c.Get(0, 1, nil); h != nil {
- t.Errorf("Cache.Get on #1 return non-nil: %v", h.Value())
- }
- if ok := c.Delete(0, 1, delFunc); ok {
- t.Error("Cache.Delete on #1 return true")
- }
+ require.True(t, c.Delete(0, 1, delFunc))
+ require.Nil(t, c.Get(0, 1, nil))
+ require.False(t, c.Delete(0, 1, delFunc))
h2 := c.Get(0, 2, nil)
- if h2 == nil {
- t.Error("Cache.Get on #2 return nil")
- }
- if ok := c.Delete(0, 2, delFunc); !ok {
- t.Error("(1) Cache.Delete on #2 return false")
- }
- if ok := c.Delete(0, 2, delFunc); !ok {
- t.Error("(2) Cache.Delete on #2 return false")
- }
+ require.NotNil(t, h2)
+ require.True(t, c.Delete(0, 2, delFunc))
+ require.True(t, c.Delete(0, 2, delFunc))
set(c, 0, 3, 3, 1, nil).Release()
set(c, 0, 4, 4, 1, nil).Release()
c.Get(0, 2, nil).Release()
for key := 2; key <= 4; key++ {
- if h := c.Get(0, uint64(key), nil); h != nil {
- h.Release()
- } else {
- t.Errorf("Cache.Get on #%d return nil", key)
- }
+ h := c.Get(0, uint64(key), nil)
+ require.NotNil(t, h)
+ h.Release()
}
h2.Release()
- if h := c.Get(0, 2, nil); h != nil {
- t.Errorf("Cache.Get on #2 return non-nil: %v", h.Value())
- }
-
- if delFuncCalled != 4 {
- t.Errorf("delFunc isn't called 4 times: got=%d", delFuncCalled)
- }
+ require.Nil(t, c.Get(0, 2, nil))
+ require.Equal(t, 4, delFuncCalled)
}
func TestLRUCache_Close(t *testing.T) {
@@ -545,19 +546,11 @@ func TestLRUCache_Close(t *testing.T) {
set(c, 0, 2, 2, 1, relFunc).Release()
h3 := set(c, 0, 3, 3, 1, relFunc)
- if h3 == nil {
- t.Error("Cache.Get on #3 return nil")
- }
- if ok := c.Delete(0, 3, delFunc); !ok {
- t.Error("Cache.Delete on #3 return false")
- }
+ require.NotNil(t, h3)
+ require.True(t, c.Delete(0, 3, delFunc))
- c.Close()
+ c.Close(true)
- if relFuncCalled != 3 {
- t.Errorf("relFunc isn't called 3 times: got=%d", relFuncCalled)
- }
- if delFuncCalled != 1 {
- t.Errorf("delFunc isn't called 1 times: got=%d", delFuncCalled)
- }
+ require.Equal(t, 3, relFuncCalled)
+ require.Equal(t, 1, delFuncCalled)
}
diff --git a/leveldb/cache/lru.go b/leveldb/cache/lru.go
index d9a84cd..383ad5a 100644
--- a/leveldb/cache/lru.go
+++ b/leveldb/cache/lru.go
@@ -142,51 +142,14 @@ func (r *lru) Evict(n *Node) {
r.mu.Unlock()
return
}
+ rn.remove()
+ r.used -= n.Size()
n.CacheData = nil
r.mu.Unlock()
rn.h.Release()
}
-func (r *lru) EvictNS(ns uint64) {
- var evicted []*lruNode
-
- r.mu.Lock()
- for e := r.recent.prev; e != &r.recent; {
- rn := e
- e = e.prev
- if rn.n.NS() == ns {
- rn.remove()
- rn.n.CacheData = nil
- r.used -= rn.n.Size()
- evicted = append(evicted, rn)
- }
- }
- r.mu.Unlock()
-
- for _, rn := range evicted {
- rn.h.Release()
- }
-}
-
-func (r *lru) EvictAll() {
- r.mu.Lock()
- back := r.recent.prev
- for rn := back; rn != &r.recent; rn = rn.prev {
- rn.n.CacheData = nil
- }
- r.reset()
- r.mu.Unlock()
-
- for rn := back; rn != &r.recent; rn = rn.prev {
- rn.h.Release()
- }
-}
-
-func (r *lru) Close() error {
- return nil
-}
-
// NewLRU create a new LRU-cache.
func NewLRU(capacity int) Cacher {
r := &lru{capacity: capacity}
diff --git a/leveldb/corrupt_test.go b/leveldb/corrupt_test.go
index a987b27..543e739 100644
--- a/leveldb/corrupt_test.go
+++ b/leveldb/corrupt_test.go
@@ -164,18 +164,6 @@ func (h *dbCorruptHarness) corrupt(ft storage.FileType, fi, offset, n int) {
w.Close()
}
-func (h *dbCorruptHarness) removeAll(ft storage.FileType) {
- fds, err := h.stor.List(ft)
- if err != nil {
- h.t.Fatal("get files: ", err)
- }
- for _, fd := range fds {
- if err := h.stor.Remove(fd); err != nil {
- h.t.Error("remove file: ", err)
- }
- }
-}
-
func (h *dbCorruptHarness) forceRemoveAll(ft storage.FileType) {
fds, err := h.stor.List(ft)
if err != nil {
diff --git a/leveldb/db.go b/leveldb/db.go
index 74e9826..b2724cd 100644
--- a/leveldb/db.go
+++ b/leveldb/db.go
@@ -17,6 +17,7 @@ import (
"sync/atomic"
"time"
+ "github.com/syndtr/goleveldb/leveldb/cache"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/journal"
@@ -141,7 +142,6 @@ func openDB(s *session) (*DB, error) {
}
return nil, err
}
-
}
// Doesn't need to be included in the wait group.
@@ -149,7 +149,9 @@ func openDB(s *session) (*DB, error) {
go db.mpoolDrain()
if readOnly {
- db.SetReadOnly()
+ if err := db.SetReadOnly(); err != nil {
+ return nil, err
+ }
} else {
db.closeW.Add(2)
go db.tCompaction()
@@ -311,15 +313,23 @@ func recoverTable(s *session, o *opt.Options) error {
return
}
defer func() {
- writer.Close()
+ if cerr := writer.Close(); cerr != nil {
+ if err == nil {
+ err = cerr
+ } else {
+ err = fmt.Errorf("error recovering table (%v); error closing (%v)", err, cerr)
+ }
+ }
if err != nil {
- s.stor.Remove(tmpFd)
+ if rerr := s.stor.Remove(tmpFd); rerr != nil {
+ err = fmt.Errorf("error recovering table (%v); error removing (%v)", err, rerr)
+ }
tmpFd = storage.FileDesc{}
}
}()
// Copy entries.
- tw := table.NewWriter(writer, o)
+ tw := table.NewWriter(writer, o, nil, 0)
for iter.Next() {
key := iter.Key()
if validInternalKey(key) {
@@ -397,7 +407,7 @@ func recoverTable(s *session, o *opt.Options) error {
tSeq = seq
}
if imin == nil {
- imin = append([]byte{}, key...)
+ imin = append([]byte(nil), key...)
}
imax = append(imax[:0], key...)
}
@@ -530,7 +540,8 @@ func (db *DB) recoverJournal() error {
if jr == nil {
jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
} else {
- jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
+ // Ignore the error here
+ _ = jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
}
// Flush memdb and remove obsolete journal file.
@@ -550,7 +561,10 @@ func (db *DB) recoverJournal() error {
}
rec.resetAddedTables()
- db.s.stor.Remove(ofd)
+ if err := db.s.stor.Remove(ofd); err != nil {
+ fr.Close()
+ return err
+ }
ofd = storage.FileDesc{}
}
@@ -634,7 +648,9 @@ func (db *DB) recoverJournal() error {
// Remove the last obsolete journal file.
if !ofd.Zero() {
- db.s.stor.Remove(ofd)
+ if err := db.s.stor.Remove(ofd); err != nil {
+ return err
+ }
}
return nil
@@ -688,7 +704,9 @@ func (db *DB) recoverJournalRO() error {
if jr == nil {
jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
} else {
- jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
+ if err := jr.Reset(fr, dropper{db.s, fd}, strict, checksum); err != nil {
+ return err
+ }
}
// Replay journal to memdb.
@@ -765,7 +783,7 @@ func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.R
if auxm != nil {
if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
- return append([]byte{}, mv...), me
+ return append([]byte(nil), mv...), me
}
}
@@ -777,7 +795,7 @@ func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.R
defer m.decref()
if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
- return append([]byte{}, mv...), me
+ return append([]byte(nil), mv...), me
}
}
@@ -1002,15 +1020,15 @@ func (db *DB) GetProperty(name string) (value string, err error) {
}
}
case p == "blockpool":
- value = fmt.Sprintf("%v", db.s.tops.bpool)
+ value = fmt.Sprintf("%v", db.s.tops.blockBuffer)
case p == "cachedblock":
- if db.s.tops.bcache != nil {
- value = fmt.Sprintf("%d", db.s.tops.bcache.Size())
+ if db.s.tops.blockCache != nil {
+ value = fmt.Sprintf("%d", db.s.tops.blockCache.Size())
} else {
value = "<nil>"
}
case p == "openedtables":
- value = fmt.Sprintf("%d", db.s.tops.cache.Size())
+ value = fmt.Sprintf("%d", db.s.tops.fileCache.Size())
case p == "alivesnaps":
value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
case p == "aliveiters":
@@ -1037,6 +1055,9 @@ type DBStats struct {
BlockCacheSize int
OpenedTablesCount int
+ FileCache cache.Stats
+ BlockCache cache.Stats
+
LevelSizes Sizes
LevelTablesCounts []int
LevelRead Sizes
@@ -1062,13 +1083,20 @@ func (db *DB) Stats(s *DBStats) error {
s.WriteDelayDuration = time.Duration(atomic.LoadInt64(&db.cWriteDelay))
s.WritePaused = atomic.LoadInt32(&db.inWritePaused) == 1
- s.OpenedTablesCount = db.s.tops.cache.Size()
- if db.s.tops.bcache != nil {
- s.BlockCacheSize = db.s.tops.bcache.Size()
+ s.OpenedTablesCount = db.s.tops.fileCache.Size()
+ if db.s.tops.blockCache != nil {
+ s.BlockCacheSize = db.s.tops.blockCache.Size()
} else {
s.BlockCacheSize = 0
}
+ s.FileCache = db.s.tops.fileCache.GetStats()
+ if db.s.tops.blockCache != nil {
+ s.BlockCache = db.s.tops.blockCache.GetStats()
+ } else {
+ s.BlockCache = cache.Stats{}
+ }
+
s.AliveIterators = atomic.LoadInt32(&db.aliveIters)
s.AliveSnapshots = atomic.LoadInt32(&db.aliveSnaps)
diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go
index 6b70eb2..cc275ac 100644
--- a/leveldb/db_compaction.go
+++ b/leveldb/db_compaction.go
@@ -7,6 +7,7 @@
package leveldb
import (
+ "fmt"
"sync"
"sync/atomic"
"time"
@@ -272,7 +273,7 @@ func (db *DB) memCompaction() {
}
defer mdb.decref()
- db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size()))
+ db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(int64(mdb.Size())))
// Don't compact empty memdb.
if mdb.Len() == 0 {
@@ -350,11 +351,11 @@ func (db *DB) memCompaction() {
}
type tableCompactionBuilder struct {
- db *DB
- s *session
- c *compaction
- rec *sessionRecord
- stat0, stat1 *cStatStaging
+ db *DB
+ s *session
+ c *compaction
+ rec *sessionRecord
+ stat1 *cStatStaging
snapHasLastUkey bool
snapLastUkey []byte
@@ -389,7 +390,7 @@ func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
// Create new table.
var err error
- b.tw, err = b.s.tops.create()
+ b.tw, err = b.s.tops.create(b.tableSize)
if err != nil {
return err
}
@@ -410,29 +411,40 @@ func (b *tableCompactionBuilder) flush() error {
}
b.rec.addTableFile(b.c.sourceLevel+1, t)
b.stat1.write += t.size
- b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
+ b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(t.size), t.imin, t.imax)
b.tw = nil
return nil
}
-func (b *tableCompactionBuilder) cleanup() {
+func (b *tableCompactionBuilder) cleanup() error {
if b.tw != nil {
- b.tw.drop()
+ if err := b.tw.drop(); err != nil {
+ return err
+ }
b.tw = nil
}
+ return nil
}
-func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error {
+func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) (err error) {
snapResumed := b.snapIter > 0
hasLastUkey := b.snapHasLastUkey // The key might has zero length, so this is necessary.
- lastUkey := append([]byte{}, b.snapLastUkey...)
+ lastUkey := append([]byte(nil), b.snapLastUkey...)
lastSeq := b.snapLastSeq
b.kerrCnt = b.snapKerrCnt
b.dropCnt = b.snapDropCnt
// Restore compaction state.
b.c.restore()
- defer b.cleanup()
+ defer func() {
+ if cerr := b.cleanup(); cerr != nil {
+ if err == nil {
+ err = cerr
+ } else {
+ err = fmt.Errorf("tableCompactionBuilder error: %v, cleanup error (%v)", err, cerr)
+ }
+ }
+ }()
b.stat1.startTimer()
defer b.stat1.stopTimer()
@@ -563,7 +575,7 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
rec.delTable(c.sourceLevel+i, t.fd.Num)
}
}
- sourceSize := int(stats[0].read + stats[1].read)
+ sourceSize := stats[0].read + stats[1].read
minSeq := db.minSeq()
db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)
@@ -584,7 +596,7 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
db.compactionCommit("table", rec)
stats[1].stopTimer()
- resultSize := int(stats[1].write)
+ resultSize := stats[1].write
db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)
// Save compaction stats
@@ -655,10 +667,7 @@ func (db *DB) tableNeedCompaction() bool {
func (db *DB) resumeWrite() bool {
v := db.s.version()
defer v.release()
- if v.tLen(0) < db.s.o.GetWriteL0PauseTrigger() {
- return true
- }
- return false
+ return v.tLen(0) < db.s.o.GetWriteL0PauseTrigger()
}
func (db *DB) pauseCompaction(ch chan<- struct{}) {
@@ -681,7 +690,7 @@ type cAuto struct {
func (r cAuto) ack(err error) {
if r.ackC != nil {
defer func() {
- recover()
+ _ = recover()
}()
r.ackC <- err
}
@@ -696,7 +705,7 @@ type cRange struct {
func (r cRange) ack(err error) {
if r.ackC != nil {
defer func() {
- recover()
+ _ = recover()
}()
r.ackC <- err
}
diff --git a/leveldb/db_iter.go b/leveldb/db_iter.go
index e6e8ca5..ded13d3 100644
--- a/leveldb/db_iter.go
+++ b/leveldb/db_iter.go
@@ -7,7 +7,6 @@
package leveldb
import (
- "errors"
"math/rand"
"runtime"
"sync"
@@ -18,10 +17,6 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)
-var (
- errInvalidInternalKey = errors.New("leveldb: Iterator: invalid internal key")
-)
-
type memdbReleaser struct {
once sync.Once
m *memDB
diff --git a/leveldb/db_state.go b/leveldb/db_state.go
index 65e1c54..29430fe 100644
--- a/leveldb/db_state.go
+++ b/leveldb/db_state.go
@@ -137,8 +137,12 @@ func (db *DB) newMem(n int) (mem *memDB, err error) {
if db.journal == nil {
db.journal = journal.NewWriter(w)
} else {
- db.journal.Reset(w)
- db.journalWriter.Close()
+ if err := db.journal.Reset(w); err != nil {
+ return nil, err
+ }
+ if err := db.journalWriter.Close(); err != nil {
+ return nil, err
+ }
db.frozenJournalFd = db.journalFd
}
db.journalWriter = w
@@ -181,13 +185,6 @@ func (db *DB) getEffectiveMem() *memDB {
return db.mem
}
-// Check whether we has frozen memdb.
-func (db *DB) hasFrozenMem() bool {
- db.memMu.RLock()
- defer db.memMu.RUnlock()
- return db.frozenMem != nil
-}
-
// Get frozen memdb.
func (db *DB) getFrozenMem() *memDB {
db.memMu.RLock()
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index 5749b4d..b25157a 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -113,7 +113,7 @@ func (h *dbHarness) closeDB0() error {
func (h *dbHarness) closeDB() {
if h.db != nil {
- if err := h.closeDB0(); err != nil {
+ if err := h.closeDB0(); err != nil && err != ErrClosed {
h.t.Error("Close: got error: ", err)
}
}
@@ -130,7 +130,9 @@ func (h *dbHarness) reopenDB() {
func (h *dbHarness) close() {
if h.db != nil {
- h.closeDB0()
+ if err := h.closeDB0(); err != nil && err != ErrClosed {
+ h.t.Error("Close: got error: ", err)
+ }
h.db = nil
}
h.stor.Close()
@@ -538,8 +540,7 @@ func truno(t *testing.T, o *opt.Options, f func(h *dbHarness)) {
}
h := newDbHarnessWopt(t, o)
defer h.close()
- switch i {
- case 3:
+ if i == 3 {
h.reopenDB()
}
f(h)
@@ -558,9 +559,9 @@ func testAligned(t *testing.T, name string, offset uintptr) {
}
func Test_FieldsAligned(t *testing.T) {
- p1 := new(DB)
+ p1 := new(DB) //nolint:staticcheck
testAligned(t, "DB.seq", unsafe.Offsetof(p1.seq))
- p2 := new(session)
+ p2 := new(session) //nolint:staticcheck
testAligned(t, "session.stNextFileNum", unsafe.Offsetof(p2.stNextFileNum))
testAligned(t, "session.stJournalNum", unsafe.Offsetof(p2.stJournalNum))
testAligned(t, "session.stPrevJournalNum", unsafe.Offsetof(p2.stPrevJournalNum))
@@ -1351,11 +1352,17 @@ func TestDB_CompactionTableOpenError(t *testing.T) {
}
h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, errors.New("open error during table compaction"))
- go h.db.CompactRange(util.Range{})
+ go func() {
+ if err := h.db.CompactRange(util.Range{}); err != nil && err != ErrClosed {
+ t.Error("CompactRange error: ", err)
+ }
+ }()
if err := h.db.compTriggerWait(h.db.tcompCmdC); err != nil {
t.Log("compaction error: ", err)
}
- h.closeDB0()
+ if err := h.closeDB0(); err != nil {
+ t.Error("Close: got error: ", err)
+ }
h.openDB()
h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, nil)
@@ -2076,6 +2083,10 @@ func TestDB_GoleveldbIssue74(t *testing.T) {
b := new(Batch)
for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
+ if t.Failed() {
+ return
+ }
+
iv := fmt.Sprintf("VAL%010d", i)
for k := 0; k < n; k++ {
key := fmt.Sprintf("KEY%06d", k)
@@ -2089,16 +2100,23 @@ func TestDB_GoleveldbIssue74(t *testing.T) {
iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil)
var k int
for ; iter.Next(); k++ {
+ if t.Failed() {
+ return
+ }
+
ptrKey := iter.Key()
key := iter.Value()
if _, err := snap.Get(ptrKey, nil); err != nil {
- t.Fatalf("WRITER #%d snapshot.Get %q: %v", i, ptrKey, err)
+ t.Errorf("WRITER #%d snapshot.Get %q: %v", i, ptrKey, err)
+ return
}
if value, err := snap.Get(key, nil); err != nil {
- t.Fatalf("WRITER #%d snapshot.Get %q: %v", i, key, err)
+ t.Errorf("WRITER #%d snapshot.Get %q: %v", i, key, err)
+ return
} else if string(value) != string(key)+iv {
- t.Fatalf("WRITER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+iv, value)
+ t.Errorf("WRITER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+iv, value)
+ return
}
b.Delete(key)
@@ -2108,7 +2126,7 @@ func TestDB_GoleveldbIssue74(t *testing.T) {
iter.Release()
snap.Release()
if k != n {
- t.Fatalf("#%d %d != %d", i, k, n)
+ t.Errorf("#%d %d != %d", i, k, n)
}
}
}()
@@ -2120,22 +2138,33 @@ func TestDB_GoleveldbIssue74(t *testing.T) {
wg.Done()
}()
for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
+ if t.Failed() {
+ return
+ }
+
snap := h.getSnapshot()
iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil)
var prevValue string
var k int
for ; iter.Next(); k++ {
+ if t.Failed() {
+ return
+ }
+
ptrKey := iter.Key()
key := iter.Value()
if _, err := snap.Get(ptrKey, nil); err != nil {
- t.Fatalf("READER #%d snapshot.Get %q: %v", i, ptrKey, err)
+ t.Errorf("READER #%d snapshot.Get %q: %v", i, ptrKey, err)
+ return
}
if value, err := snap.Get(key, nil); err != nil {
- t.Fatalf("READER #%d snapshot.Get %q: %v", i, key, err)
+ t.Errorf("READER #%d snapshot.Get %q: %v", i, key, err)
+ return
} else if prevValue != "" && string(value) != string(key)+prevValue {
- t.Fatalf("READER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+prevValue, value)
+ t.Errorf("READER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+prevValue, value)
+ return
} else {
prevValue = string(value[len(key):])
}
@@ -2143,7 +2172,7 @@ func TestDB_GoleveldbIssue74(t *testing.T) {
iter.Release()
snap.Release()
if k > 0 && k != n {
- t.Fatalf("#%d %d != %d", i, k, n)
+ t.Errorf("#%d %d != %d", i, k, n)
}
}
}()
@@ -2212,6 +2241,10 @@ func TestDB_GoleveldbIssue72and83(t *testing.T) {
b := new(Batch)
for ; i < wn && atomic.LoadUint32(&done) == 0; i++ {
+ if t.Failed() {
+ return
+ }
+
b.Reset()
for _, k1 := range keys {
k2 := randomData(2, i)
@@ -2220,7 +2253,8 @@ func TestDB_GoleveldbIssue72and83(t *testing.T) {
}
if err := h.db.Write(b, h.wo); err != nil {
atomic.StoreUint32(&done, 1)
- t.Fatalf("WRITER #%d db.Write: %v", i, err)
+ t.Errorf("WRITER #%d db.Write: %v", i, err)
+ return
}
}
}()
@@ -2232,6 +2266,10 @@ func TestDB_GoleveldbIssue72and83(t *testing.T) {
wg.Done()
}()
for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
+ if t.Failed() {
+ return
+ }
+
snap := h.getSnapshot()
seq := snap.elem.seq
if seq == 0 {
@@ -2242,33 +2280,42 @@ func TestDB_GoleveldbIssue72and83(t *testing.T) {
writei := int(seq/(n*2) - 1)
var k int
for ; iter.Next(); k++ {
+ if t.Failed() {
+ return
+ }
+
k1 := iter.Key()
k2 := iter.Value()
k1checksum0 := binary.LittleEndian.Uint32(k1[len(k1)-4:])
k1checksum1 := util.NewCRC(k1[:len(k1)-4]).Value()
if k1checksum0 != k1checksum1 {
- t.Fatalf("READER0 #%d.%d W#%d invalid K1 checksum: %#x != %#x", i, k, writei, k1checksum0, k1checksum0)
+ t.Errorf("READER0 #%d.%d W#%d invalid K1 checksum: %#x != %#x", i, k, writei, k1checksum0, k1checksum0)
+ return
}
k2checksum0 := binary.LittleEndian.Uint32(k2[len(k2)-4:])
k2checksum1 := util.NewCRC(k2[:len(k2)-4]).Value()
if k2checksum0 != k2checksum1 {
- t.Fatalf("READER0 #%d.%d W#%d invalid K2 checksum: %#x != %#x", i, k, writei, k2checksum0, k2checksum1)
+ t.Errorf("READER0 #%d.%d W#%d invalid K2 checksum: %#x != %#x", i, k, writei, k2checksum0, k2checksum1)
+ return
}
kwritei := int(binary.LittleEndian.Uint32(k2[len(k2)-8:]))
if writei != kwritei {
- t.Fatalf("READER0 #%d.%d W#%d invalid write iteration num: %d", i, k, writei, kwritei)
+ t.Errorf("READER0 #%d.%d W#%d invalid write iteration num: %d", i, k, writei, kwritei)
+ return
}
if _, err := snap.Get(k2, nil); err != nil {
- t.Fatalf("READER0 #%d.%d W#%d snap.Get: %v\nk1: %x\n -> k2: %x", i, k, writei, err, k1, k2)
+ t.Errorf("READER0 #%d.%d W#%d snap.Get: %v\nk1: %x\n -> k2: %x", i, k, writei, err, k1, k2)
+ return
}
}
if err := iter.Error(); err != nil {
- t.Fatalf("READER0 #%d.%d W#%d snap.Iterator: %v", i, k, writei, err)
+ t.Errorf("READER0 #%d.%d W#%d snap.Iterator: %v", i, k, writei, err)
+ return
}
iter.Release()
snap.Release()
if k > 0 && k != n {
- t.Fatalf("READER0 #%d W#%d short read, got=%d want=%d", i, writei, k, n)
+ t.Errorf("READER0 #%d W#%d short read, got=%d want=%d", i, writei, k, n)
}
}
}()
@@ -2280,6 +2327,10 @@ func TestDB_GoleveldbIssue72and83(t *testing.T) {
wg.Done()
}()
for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
+ if t.Failed() {
+ return
+ }
+
iter := h.db.NewIterator(nil, nil)
seq := iter.(*dbIter).seq
if seq == 0 {
@@ -2292,11 +2343,13 @@ func TestDB_GoleveldbIssue72and83(t *testing.T) {
k++
}
if err := iter.Error(); err != nil {
- t.Fatalf("READER1 #%d.%d W#%d db.Iterator: %v", i, k, writei, err)
+ t.Errorf("READER1 #%d.%d W#%d db.Iterator: %v", i, k, writei, err)
+ return
}
iter.Release()
if m := (writei+1)*n + n; k != m {
- t.Fatalf("READER1 #%d W#%d short read, got=%d want=%d", i, writei, k, m)
+ t.Errorf("READER1 #%d W#%d short read, got=%d want=%d", i, writei, k, m)
+ return
}
}
}()
@@ -2375,14 +2428,20 @@ func TestDB_TransientError(t *testing.T) {
vtail := fmt.Sprintf("VAL%030d", i)
for _, k := range sk {
+ if t.Failed() {
+ return
+ }
+
key := fmt.Sprintf("KEY%8d", k)
xvalue, err := snap.Get([]byte(key), nil)
if err != nil {
- t.Fatalf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
+ t.Errorf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
+ return
}
value := key + vtail
if !bytes.Equal([]byte(value), xvalue) {
- t.Fatalf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
+ t.Errorf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
+ return
}
}
}(i, snap, rnd.Perm(nKey))
@@ -2394,22 +2453,29 @@ func TestDB_TransientError(t *testing.T) {
iter := snap.NewIterator(nil, nil)
defer iter.Release()
for k := 0; k < nKey; k++ {
+ if t.Failed() {
+ return
+ }
+
if !iter.Next() {
if err := iter.Error(); err != nil {
- t.Fatalf("READER_ITER #%d K%d error: %v", i, k, err)
- } else {
- t.Fatalf("READER_ITER #%d K%d eoi", i, k)
+ t.Errorf("READER_ITER #%d K%d error: %v", i, k, err)
+ return
}
+ t.Errorf("READER_ITER #%d K%d eoi", i, k)
+ return
}
key := fmt.Sprintf("KEY%8d", k)
xkey := iter.Key()
if !bytes.Equal([]byte(key), xkey) {
- t.Fatalf("READER_ITER #%d K%d invalid key: want %q, got %q", i, k, key, xkey)
+ t.Errorf("READER_ITER #%d K%d invalid key: want %q, got %q", i, k, key, xkey)
+ return
}
value := key + vtail
xvalue := iter.Value()
if !bytes.Equal([]byte(value), xvalue) {
- t.Fatalf("READER_ITER #%d K%d invalid value: want %q, got %q", i, k, value, xvalue)
+ t.Errorf("READER_ITER #%d K%d invalid value: want %q, got %q", i, k, value, xvalue)
+ return
}
}
}(i, snap)
@@ -2492,14 +2558,20 @@ func TestDB_UkeyShouldntHopAcrossTable(t *testing.T) {
vtail := fmt.Sprintf("VAL%030d", i)
for k := 0; k < nKey; k++ {
+ if t.Failed() {
+ return
+ }
+
key := fmt.Sprintf("KEY%08d", k)
xvalue, err := snap.Get([]byte(key), nil)
if err != nil {
- t.Fatalf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
+ t.Errorf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
+ return
}
value := key + vtail
if !bytes.Equal([]byte(value), xvalue) {
- t.Fatalf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
+ t.Errorf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
+ return
}
}
}(i, snap)
@@ -2539,7 +2611,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
value = bytes.Repeat([]byte{'0'}, 100)
)
for i := 0; i < 2; i++ {
- tw, err := s.tops.create()
+ tw, err := s.tops.create(0)
if err != nil {
t.Fatal(err)
}
@@ -2736,7 +2808,7 @@ func testDB_IterTriggeredCompaction(t *testing.T, limitDiv int) {
initialSize1 = h.sizeOf(limitKey, maxKey)
)
- t.Logf("initial size %s [rest %s]", shortenb(int(initialSize0)), shortenb(int(initialSize1)))
+ t.Logf("initial size %s [rest %s]", shortenb(initialSize0), shortenb(initialSize1))
for r := 0; true; r++ {
if r >= mIter {
@@ -2758,7 +2830,7 @@ func testDB_IterTriggeredCompaction(t *testing.T, limitDiv int) {
// Check size.
size0 := h.sizeOf(startKey, limitKey)
size1 := h.sizeOf(limitKey, maxKey)
- t.Logf("#%03d size %s [rest %s]", r, shortenb(int(size0)), shortenb(int(size1)))
+ t.Logf("#%03d size %s [rest %s]", r, shortenb(size0), shortenb(size1))
if size0 < initialSize0/10 {
break
}
@@ -2832,11 +2904,15 @@ func TestDB_BulkInsertDelete(t *testing.T) {
offset := N * i
for j := 0; j < N; j++ {
binary.BigEndian.PutUint32(key, uint32(offset+j))
- h.db.Put(key, value, nil)
+ if err := h.db.Put(key, value, nil); err != nil {
+ t.Fatal("Put error: ", err)
+ }
}
for j := 0; j < N; j++ {
binary.BigEndian.PutUint32(key, uint32(offset+j))
- h.db.Delete(key, nil)
+ if err := h.db.Delete(key, nil); err != nil {
+ t.Fatal("Delete error: ", err)
+ }
}
}
diff --git a/leveldb/db_transaction.go b/leveldb/db_transaction.go
index 21d1e51..b7b82fd 100644
--- a/leveldb/db_transaction.go
+++ b/leveldb/db_transaction.go
@@ -110,7 +110,7 @@ func (tr *Transaction) flush() error {
tr.tables = append(tr.tables, t)
tr.rec.addTableFile(0, t)
tr.stats.write += t.size
- tr.db.logf("transaction@flush created L0@%d N·%d S·%s %q:%q", t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)
+ tr.db.logf("transaction@flush created L0@%d N·%d S·%s %q:%q", t.fd.Num, n, shortenb(t.size), t.imin, t.imax)
}
return nil
}
@@ -244,7 +244,7 @@ func (tr *Transaction) Commit() error {
// Additionally, wait compaction when certain threshold reached.
// Ignore error, returns error only if transaction can't be committed.
- tr.db.waitCompaction()
+ _ = tr.db.waitCompaction()
}
// Only mark as done if transaction committed successfully.
tr.setDone()
diff --git a/leveldb/db_write.go b/leveldb/db_write.go
index db0c1be..18eddbe 100644
--- a/leveldb/db_write.go
+++ b/leveldb/db_write.go
@@ -246,7 +246,10 @@ func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
// Rotate memdb if it's reach the threshold.
if batch.internalLen >= mdbFree {
- db.rotateMem(0, false)
+ if _, err := db.rotateMem(0, false); err != nil {
+ db.unlockWrite(overflow, merged, err)
+ return err
+ }
}
db.unlockWrite(overflow, merged, nil)
diff --git a/leveldb/errors/errors.go b/leveldb/errors/errors.go
index 8d6146b..0c7f64b 100644
--- a/leveldb/errors/errors.go
+++ b/leveldb/errors/errors.go
@@ -73,6 +73,7 @@ func SetFd(err error, fd storage.FileDesc) error {
case *ErrCorrupted:
x.Fd = fd
return x
+ default:
+ return err
}
- return err
}
diff --git a/leveldb/iterator/array_iter.go b/leveldb/iterator/array_iter.go
index a23ab05..1e4fe4e 100644
--- a/leveldb/iterator/array_iter.go
+++ b/leveldb/iterator/array_iter.go
@@ -88,10 +88,7 @@ func (i *basicArrayIterator) Seek(key []byte) bool {
return false
}
i.pos = i.array.Search(key)
- if i.pos >= n {
- return false
- }
- return true
+ return i.pos < n
}
func (i *basicArrayIterator) Next() bool {
diff --git a/leveldb/iterator/indexed_iter.go b/leveldb/iterator/indexed_iter.go
index 939adbb..fd0b55a 100644
--- a/leveldb/iterator/indexed_iter.go
+++ b/leveldb/iterator/indexed_iter.go
@@ -26,10 +26,9 @@ type indexedIterator struct {
index IteratorIndexer
strict bool
- data Iterator
- err error
- errf func(err error)
- closed bool
+ data Iterator
+ err error
+ errf func(err error)
}
func (i *indexedIterator) setData() {
diff --git a/leveldb/iterator/merged_iter.go b/leveldb/iterator/merged_iter.go
index 1a7e29d..374e82b 100644
--- a/leveldb/iterator/merged_iter.go
+++ b/leveldb/iterator/merged_iter.go
@@ -7,6 +7,8 @@
package iterator
import (
+ "container/heap"
+
"github.com/syndtr/goleveldb/leveldb/comparer"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/util"
@@ -33,6 +35,9 @@ type mergedIterator struct {
err error
errf func(err error)
releaser util.Releaser
+
+ indexes []int // the heap of iterator indexes
+ reverse bool //nolint: structcheck // if true, indexes is a max-heap
}
func assertKey(key []byte) []byte {
@@ -67,16 +72,20 @@ func (i *mergedIterator) First() bool {
return false
}
+ h := i.indexHeap()
+ h.Reset(false)
for x, iter := range i.iters {
switch {
case iter.First():
i.keys[x] = assertKey(iter.Key())
+ h.Push(x)
case i.iterErr(iter):
return false
default:
i.keys[x] = nil
}
}
+ heap.Init(h)
i.dir = dirSOI
return i.next()
}
@@ -89,16 +98,20 @@ func (i *mergedIterator) Last() bool {
return false
}
+ h := i.indexHeap()
+ h.Reset(true)
for x, iter := range i.iters {
switch {
case iter.Last():
i.keys[x] = assertKey(iter.Key())
+ h.Push(x)
case i.iterErr(iter):
return false
default:
i.keys[x] = nil
}
}
+ heap.Init(h)
i.dir = dirEOI
return i.prev()
}
@@ -111,35 +124,31 @@ func (i *mergedIterator) Seek(key []byte) bool {
return false
}
+ h := i.indexHeap()
+ h.Reset(false)
for x, iter := range i.iters {
switch {
case iter.Seek(key):
i.keys[x] = assertKey(iter.Key())
+ h.Push(x)
case i.iterErr(iter):
return false
default:
i.keys[x] = nil
}
}
+ heap.Init(h)
i.dir = dirSOI
return i.next()
}
func (i *mergedIterator) next() bool {
- var key []byte
- if i.dir == dirForward {
- key = i.keys[i.index]
- }
- for x, tkey := range i.keys {
- if tkey != nil && (key == nil || i.cmp.Compare(tkey, key) < 0) {
- key = tkey
- i.index = x
- }
- }
- if key == nil {
+ h := i.indexHeap()
+ if h.Len() == 0 {
i.dir = dirEOI
return false
}
+ i.index = heap.Pop(h).(int)
i.dir = dirForward
return true
}
@@ -156,7 +165,7 @@ func (i *mergedIterator) Next() bool {
case dirSOI:
return i.First()
case dirBackward:
- key := append([]byte{}, i.keys[i.index]...)
+ key := append([]byte(nil), i.keys[i.index]...)
if !i.Seek(key) {
return false
}
@@ -168,6 +177,7 @@ func (i *mergedIterator) Next() bool {
switch {
case iter.Next():
i.keys[x] = assertKey(iter.Key())
+ heap.Push(i.indexHeap(), x)
case i.iterErr(iter):
return false
default:
@@ -177,20 +187,12 @@ func (i *mergedIterator) Next() bool {
}
func (i *mergedIterator) prev() bool {
- var key []byte
- if i.dir == dirBackward {
- key = i.keys[i.index]
- }
- for x, tkey := range i.keys {
- if tkey != nil && (key == nil || i.cmp.Compare(tkey, key) > 0) {
- key = tkey
- i.index = x
- }
- }
- if key == nil {
+ h := i.indexHeap()
+ if h.Len() == 0 {
i.dir = dirSOI
return false
}
+ i.index = heap.Pop(h).(int)
i.dir = dirBackward
return true
}
@@ -207,7 +209,9 @@ func (i *mergedIterator) Prev() bool {
case dirEOI:
return i.Last()
case dirForward:
- key := append([]byte{}, i.keys[i.index]...)
+ key := append([]byte(nil), i.keys[i.index]...)
+ h := i.indexHeap()
+ h.Reset(true)
for x, iter := range i.iters {
if x == i.index {
continue
@@ -216,12 +220,14 @@ func (i *mergedIterator) Prev() bool {
switch {
case seek && iter.Prev(), !seek && iter.Last():
i.keys[x] = assertKey(iter.Key())
+ h.Push(x)
case i.iterErr(iter):
return false
default:
i.keys[x] = nil
}
}
+ heap.Init(h)
}
x := i.index
@@ -229,6 +235,7 @@ func (i *mergedIterator) Prev() bool {
switch {
case iter.Prev():
i.keys[x] = assertKey(iter.Key())
+ heap.Push(i.indexHeap(), x)
case i.iterErr(iter):
return false
default:
@@ -259,6 +266,7 @@ func (i *mergedIterator) Release() {
}
i.iters = nil
i.keys = nil
+ i.indexes = nil
if i.releaser != nil {
i.releaser.Release()
i.releaser = nil
@@ -284,6 +292,10 @@ func (i *mergedIterator) SetErrorCallback(f func(err error)) {
i.errf = f
}
+func (i *mergedIterator) indexHeap() *indexHeap {
+ return (*indexHeap)(i)
+}
+
// NewMergedIterator returns an iterator that merges its input. Walking the
// resultant iterator will return all key/value pairs of all input iterators
// in strictly increasing key order, as defined by cmp.
@@ -296,9 +308,43 @@ func (i *mergedIterator) SetErrorCallback(f func(err error)) {
// continue to the next 'input iterator'.
func NewMergedIterator(iters []Iterator, cmp comparer.Comparer, strict bool) Iterator {
return &mergedIterator{
- iters: iters,
- cmp: cmp,
- strict: strict,
- keys: make([][]byte, len(iters)),
+ iters: iters,
+ cmp: cmp,
+ strict: strict,
+ keys: make([][]byte, len(iters)),
+ indexes: make([]int, 0, len(iters)),
}
}
+
+// indexHeap implements heap.Interface.
+type indexHeap mergedIterator
+
+func (h *indexHeap) Len() int { return len(h.indexes) }
+func (h *indexHeap) Less(i, j int) bool {
+ i, j = h.indexes[i], h.indexes[j]
+ r := h.cmp.Compare(h.keys[i], h.keys[j])
+ if h.reverse {
+ return r > 0
+ }
+ return r < 0
+}
+
+func (h *indexHeap) Swap(i, j int) {
+ h.indexes[i], h.indexes[j] = h.indexes[j], h.indexes[i]
+}
+
+func (h *indexHeap) Push(value interface{}) {
+ h.indexes = append(h.indexes, value.(int))
+}
+
+func (h *indexHeap) Pop() interface{} {
+ e := len(h.indexes) - 1
+ popped := h.indexes[e]
+ h.indexes = h.indexes[:e]
+ return popped
+}
+
+func (h *indexHeap) Reset(reverse bool) {
+ h.reverse = reverse
+ h.indexes = h.indexes[:0]
+}
diff --git a/leveldb/iterator/merged_iter_test.go b/leveldb/iterator/merged_iter_test.go
index ee40881..80cff0e 100644
--- a/leveldb/iterator/merged_iter_test.go
+++ b/leveldb/iterator/merged_iter_test.go
@@ -7,6 +7,8 @@
package iterator_test
import (
+ "testing"
+
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -58,3 +60,22 @@ var _ = testutil.Defer(func() {
Describe("with one filled, two empty iterators", Test(1, 2))
})
})
+
+func BenchmarkMergedIterator(b *testing.B) {
+ n := 11
+ iters := make([]Iterator, n)
+ for i := range iters {
+ kv := testutil.KeyValue_Generate(nil, 100, 1, 1, 10, 4, 4)
+ iters[i] = NewArrayIterator(kv)
+ }
+
+ mi := NewMergedIterator(iters, comparer.DefaultComparer, true)
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ mi.First()
+ for mi.Next() {
+ mi.Key()
+ }
+ }
+}
diff --git a/leveldb/journal/journal.go b/leveldb/journal/journal.go
index d094c3d..f7f8b54 100644
--- a/leveldb/journal/journal.go
+++ b/leveldb/journal/journal.go
@@ -354,6 +354,8 @@ type Writer struct {
// buf[:written] has already been written to w.
// written is zero unless Flush has been called.
written int
+ // blockNumber is the zero based block number currently held in buf.
+ blockNumber int64
// first is whether the current chunk is the first chunk of the journal.
first bool
// pending is whether a chunk is buffered but not yet written.
@@ -402,6 +404,7 @@ func (w *Writer) writeBlock() {
w.i = 0
w.j = headerSize
w.written = 0
+ w.blockNumber++
}
// writePending finishes the current journal and writes the buffer to the
@@ -457,6 +460,7 @@ func (w *Writer) Reset(writer io.Writer) (err error) {
w.i = 0
w.j = 0
w.written = 0
+ w.blockNumber = 0
w.first = false
w.pending = false
w.err = nil
@@ -474,7 +478,7 @@ func (w *Writer) Next() (io.Writer, error) {
w.fillHeader(true)
}
w.i = w.j
- w.j = w.j + headerSize
+ w.j += headerSize
// Check if there is room in the block for the header.
if w.j > blockSize {
// Fill in the rest of the block with zeroes.
@@ -491,6 +495,14 @@ func (w *Writer) Next() (io.Writer, error) {
return singleWriter{w, w.seq}, nil
}
+// Size returns the current size of the file.
+func (w *Writer) Size() int64 {
+ if w == nil {
+ return 0
+ }
+ return w.blockNumber*blockSize + int64(w.j)
+}
+
type singleWriter struct {
w *Writer
seq int
diff --git a/leveldb/journal/journal_test.go b/leveldb/journal/journal_test.go
index 0fcf225..663f07d 100644
--- a/leveldb/journal/journal_test.go
+++ b/leveldb/journal/journal_test.go
@@ -19,6 +19,8 @@ import (
"math/rand"
"strings"
"testing"
+
+ "github.com/stretchr/testify/require"
)
type dropper struct {
@@ -171,9 +173,13 @@ func TestFlush(t *testing.T) {
// Write a couple of records. Everything should still be held
// in the record.Writer buffer, so that buf.Len should be 0.
w0, _ := w.Next()
- w0.Write([]byte("0"))
+ if _, err := w0.Write([]byte("0")); err != nil {
+ t.Fatal(err)
+ }
w1, _ := w.Next()
- w1.Write([]byte("11"))
+ if _, err := w1.Write([]byte("11")); err != nil {
+ t.Fatal(err)
+ }
if got, want := buf.Len(), 0; got != want {
t.Fatalf("buffer length #0: got %d want %d", got, want)
}
@@ -188,7 +194,9 @@ func TestFlush(t *testing.T) {
// Do another write, one that isn't large enough to complete the block.
// The write should not have flowed through to buf.
w2, _ := w.Next()
- w2.Write(bytes.Repeat([]byte("2"), 10000))
+ if _, err := w2.Write(bytes.Repeat([]byte("2"), 10000)); err != nil {
+ t.Fatal(err)
+ }
if got, want := buf.Len(), 17; got != want {
t.Fatalf("buffer length #2: got %d want %d", got, want)
}
@@ -204,7 +212,9 @@ func TestFlush(t *testing.T) {
// We should now have 32768 bytes (a complete block), without
// an explicit flush.
w3, _ := w.Next()
- w3.Write(bytes.Repeat([]byte("3"), 40000))
+ if _, err := w3.Write(bytes.Repeat([]byte("3"), 40000)); err != nil {
+ t.Fatal(err)
+ }
if got, want := buf.Len(), 32768; got != want {
t.Fatalf("buffer length #4: got %d want %d", got, want)
}
@@ -243,7 +253,9 @@ func TestNonExhaustiveRead(t *testing.T) {
length := len(p) + rnd.Intn(3*blockSize)
s := string(uint8(i)) + "123456789abcdefgh"
ww, _ := w.Next()
- ww.Write([]byte(big(s, length)))
+ if _, err := ww.Write([]byte(big(s, length))); err != nil {
+ t.Fatal(err)
+ }
}
if err := w.Close(); err != nil {
t.Fatal(err)
@@ -271,12 +283,16 @@ func TestStaleReader(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- w0.Write([]byte("0"))
+ if _, err := w0.Write([]byte("0")); err != nil {
+ t.Fatal(err)
+ }
w1, err := w.Next()
if err != nil {
t.Fatal(err)
}
- w1.Write([]byte("11"))
+ if _, err := w1.Write([]byte("11")); err != nil {
+ t.Fatal(err)
+ }
if err := w.Close(); err != nil {
t.Fatal(err)
}
@@ -328,6 +344,28 @@ func TestStaleWriter(t *testing.T) {
}
}
+func TestSize(t *testing.T) {
+ var buf bytes.Buffer
+ zeroes := make([]byte, 8<<10)
+ w := NewWriter(&buf)
+ for i := 0; i < 100; i++ {
+ writer, err := w.Next()
+ require.NoError(t, err)
+
+ for j := 0; j < rand.Intn(10); j++ {
+ n := rand.Intn(len(zeroes))
+ _, err = writer.Write(zeroes[:n])
+ require.NoError(t, err)
+ }
+
+ require.NoError(t, w.Flush())
+ if buf.Len() != int(w.Size()) {
+ t.Fatalf("expected %d, but found %d", buf.Len(), w.Size())
+ }
+ }
+ require.NoError(t, w.Close())
+}
+
func TestCorrupt_MissingLastBlock(t *testing.T) {
buf := new(bytes.Buffer)
@@ -377,7 +415,7 @@ func TestCorrupt_MissingLastBlock(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- n, err = io.Copy(ioutil.Discard, rr)
+ _, err = io.Copy(ioutil.Discard, rr)
if err != io.ErrUnexpectedEOF {
t.Fatalf("read #1: unexpected error: %v", err)
}
@@ -542,7 +580,7 @@ func TestCorrupt_CorruptedMiddleBlock(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- n, err = io.Copy(ioutil.Discard, rr)
+ _, err = io.Copy(ioutil.Discard, rr)
if err != io.ErrUnexpectedEOF {
t.Fatalf("read #1: unexpected error: %v", err)
}
@@ -662,7 +700,7 @@ func TestCorrupt_CorruptedLastBlock(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- n, err = io.Copy(ioutil.Discard, rr)
+ _, err = io.Copy(ioutil.Discard, rr)
if err != io.ErrUnexpectedEOF {
t.Fatalf("read #3: unexpected error: %v", err)
}
@@ -733,7 +771,7 @@ func TestCorrupt_FirstChuckLengthOverflow(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- n, err = io.Copy(ioutil.Discard, rr)
+ _, err = io.Copy(ioutil.Discard, rr)
if err != io.ErrUnexpectedEOF {
t.Fatalf("read #1: unexpected error: %v", err)
}
diff --git a/leveldb/key.go b/leveldb/key.go
index ad8f51e..dc7be1f 100644
--- a/leveldb/key.go
+++ b/leveldb/key.go
@@ -25,7 +25,7 @@ func (e *ErrInternalKeyCorrupted) Error() string {
}
func newErrInternalKeyCorrupted(ikey []byte, reason string) error {
- return errors.NewErrCorrupted(storage.FileDesc{}, &ErrInternalKeyCorrupted{append([]byte{}, ikey...), reason})
+ return errors.NewErrCorrupted(storage.FileDesc{}, &ErrInternalKeyCorrupted{append([]byte(nil), ikey...), reason})
}
type keyType uint
@@ -90,7 +90,7 @@ func parseInternalKey(ik []byte) (ukey []byte, seq uint64, kt keyType, err error
return nil, 0, 0, newErrInternalKeyCorrupted(ik, "invalid length")
}
num := binary.LittleEndian.Uint64(ik[len(ik)-8:])
- seq, kt = uint64(num>>8), keyType(num&0xff)
+ seq, kt = num>>8, keyType(num&0xff)
if kt > keyTypeVal {
return nil, 0, 0, newErrInternalKeyCorrupted(ik, "invalid type")
}
@@ -124,7 +124,7 @@ func (ik internalKey) num() uint64 {
func (ik internalKey) parseNum() (seq uint64, kt keyType) {
num := ik.num()
- seq, kt = uint64(num>>8), keyType(num&0xff)
+ seq, kt = num>>8, keyType(num&0xff)
if kt > keyTypeVal {
panic(fmt.Sprintf("leveldb: internal key %q, len=%d: invalid type %#x", []byte(ik), len(ik), kt))
}
diff --git a/leveldb/key_test.go b/leveldb/key_test.go
index 2f33ccb..5fb43d6 100644
--- a/leveldb/key_test.go
+++ b/leveldb/key_test.go
@@ -16,7 +16,7 @@ import (
var defaultIComparer = &iComparer{comparer.DefaultComparer}
func ikey(key string, seq uint64, kt keyType) internalKey {
- return makeInternalKey(nil, []byte(key), uint64(seq), kt)
+ return makeInternalKey(nil, []byte(key), seq, kt)
}
func shortSep(a, b []byte) []byte {
@@ -110,7 +110,7 @@ func TestInternalKeyShortSeparator(t *testing.T) {
ikey("bar", 99, keyTypeVal)))
// When user keys are different, but correctly ordered
- assertBytes(t, ikey("g", uint64(keyMaxSeq), keyTypeSeek),
+ assertBytes(t, ikey("g", keyMaxSeq, keyTypeSeek),
shortSep(ikey("foo", 100, keyTypeVal),
ikey("hello", 200, keyTypeVal)))
@@ -126,7 +126,7 @@ func TestInternalKeyShortSeparator(t *testing.T) {
}
func TestInternalKeyShortestSuccessor(t *testing.T) {
- assertBytes(t, ikey("g", uint64(keyMaxSeq), keyTypeSeek),
+ assertBytes(t, ikey("g", keyMaxSeq, keyTypeSeek),
shortSuccessor(ikey("foo", 100, keyTypeVal)))
assertBytes(t, ikey("\xff\xff", 100, keyTypeVal),
shortSuccessor(ikey("\xff\xff", 100, keyTypeVal)))
diff --git a/leveldb/memdb/bench_test.go b/leveldb/memdb/bench_test.go
index b05084c..7bc7e1a 100644
--- a/leveldb/memdb/bench_test.go
+++ b/leveldb/memdb/bench_test.go
@@ -23,7 +23,9 @@ func BenchmarkPut(b *testing.B) {
b.ResetTimer()
p := New(comparer.DefaultComparer, 0)
for i := range buf {
- p.Put(buf[i][:], nil)
+ if err := p.Put(buf[i][:], nil); err != nil {
+ b.Fatal(err)
+ }
}
}
@@ -36,7 +38,9 @@ func BenchmarkPutRandom(b *testing.B) {
b.ResetTimer()
p := New(comparer.DefaultComparer, 0)
for i := range buf {
- p.Put(buf[i][:], nil)
+ if err := p.Put(buf[i][:], nil); err != nil {
+ b.Fatal(err)
+ }
}
}
@@ -48,12 +52,16 @@ func BenchmarkGet(b *testing.B) {
p := New(comparer.DefaultComparer, 0)
for i := range buf {
- p.Put(buf[i][:], nil)
+ if err := p.Put(buf[i][:], nil); err != nil {
+ b.Fatal(err)
+ }
}
b.ResetTimer()
for i := range buf {
- p.Get(buf[i][:])
+ if _, err := p.Get(buf[i][:]); err != nil {
+ b.Fatal(err)
+ }
}
}
@@ -65,11 +73,15 @@ func BenchmarkGetRandom(b *testing.B) {
p := New(comparer.DefaultComparer, 0)
for i := range buf {
- p.Put(buf[i][:], nil)
+ if err := p.Put(buf[i][:], nil); err != nil {
+ b.Fatal(err)
+ }
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
- p.Get(buf[rand.Int()%b.N][:])
+ if _, err := p.Get(buf[rand.Int()%b.N][:]); err != nil {
+ b.Fatal(err)
+ }
}
}
diff --git a/leveldb/memdb/memdb_test.go b/leveldb/memdb/memdb_test.go
index 3f0a31e..f20056c 100644
--- a/leveldb/memdb/memdb_test.go
+++ b/leveldb/memdb/memdb_test.go
@@ -45,13 +45,11 @@ func (p *DB) TestFindLast() (rkey, value []byte, err error) {
}
func (p *DB) TestPut(key []byte, value []byte) error {
- p.Put(key, value)
- return nil
+ return p.Put(key, value)
}
func (p *DB) TestDelete(key []byte) error {
- p.Delete(key)
- return nil
+ return p.Delete(key)
}
func (p *DB) TestFind(key []byte) (rkey, rvalue []byte, err error) {
@@ -94,7 +92,7 @@ var _ = testutil.Defer(func() {
// Building the DB.
db := New(comparer.DefaultComparer, 0)
kv.IterateShuffled(nil, func(i int, key, value []byte) {
- db.Put(key, value)
+ Expect(db.Put(key, value)).ShouldNot(HaveOccurred())
})
if kv.Len() > 1 {
diff --git a/leveldb/opt/options.go b/leveldb/opt/options.go
index dead5fd..48fb041 100644
--- a/leveldb/opt/options.go
+++ b/leveldb/opt/options.go
@@ -41,6 +41,7 @@ var (
DefaultWriteL0PauseTrigger = 12
DefaultWriteL0SlowdownTrigger = 8
DefaultFilterBaseLg = 11
+ DefaultMaxManifestFileSize = int64(64 * MiB)
)
// Cacher is a caching algorithm.
@@ -48,23 +49,60 @@ type Cacher interface {
New(capacity int) cache.Cacher
}
-type CacherFunc struct {
+type cacherFunc struct {
NewFunc func(capacity int) cache.Cacher
}
-func (f *CacherFunc) New(capacity int) cache.Cacher {
+func (f *cacherFunc) New(capacity int) cache.Cacher {
if f != nil && f.NewFunc != nil {
return f.NewFunc(capacity)
}
return nil
}
+func CacherFunc(f func(capacity int) cache.Cacher) Cacher {
+ return &cacherFunc{f}
+}
+
+type passthroughCacher struct {
+ Cacher cache.Cacher
+}
+
+func (p *passthroughCacher) New(capacity int) cache.Cacher {
+ return p.Cacher
+}
+
+// PassthroughCacher can be used to passthrough pre-initialized
+// 'cacher instance'. This is useful for sharing cache over multiple
+// DB instances.
+//
+// Shared cache example:
+//
+// fileCache := opt.NewLRU(500)
+// blockCache := opt.NewLRU(8 * opt.MiB)
+// options := &opt.Options{
+// OpenFilesCacher: fileCache,
+// BlockCacher: blockCache,
+// }
+// db1, err1 := leveldb.OpenFile("path/to/db1", options)
+// ...
+// db2, err2 := leveldb.OpenFile("path/to/db2", options)
+// ...
+func PassthroughCacher(x cache.Cacher) Cacher {
+ return &passthroughCacher{x}
+}
+
+// NewLRU creates LRU 'passthrough cacher'.
+func NewLRU(capacity int) Cacher {
+ return PassthroughCacher(cache.NewLRU(capacity))
+}
+
var (
// LRUCacher is the LRU-cache algorithm.
- LRUCacher = &CacherFunc{cache.NewLRU}
+ LRUCacher = CacherFunc(cache.NewLRU)
// NoCacher is the value to disable caching algorithm.
- NoCacher = &CacherFunc{}
+ NoCacher = CacherFunc(nil)
)
// Compression is the 'sorted table' block compression algorithm to use.
@@ -376,6 +414,13 @@ type Options struct {
//
// The default value is 11(as well as 2KB)
FilterBaseLg int
+
+ // MaxManifestFileSize is the maximum size limit of the MANIFEST-****** file.
+ // When the MANIFEST-****** file grows beyond this size, LevelDB will create
+ // a new MANIFEST file.
+ //
+ // The default value is 64 MiB.
+ MaxManifestFileSize int64
}
func (o *Options) GetAltFilters() []filter.Filter {
@@ -715,7 +760,13 @@ func (wo *WriteOptions) GetSync() bool {
func GetStrict(o *Options, ro *ReadOptions, strict Strict) bool {
if ro.GetStrict(StrictOverride) {
return ro.GetStrict(strict)
- } else {
- return o.GetStrict(strict) || ro.GetStrict(strict)
}
+ return o.GetStrict(strict) || ro.GetStrict(strict)
+}
+
+func (o *Options) GetMaxManifestFileSize() int64 {
+ if o == nil || o.MaxManifestFileSize <= 0 {
+ return DefaultMaxManifestFileSize
+ }
+ return o.MaxManifestFileSize
}
diff --git a/leveldb/opt/options_darwin.go b/leveldb/opt/options_darwin.go
index 67b8204..e749081 100644
--- a/leveldb/opt/options_darwin.go
+++ b/leveldb/opt/options_darwin.go
@@ -1,3 +1,4 @@
+//go:build darwin
// +build darwin
package opt
diff --git a/leveldb/opt/options_default.go b/leveldb/opt/options_default.go
index 97a14a8..4c9f4b0 100644
--- a/leveldb/opt/options_default.go
+++ b/leveldb/opt/options_default.go
@@ -1,3 +1,4 @@
+//go:build !darwin
// +build !darwin
package opt
diff --git a/leveldb/session.go b/leveldb/session.go
index e143352..036570e 100644
--- a/leveldb/session.go
+++ b/leveldb/session.go
@@ -54,7 +54,7 @@ type session struct {
stCompPtrs []internalKey // compaction pointers; need external synchronization
stVersion *version // current version
- ntVersionId int64 // next version id to assign
+ ntVersionID int64 // next version id to assign
refCh chan *vTask
relCh chan *vTask
deltaCh chan *vDelta
@@ -107,7 +107,7 @@ func (s *session) close() {
}
s.manifest = nil
s.manifestWriter = nil
- s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionId})
+ s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionID})
// Close all background goroutines
close(s.closeC)
@@ -171,7 +171,7 @@ func (s *session) recover() (err error) {
if err == nil {
// save compact pointers
for _, r := range rec.compPtrs {
- s.setCompPtr(r.level, internalKey(r.ikey))
+ s.setCompPtr(r.level, r.ikey)
}
// commit record to version staging
staging.commit(rec)
@@ -226,6 +226,9 @@ func (s *session) commit(r *sessionRecord, trivial bool) (err error) {
if s.manifest == nil {
// manifest journal writer not yet created, create one
err = s.newManifest(r, nv)
+ } else if s.manifest.Size() >= s.o.GetMaxManifestFileSize() {
+ // pass nil sessionRecord to avoid over-reference table file
+ err = s.newManifest(nil, nv)
} else {
err = s.flushManifest(r)
}
diff --git a/leveldb/session_compaction.go b/leveldb/session_compaction.go
index b46a3e4..2fd5f32 100644
--- a/leveldb/session_compaction.go
+++ b/leveldb/session_compaction.go
@@ -48,7 +48,7 @@ func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, maxLevel int) (i
flushLevel := s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey(), maxLevel)
rec.addTableFile(flushLevel, t)
- s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", flushLevel, t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)
+ s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", flushLevel, t.fd.Num, n, shortenb(t.size), t.imin, t.imax)
return flushLevel, nil
}
@@ -226,8 +226,8 @@ func (c *compaction) expand() {
exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
if len(exp1) == len(t1) {
c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
- c.sourceLevel, c.sourceLevel+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
- len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size())))
+ c.sourceLevel, c.sourceLevel+1, len(t0), shortenb(t0.size()), len(t1), shortenb(t1.size()),
+ len(exp0), shortenb(exp0.size()), len(exp1), shortenb(exp1.size()))
imin, imax = xmin, xmax
t0, t1 = exp0, exp1
amin, amax = append(t0, t1...).getRange(c.s.icmp)
diff --git a/leveldb/session_record.go b/leveldb/session_record.go
index 854e1aa..b1a352f 100644
--- a/leveldb/session_record.go
+++ b/leveldb/session_record.go
@@ -201,7 +201,7 @@ func (p *sessionRecord) readUvarintMayEOF(field string, r io.ByteReader, mayEOF
}
x, err := binary.ReadUvarint(r)
if err != nil {
- if err == io.ErrUnexpectedEOF || (mayEOF == false && err == io.EOF) {
+ if err == io.ErrUnexpectedEOF || (!mayEOF && err == io.EOF) {
p.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrManifestCorrupted{field, "short read"})
} else if strings.HasPrefix(err.Error(), "binary:") {
p.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrManifestCorrupted{field, err.Error()})
diff --git a/leveldb/session_util.go b/leveldb/session_util.go
index fc56b63..f467f2d 100644
--- a/leveldb/session_util.go
+++ b/leveldb/session_util.go
@@ -24,7 +24,7 @@ type dropper struct {
func (d dropper) Drop(err error) {
if e, ok := err.(*journal.ErrCorrupted); ok {
- d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason)
+ d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(int64(e.Size)), e.Reason)
} else {
d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err)
}
@@ -130,7 +130,7 @@ func (s *session) refLoop() {
for {
// Skip any abandoned version number to prevent blocking processing.
if skipAbandoned() {
- next += 1
+ next++
continue
}
// Don't bother the version that has been released.
@@ -162,13 +162,13 @@ func (s *session) refLoop() {
referenced[next] = struct{}{}
delete(ref, next)
delete(deltas, next)
- next += 1
+ next++
}
// Use delta information to process all released versions.
for {
if skipAbandoned() {
- next += 1
+ next++
continue
}
if d, exist := released[next]; exist {
@@ -176,7 +176,7 @@ func (s *session) refLoop() {
applyDelta(d)
}
delete(released, next)
- next += 1
+ next++
continue
}
return
@@ -396,7 +396,7 @@ func (s *session) recordCommited(rec *sessionRecord) {
}
for _, r := range rec.compPtrs {
- s.setCompPtr(r.level, internalKey(r.ikey))
+ s.setCompPtr(r.level, r.ikey)
}
}
@@ -429,14 +429,16 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
s.manifestWriter.Close()
}
if !s.manifestFd.Zero() {
- s.stor.Remove(s.manifestFd)
+ err = s.stor.Remove(s.manifestFd)
}
s.manifestFd = fd
s.manifestWriter = writer
s.manifest = jw
} else {
writer.Close()
- s.stor.Remove(fd)
+ if rerr := s.stor.Remove(fd); err != nil {
+ err = fmt.Errorf("newManifest error: %v, cleanup error (%v)", err, rerr)
+ }
s.reuseFileNum(fd.Num)
}
}()
@@ -453,6 +455,12 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
if err != nil {
return
}
+ if !s.o.GetNoSync() {
+ err = writer.Sync()
+ if err != nil {
+ return
+ }
+ }
err = s.stor.SetMeta(fd)
return
}
diff --git a/leveldb/storage/file_storage.go b/leveldb/storage/file_storage.go
index 9ba71fd..3c5e70a 100644
--- a/leveldb/storage/file_storage.go
+++ b/leveldb/storage/file_storage.go
@@ -111,7 +111,9 @@ func OpenFile(path string, readOnly bool) (Storage, error) {
defer func() {
if err != nil {
- flock.release()
+ if ferr := flock.release(); ferr != nil {
+ err = fmt.Errorf("error opening file (%v); error unlocking file (%v)", err, ferr)
+ }
}
}()
@@ -175,12 +177,13 @@ func itoa(buf []byte, i int, wid int) []byte {
return append(buf, b[bp:]...)
}
-func (fs *fileStorage) printDay(t time.Time) {
+func (fs *fileStorage) printDay(t time.Time) error {
if fs.day == t.Day() {
- return
+ return nil
}
fs.day = t.Day()
- fs.logw.Write([]byte("=============== " + t.Format("Jan 2, 2006 (MST)") + " ===============\n"))
+ _, err := fs.logw.Write([]byte("=============== " + t.Format("Jan 2, 2006 (MST)") + " ===============\n"))
+ return err
}
func (fs *fileStorage) doLog(t time.Time, str string) {
@@ -189,7 +192,9 @@ func (fs *fileStorage) doLog(t time.Time, str string) {
fs.logw.Close()
fs.logw = nil
fs.logSize = 0
- rename(filepath.Join(fs.path, "LOG"), filepath.Join(fs.path, "LOG.old"))
+ if err := rename(filepath.Join(fs.path, "LOG"), filepath.Join(fs.path, "LOG.old")); err != nil {
+ return
+ }
}
if fs.logw == nil {
var err error
@@ -200,7 +205,9 @@ func (fs *fileStorage) doLog(t time.Time, str string) {
// Force printDay on new log file.
fs.day = 0
}
- fs.printDay(t)
+ if err := fs.printDay(t); err != nil {
+ return
+ }
hour, min, sec := t.Clock()
msec := t.Nanosecond() / 1e3
// time
@@ -634,8 +641,9 @@ func fsGenOldName(fd FileDesc) string {
switch fd.Type {
case TypeTable:
return fmt.Sprintf("%06d.sst", fd.Num)
+ default:
+ return fsGenName(fd)
}
- return fsGenName(fd)
}
func fsParseName(name string) (fd FileDesc, ok bool) {
diff --git a/leveldb/storage/file_storage_nacl.go b/leveldb/storage/file_storage_nacl.go
index 5545aee..b23d465 100644
--- a/leveldb/storage/file_storage_nacl.go
+++ b/leveldb/storage/file_storage_nacl.go
@@ -4,6 +4,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+//go:build nacl
// +build nacl
package storage
diff --git a/leveldb/storage/file_storage_solaris.go b/leveldb/storage/file_storage_solaris.go
index 79901ee..cd84ce2 100644
--- a/leveldb/storage/file_storage_solaris.go
+++ b/leveldb/storage/file_storage_solaris.go
@@ -4,6 +4,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+//go:build solaris
// +build solaris
package storage
diff --git a/leveldb/storage/file_storage_test.go b/leveldb/storage/file_storage_test.go
index 2e60315..22550f7 100644
--- a/leveldb/storage/file_storage_test.go
+++ b/leveldb/storage/file_storage_test.go
@@ -87,7 +87,9 @@ func TestFileStorage_MetaSetGet(t *testing.T) {
if err != nil {
t.Fatalf("Create(%d): got error: %v", i, err)
}
- w.Write([]byte("TEST"))
+ if _, err := w.Write([]byte("TEST")); err != nil {
+ t.Fatalf("Write(%d): got error: %v", i, err)
+ }
w.Close()
if err := fs.SetMeta(fd); err != nil {
t.Fatalf("SetMeta(%d): got error: %v", i, err)
diff --git a/leveldb/storage/file_storage_unix.go b/leveldb/storage/file_storage_unix.go
index d75f66a..601ffe3 100644
--- a/leveldb/storage/file_storage_unix.go
+++ b/leveldb/storage/file_storage_unix.go
@@ -4,6 +4,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd
// +build darwin dragonfly freebsd linux netbsd openbsd
package storage
diff --git a/leveldb/storage/mem_storage.go b/leveldb/storage/mem_storage.go
index 838f1be..a32972a 100644
--- a/leveldb/storage/mem_storage.go
+++ b/leveldb/storage/mem_storage.go
@@ -29,7 +29,6 @@ func (lock *memStorageLock) Unlock() {
if ms.slock == lock {
ms.slock = nil
}
- return
}
// memStorage is a memory-backed storage.
diff --git a/leveldb/storage/mem_storage_test.go b/leveldb/storage/mem_storage_test.go
index bb0a19d..faeca2c 100644
--- a/leveldb/storage/mem_storage_test.go
+++ b/leveldb/storage/mem_storage_test.go
@@ -35,7 +35,9 @@ func TestMemStorage(t *testing.T) {
if err != nil {
t.Fatal("Storage.Create: ", err)
}
- w.Write([]byte("abc"))
+ if _, err := w.Write([]byte("abc")); err != nil {
+ t.Fatal("Storage.Write: ", err)
+ }
w.Close()
if fds, _ := m.List(TypeAll); len(fds) != 1 {
t.Fatal("invalid GetFiles len")
@@ -45,7 +47,9 @@ func TestMemStorage(t *testing.T) {
if err != nil {
t.Fatal("Open: got error: ", err)
}
- buf.ReadFrom(r)
+ if _, err := buf.ReadFrom(r); err != nil {
+ t.Fatal("ReadFrom: got error: ", err)
+ }
r.Close()
if got := buf.String(); got != "abc" {
t.Fatalf("Read: invalid value, want=abc got=%s", got)
@@ -56,7 +60,9 @@ func TestMemStorage(t *testing.T) {
if _, err := m.Open(FileDesc{TypeTable, 1}); err == nil {
t.Fatal("expecting error")
}
- m.Remove(FileDesc{TypeTable, 1})
+ if err := m.Remove(FileDesc{TypeTable, 1}); err != nil {
+ t.Fatal("Remove: got error: ", err)
+ }
if fds, _ := m.List(TypeAll); len(fds) != 0 {
t.Fatal("invalid GetFiles len", len(fds))
}
diff --git a/leveldb/storage/storage.go b/leveldb/storage/storage.go
index 4e4a724..b385fc6 100644
--- a/leveldb/storage/storage.go
+++ b/leveldb/storage/storage.go
@@ -59,8 +59,9 @@ func isCorrupted(err error) bool {
switch err.(type) {
case *ErrCorrupted:
return true
+ default:
+ return false
}
- return false
}
func (e *ErrCorrupted) Error() string {
diff --git a/leveldb/table.go b/leveldb/table.go
index 884be5d..d0fab40 100644
--- a/leveldb/table.go
+++ b/leveldb/table.go
@@ -88,18 +88,6 @@ type tFiles []*tFile
func (tf tFiles) Len() int { return len(tf) }
func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] }
-func (tf tFiles) nums() string {
- x := "[ "
- for i, f := range tf {
- if i != 0 {
- x += ", "
- }
- x += fmt.Sprint(f.fd.Num)
- }
- x += " ]"
- return x
-}
-
// Returns true if i smallest key is less than j.
// This used for sort by key in ascending order.
func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool {
@@ -360,13 +348,13 @@ type tOps struct {
s *session
noSync bool
evictRemoved bool
- cache *cache.Cache
- bcache *cache.Cache
- bpool *util.BufferPool
+ fileCache *cache.Cache
+ blockCache *cache.Cache
+ blockBuffer *util.BufferPool
}
// Creates an empty table and returns table writer.
-func (t *tOps) create() (*tWriter, error) {
+func (t *tOps) create(tSize int) (*tWriter, error) {
fd := storage.FileDesc{Type: storage.TypeTable, Num: t.s.allocFileNum()}
fw, err := t.s.stor.Create(fd)
if err != nil {
@@ -376,20 +364,22 @@ func (t *tOps) create() (*tWriter, error) {
t: t,
fd: fd,
w: fw,
- tw: table.NewWriter(fw, t.s.o.Options),
+ tw: table.NewWriter(fw, t.s.o.Options, t.blockBuffer, tSize),
}, nil
}
// Builds table from src iterator.
func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
- w, err := t.create()
+ w, err := t.create(0)
if err != nil {
return
}
defer func() {
if err != nil {
- w.drop()
+ if derr := w.drop(); derr != nil {
+ err = fmt.Errorf("error createFrom (%v); error dropping (%v)", err, derr)
+ }
}
}()
@@ -412,22 +402,22 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
// Opens table. It returns a cache handle, which should
// be released after use.
func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) {
- ch = t.cache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) {
+ ch = t.fileCache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) {
var r storage.Reader
r, err = t.s.stor.Open(f.fd)
if err != nil {
return 0, nil
}
- var bcache *cache.NamespaceGetter
- if t.bcache != nil {
- bcache = &cache.NamespaceGetter{Cache: t.bcache, NS: uint64(f.fd.Num)}
+ var blockCache *cache.NamespaceGetter
+ if t.blockCache != nil {
+ blockCache = &cache.NamespaceGetter{Cache: t.blockCache, NS: uint64(f.fd.Num)}
}
var tr *table.Reader
- tr, err = table.NewReader(r, f.size, f.fd, bcache, t.bpool, t.s.o.Options)
+ tr, err = table.NewReader(r, f.size, f.fd, blockCache, t.blockBuffer, t.s.o.Options)
if err != nil {
- r.Close()
+ _ = r.Close()
return 0, nil
}
return 1, tr
@@ -484,14 +474,14 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite
// Removes table from persistent storage. It waits until
// no one use the the table.
func (t *tOps) remove(fd storage.FileDesc) {
- t.cache.Delete(0, uint64(fd.Num), func() {
+ t.fileCache.Delete(0, uint64(fd.Num), func() {
if err := t.s.stor.Remove(fd); err != nil {
t.s.logf("table@remove removing @%d %q", fd.Num, err)
} else {
t.s.logf("table@remove removed @%d", fd.Num)
}
- if t.evictRemoved && t.bcache != nil {
- t.bcache.EvictNS(uint64(fd.Num))
+ if t.evictRemoved && t.blockCache != nil {
+ t.blockCache.EvictNS(uint64(fd.Num))
}
// Try to reuse file num, useful for discarded transaction.
t.s.reuseFileNum(fd.Num)
@@ -501,40 +491,39 @@ func (t *tOps) remove(fd storage.FileDesc) {
// Closes the table ops instance. It will close all tables,
// regadless still used or not.
func (t *tOps) close() {
- t.bpool.Close()
- t.cache.Close()
- if t.bcache != nil {
- t.bcache.CloseWeak()
+ t.fileCache.Close(true)
+ if t.blockCache != nil {
+ t.blockCache.Close(false)
}
}
// Creates new initialized table ops instance.
func newTableOps(s *session) *tOps {
var (
- cacher cache.Cacher
- bcache *cache.Cache
- bpool *util.BufferPool
+ fileCacher cache.Cacher
+ blockCache *cache.Cache
+ blockBuffer *util.BufferPool
)
if s.o.GetOpenFilesCacheCapacity() > 0 {
- cacher = s.o.GetOpenFilesCacher().New(s.o.GetOpenFilesCacheCapacity())
+ fileCacher = s.o.GetOpenFilesCacher().New(s.o.GetOpenFilesCacheCapacity())
}
if !s.o.GetDisableBlockCache() {
- var bcacher cache.Cacher
+ var blockCacher cache.Cacher
if s.o.GetBlockCacheCapacity() > 0 {
- bcacher = s.o.GetBlockCacher().New(s.o.GetBlockCacheCapacity())
+ blockCacher = s.o.GetBlockCacher().New(s.o.GetBlockCacheCapacity())
}
- bcache = cache.NewCache(bcacher)
+ blockCache = cache.NewCache(blockCacher)
}
if !s.o.GetDisableBufferPool() {
- bpool = util.NewBufferPool(s.o.GetBlockSize() + 5)
+ blockBuffer = util.NewBufferPool(s.o.GetBlockSize() + 5)
}
return &tOps{
s: s,
noSync: s.o.GetNoSync(),
evictRemoved: s.o.GetBlockCacheEvictRemoved(),
- cache: cache.NewCache(cacher),
- bcache: bcache,
- bpool: bpool,
+ fileCache: cache.NewCache(fileCacher),
+ blockCache: blockCache,
+ blockBuffer: blockBuffer,
}
}
@@ -553,7 +542,7 @@ type tWriter struct {
// Append key/value pair to the table.
func (w *tWriter) append(key, value []byte) error {
if w.first == nil {
- w.first = append([]byte{}, key...)
+ w.first = append([]byte(nil), key...)
}
w.last = append(w.last[:0], key...)
return w.tw.Append(key, value)
@@ -565,16 +554,27 @@ func (w *tWriter) empty() bool {
}
// Closes the storage.Writer.
-func (w *tWriter) close() {
+func (w *tWriter) close() error {
if w.w != nil {
- w.w.Close()
+ if err := w.w.Close(); err != nil {
+ return err
+ }
w.w = nil
}
+ return nil
}
// Finalizes the table and returns table file.
func (w *tWriter) finish() (f *tFile, err error) {
- defer w.close()
+ defer func() {
+ if cerr := w.close(); cerr != nil {
+ if err == nil {
+ err = cerr
+ } else {
+ err = fmt.Errorf("error opening file (%v); error unlocking file (%v)", err, cerr)
+ }
+ }
+ }()
err = w.tw.Close()
if err != nil {
return
@@ -590,11 +590,16 @@ func (w *tWriter) finish() (f *tFile, err error) {
}
// Drops the table.
-func (w *tWriter) drop() {
- w.close()
- w.t.s.stor.Remove(w.fd)
- w.t.s.reuseFileNum(w.fd.Num)
+func (w *tWriter) drop() error {
+ if err := w.close(); err != nil {
+ return err
+ }
w.tw = nil
w.first = nil
w.last = nil
+ if err := w.t.s.stor.Remove(w.fd); err != nil {
+ return err
+ }
+ w.t.s.reuseFileNum(w.fd.Num)
+ return nil
}
diff --git a/leveldb/table/block_test.go b/leveldb/table/block_test.go
index 00e6f9e..392026f 100644
--- a/leveldb/table/block_test.go
+++ b/leveldb/table/block_test.go
@@ -37,9 +37,9 @@ var _ = testutil.Defer(func() {
scratch: make([]byte, 30),
}
kv.Iterate(func(i int, key, value []byte) {
- bw.append(key, value)
+ Expect(bw.append(key, value)).ShouldNot(HaveOccurred())
})
- bw.finish()
+ Expect(bw.finish()).ShouldNot(HaveOccurred())
// Opening the block.
data := bw.buf.Bytes()
diff --git a/leveldb/table/reader.go b/leveldb/table/reader.go
index 496feb6..8128794 100644
--- a/leveldb/table/reader.go
+++ b/leveldb/table/reader.go
@@ -901,7 +901,7 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo
} else {
// Value does use block buffer, and since the buffer will be
// recycled, it need to be copied.
- value = append([]byte{}, data.Value()...)
+ value = append([]byte(nil), data.Value()...)
}
}
data.Release()
diff --git a/leveldb/table/table_test.go b/leveldb/table/table_test.go
index 232efcd..d1d864d 100644
--- a/leveldb/table/table_test.go
+++ b/leveldb/table/table_test.go
@@ -47,15 +47,23 @@ var _ = testutil.Defer(func() {
)
// Building the table.
- tw := NewWriter(buf, o)
- tw.Append([]byte("k01"), []byte("hello"))
- tw.Append([]byte("k02"), []byte("hello2"))
- tw.Append([]byte("k03"), bytes.Repeat([]byte{'x'}, 10000))
- tw.Append([]byte("k04"), bytes.Repeat([]byte{'x'}, 200000))
- tw.Append([]byte("k05"), bytes.Repeat([]byte{'x'}, 300000))
- tw.Append([]byte("k06"), []byte("hello3"))
- tw.Append([]byte("k07"), bytes.Repeat([]byte{'x'}, 100000))
- err := tw.Close()
+ tw := NewWriter(buf, o, nil, 0)
+ err := tw.Append([]byte("k01"), []byte("hello"))
+ Expect(err).ShouldNot(HaveOccurred())
+ err = tw.Append([]byte("k02"), []byte("hello2"))
+ Expect(err).ShouldNot(HaveOccurred())
+ err = tw.Append([]byte("k03"), bytes.Repeat([]byte{'x'}, 10000))
+ Expect(err).ShouldNot(HaveOccurred())
+ err = tw.Append([]byte("k04"), bytes.Repeat([]byte{'x'}, 200000))
+ Expect(err).ShouldNot(HaveOccurred())
+ err = tw.Append([]byte("k05"), bytes.Repeat([]byte{'x'}, 300000))
+ Expect(err).ShouldNot(HaveOccurred())
+ err = tw.Append([]byte("k06"), []byte("hello3"))
+ Expect(err).ShouldNot(HaveOccurred())
+ err = tw.Append([]byte("k07"), bytes.Repeat([]byte{'x'}, 100000))
+ Expect(err).ShouldNot(HaveOccurred())
+
+ err = tw.Close()
It("Should be able to approximate offset of a key correctly", func() {
Expect(err).ShouldNot(HaveOccurred())
@@ -90,9 +98,9 @@ var _ = testutil.Defer(func() {
buf := &bytes.Buffer{}
// Building the table.
- tw := NewWriter(buf, o)
+ tw := NewWriter(buf, o, nil, 0)
kv.Iterate(func(i int, key, value []byte) {
- tw.Append(key, value)
+ Expect(tw.Append(key, value)).ShouldNot(HaveOccurred())
})
tw.Close()
diff --git a/leveldb/table/writer.go b/leveldb/table/writer.go
index fda697b..ea89d60 100644
--- a/leveldb/table/writer.go
+++ b/leveldb/table/writer.go
@@ -40,7 +40,7 @@ type blockWriter struct {
scratch []byte
}
-func (w *blockWriter) append(key, value []byte) {
+func (w *blockWriter) append(key, value []byte) (err error) {
nShared := 0
if w.nEntries%w.restartInterval == 0 {
w.restarts = append(w.restarts, uint32(w.buf.Len()))
@@ -50,14 +50,21 @@ func (w *blockWriter) append(key, value []byte) {
n := binary.PutUvarint(w.scratch[0:], uint64(nShared))
n += binary.PutUvarint(w.scratch[n:], uint64(len(key)-nShared))
n += binary.PutUvarint(w.scratch[n:], uint64(len(value)))
- w.buf.Write(w.scratch[:n])
- w.buf.Write(key[nShared:])
- w.buf.Write(value)
+ if _, err = w.buf.Write(w.scratch[:n]); err != nil {
+ return err
+ }
+ if _, err = w.buf.Write(key[nShared:]); err != nil {
+ return err
+ }
+ if _, err = w.buf.Write(value); err != nil {
+ return err
+ }
w.prevKey = append(w.prevKey[:0], key...)
w.nEntries++
+ return nil
}
-func (w *blockWriter) finish() {
+func (w *blockWriter) finish() error {
// Write restarts entry.
if w.nEntries == 0 {
// Must have at least one restart entry.
@@ -68,6 +75,7 @@ func (w *blockWriter) finish() {
buf4 := w.buf.Alloc(4)
binary.LittleEndian.PutUint32(buf4, x)
}
+ return nil
}
func (w *blockWriter) reset() {
@@ -109,9 +117,9 @@ func (w *filterWriter) flush(offset uint64) {
}
}
-func (w *filterWriter) finish() {
+func (w *filterWriter) finish() error {
if w.generator == nil {
- return
+ return nil
}
// Generate last keys.
@@ -123,7 +131,7 @@ func (w *filterWriter) finish() {
buf4 := w.buf.Alloc(4)
binary.LittleEndian.PutUint32(buf4, x)
}
- w.buf.WriteByte(byte(w.baseLg))
+ return w.buf.WriteByte(byte(w.baseLg))
}
func (w *filterWriter) generate() {
@@ -146,6 +154,7 @@ type Writer struct {
compression opt.Compression
blockSize int
+ bpool *util.BufferPool
dataBlock blockWriter
indexBlock blockWriter
filterBlock filterWriter
@@ -193,9 +202,9 @@ func (w *Writer) writeBlock(buf *util.Buffer, compression opt.Compression) (bh b
return
}
-func (w *Writer) flushPendingBH(key []byte) {
+func (w *Writer) flushPendingBH(key []byte) error {
if w.pendingBH.length == 0 {
- return
+ return nil
}
var separator []byte
if len(key) == 0 {
@@ -210,15 +219,20 @@ func (w *Writer) flushPendingBH(key []byte) {
}
n := encodeBlockHandle(w.scratch[:20], w.pendingBH)
// Append the block handle to the index block.
- w.indexBlock.append(separator, w.scratch[:n])
+ if err := w.indexBlock.append(separator, w.scratch[:n]); err != nil {
+ return err
+ }
// Reset prev key of the data block.
w.dataBlock.prevKey = w.dataBlock.prevKey[:0]
// Clear pending block handle.
w.pendingBH = blockHandle{}
+ return nil
}
func (w *Writer) finishBlock() error {
- w.dataBlock.finish()
+ if err := w.dataBlock.finish(); err != nil {
+ return err
+ }
bh, err := w.writeBlock(&w.dataBlock.buf, w.compression)
if err != nil {
return err
@@ -244,9 +258,13 @@ func (w *Writer) Append(key, value []byte) error {
return w.err
}
- w.flushPendingBH(key)
+ if err := w.flushPendingBH(key); err != nil {
+ return err
+ }
// Append key/value pair to the data block.
- w.dataBlock.append(key, value)
+ if err := w.dataBlock.append(key, value); err != nil {
+ return err
+ }
// Add key to the filter block.
w.filterBlock.add(key)
@@ -285,6 +303,16 @@ func (w *Writer) BytesLen() int {
// after Close, but calling BlocksLen, EntriesLen and BytesLen
// is still possible.
func (w *Writer) Close() error {
+ defer func() {
+ if w.bpool != nil {
+ // Buffer.Bytes() returns [offset:] of the buffer.
+ // We need to Reset() so that the offset = 0, resulting
+ // in buf.Bytes() returning the whole allocated bytes.
+ w.dataBlock.buf.Reset()
+ w.bpool.Put(w.dataBlock.buf.Bytes())
+ }
+ }()
+
if w.err != nil {
return w.err
}
@@ -297,11 +325,15 @@ func (w *Writer) Close() error {
return w.err
}
}
- w.flushPendingBH(nil)
+ if err := w.flushPendingBH(nil); err != nil {
+ return err
+ }
// Write the filter block.
var filterBH blockHandle
- w.filterBlock.finish()
+ if err := w.filterBlock.finish(); err != nil {
+ return err
+ }
if buf := &w.filterBlock.buf; buf.Len() > 0 {
filterBH, w.err = w.writeBlock(buf, opt.NoCompression)
if w.err != nil {
@@ -313,9 +345,13 @@ func (w *Writer) Close() error {
if filterBH.length > 0 {
key := []byte("filter." + w.filter.Name())
n := encodeBlockHandle(w.scratch[:20], filterBH)
- w.dataBlock.append(key, w.scratch[:n])
+ if err := w.dataBlock.append(key, w.scratch[:n]); err != nil {
+ return err
+ }
+ }
+ if err := w.dataBlock.finish(); err != nil {
+ return err
}
- w.dataBlock.finish()
metaindexBH, err := w.writeBlock(&w.dataBlock.buf, w.compression)
if err != nil {
w.err = err
@@ -323,7 +359,9 @@ func (w *Writer) Close() error {
}
// Write the index block.
- w.indexBlock.finish()
+ if err := w.indexBlock.finish(); err != nil {
+ return err
+ }
indexBH, err := w.writeBlock(&w.indexBlock.buf, w.compression)
if err != nil {
w.err = err
@@ -351,7 +389,15 @@ func (w *Writer) Close() error {
// NewWriter creates a new initialized table writer for the file.
//
// Table writer is not safe for concurrent use.
-func NewWriter(f io.Writer, o *opt.Options) *Writer {
+func NewWriter(f io.Writer, o *opt.Options, pool *util.BufferPool, size int) *Writer {
+ var bufBytes []byte
+ if pool == nil {
+ bufBytes = make([]byte, size)
+ } else {
+ bufBytes = pool.Get(size)
+ }
+ bufBytes = bufBytes[:0]
+
w := &Writer{
writer: f,
cmp: o.GetComparer(),
@@ -359,6 +405,8 @@ func NewWriter(f io.Writer, o *opt.Options) *Writer {
compression: o.GetCompression(),
blockSize: o.GetBlockSize(),
comparerScratch: make([]byte, 0),
+ bpool: pool,
+ dataBlock: blockWriter{buf: *util.NewBuffer(bufBytes)},
}
// data block
w.dataBlock.restartInterval = o.GetBlockRestartInterval()
diff --git a/leveldb/testutil/kv.go b/leveldb/testutil/kv.go
index 608cbf3..c44654c 100644
--- a/leveldb/testutil/kv.go
+++ b/leveldb/testutil/kv.go
@@ -343,7 +343,7 @@ func KeyValue_Generate(rnd *rand.Rand, n, incr, minlen, maxlen, vminlen, vmaxlen
key[j] = keymap[gen[j]]
}
value := make([]byte, rrand(vminlen, vmaxlen))
- for n := copy(value, []byte(fmt.Sprintf("v%d", i))); n < len(value); n++ {
+ for n := copy(value, fmt.Sprintf("v%d", i)); n < len(value); n++ {
value[n] = 'x'
}
kv.Put(key, value)
diff --git a/leveldb/testutil/util.go b/leveldb/testutil/util.go
index 97c5294..424f938 100644
--- a/leveldb/testutil/util.go
+++ b/leveldb/testutil/util.go
@@ -86,7 +86,7 @@ func BytesSeparator(a, b []byte) []byte {
}
for ; i < n && (a[i] == b[i]); i++ {
}
- x := append([]byte{}, a[:i]...)
+ x := append([]byte(nil), a[:i]...)
if i < n {
if c := a[i] + 1; c < b[i] {
return append(x, c)
@@ -112,9 +112,8 @@ func BytesAfter(b []byte) []byte {
for _, c := range b {
if c < 0xff {
return append(x, c+1)
- } else {
- x = append(x, c)
}
+ x = append(x, c)
}
return append(x, 'x')
}
@@ -126,7 +125,6 @@ func RandomIndex(rnd *rand.Rand, n, round int, fn func(i int)) {
for x := 0; x < round; x++ {
fn(rnd.Intn(n))
}
- return
}
func ShuffledIndex(rnd *rand.Rand, n, round int, fn func(i int)) {
@@ -138,7 +136,6 @@ func ShuffledIndex(rnd *rand.Rand, n, round int, fn func(i int)) {
fn(i)
}
}
- return
}
func RandomRange(rnd *rand.Rand, n, round int, fn func(start, limit int)) {
@@ -153,7 +150,6 @@ func RandomRange(rnd *rand.Rand, n, round int, fn func(start, limit int)) {
}
fn(start, start+length)
}
- return
}
func Max(x, y int) int {
diff --git a/leveldb/util.go b/leveldb/util.go
index 0e2b519..1ef859d 100644
--- a/leveldb/util.go
+++ b/leveldb/util.go
@@ -22,7 +22,7 @@ func shorten(str string) string {
var bunits = [...]string{"", "Ki", "Mi", "Gi", "Ti"}
-func shortenb(bytes int) string {
+func shortenb(bytes int64) string {
i := 0
for ; bytes > 1024 && i < 4; i++ {
bytes /= 1024
@@ -30,7 +30,7 @@ func shortenb(bytes int) string {
return fmt.Sprintf("%d%sB", bytes, bunits[i])
}
-func sshortenb(bytes int) string {
+func sshortenb(bytes int64) string {
if bytes == 0 {
return "~"
}
@@ -58,13 +58,6 @@ func sint(x int) string {
return fmt.Sprintf("%s%d", sign, x)
}
-func minInt(a, b int) int {
- if a < b {
- return a
- }
- return b
-}
-
func maxInt(a, b int) int {
if a > b {
return a
diff --git a/leveldb/util/buffer_pool.go b/leveldb/util/buffer_pool.go
index 5ab1f86..4f512f6 100644
--- a/leveldb/util/buffer_pool.go
+++ b/leveldb/util/buffer_pool.go
@@ -10,30 +10,15 @@ import (
"fmt"
"sync"
"sync/atomic"
- "time"
)
-type buffer struct {
- b []byte
- miss int
-}
-
// BufferPool is a 'buffer pool'.
type BufferPool struct {
- pool [6]chan []byte
- size [5]uint32
- sizeMiss [5]uint32
- sizeHalf [5]uint32
- baseline [4]int
- baseline0 int
-
- mu sync.RWMutex
- closed bool
- closeC chan struct{}
+ pool [6]sync.Pool
+ baseline [5]int
get uint32
put uint32
- half uint32
less uint32
equal uint32
greater uint32
@@ -41,15 +26,12 @@ type BufferPool struct {
}
func (p *BufferPool) poolNum(n int) int {
- if n <= p.baseline0 && n > p.baseline0/2 {
- return 0
- }
for i, x := range p.baseline {
if n <= x {
- return i + 1
+ return i
}
}
- return len(p.baseline) + 1
+ return len(p.baseline)
}
// Get returns buffer with length of n.
@@ -57,101 +39,47 @@ func (p *BufferPool) Get(n int) []byte {
if p == nil {
return make([]byte, n)
}
+ atomic.AddUint32(&p.get, 1)
- p.mu.RLock()
- defer p.mu.RUnlock()
+ poolNum := p.poolNum(n)
- if p.closed {
- return make([]byte, n)
- }
+ b := p.pool[poolNum].Get().(*[]byte)
- atomic.AddUint32(&p.get, 1)
+ if cap(*b) == 0 {
+ // If we grabbed nothing, increment the miss stats.
+ atomic.AddUint32(&p.miss, 1)
- poolNum := p.poolNum(n)
- pool := p.pool[poolNum]
- if poolNum == 0 {
- // Fast path.
- select {
- case b := <-pool:
- switch {
- case cap(b) > n:
- if cap(b)-n >= n {
- atomic.AddUint32(&p.half, 1)
- select {
- case pool <- b:
- default:
- }
- return make([]byte, n)
- } else {
- atomic.AddUint32(&p.less, 1)
- return b[:n]
- }
- case cap(b) == n:
- atomic.AddUint32(&p.equal, 1)
- return b[:n]
- default:
- atomic.AddUint32(&p.greater, 1)
- }
- default:
- atomic.AddUint32(&p.miss, 1)
+ if poolNum == len(p.baseline) {
+ *b = make([]byte, n)
+ return *b
}
- return make([]byte, n, p.baseline0)
+ *b = make([]byte, p.baseline[poolNum])
+ *b = (*b)[:n]
+ return *b
} else {
- sizePtr := &p.size[poolNum-1]
-
- select {
- case b := <-pool:
- switch {
- case cap(b) > n:
- if cap(b)-n >= n {
- atomic.AddUint32(&p.half, 1)
- sizeHalfPtr := &p.sizeHalf[poolNum-1]
- if atomic.AddUint32(sizeHalfPtr, 1) == 20 {
- atomic.StoreUint32(sizePtr, uint32(cap(b)/2))
- atomic.StoreUint32(sizeHalfPtr, 0)
- } else {
- select {
- case pool <- b:
- default:
- }
- }
- return make([]byte, n)
- } else {
- atomic.AddUint32(&p.less, 1)
- return b[:n]
- }
- case cap(b) == n:
- atomic.AddUint32(&p.equal, 1)
- return b[:n]
- default:
- atomic.AddUint32(&p.greater, 1)
- if uint32(cap(b)) >= atomic.LoadUint32(sizePtr) {
- select {
- case pool <- b:
- default:
- }
- }
- }
- default:
- atomic.AddUint32(&p.miss, 1)
+ // If there is enough capacity in the bytes grabbed, resize the length
+ // to n and return.
+ if n < cap(*b) {
+ atomic.AddUint32(&p.less, 1)
+ *b = (*b)[:n]
+ return *b
+ } else if n == cap(*b) {
+ atomic.AddUint32(&p.equal, 1)
+ *b = (*b)[:n]
+ return *b
+ } else if n > cap(*b) {
+ atomic.AddUint32(&p.greater, 1)
}
+ }
- if size := atomic.LoadUint32(sizePtr); uint32(n) > size {
- if size == 0 {
- atomic.CompareAndSwapUint32(sizePtr, 0, uint32(n))
- } else {
- sizeMissPtr := &p.sizeMiss[poolNum-1]
- if atomic.AddUint32(sizeMissPtr, 1) == 20 {
- atomic.StoreUint32(sizePtr, uint32(n))
- atomic.StoreUint32(sizeMissPtr, 0)
- }
- }
- return make([]byte, n)
- } else {
- return make([]byte, n, size)
- }
+ if poolNum == len(p.baseline) {
+ *b = make([]byte, n)
+ return *b
}
+ *b = make([]byte, p.baseline[poolNum])
+ *b = (*b)[:n]
+ return *b
}
// Put adds given buffer to the pool.
@@ -160,68 +88,18 @@ func (p *BufferPool) Put(b []byte) {
return
}
- p.mu.RLock()
- defer p.mu.RUnlock()
-
- if p.closed {
- return
- }
+ poolNum := p.poolNum(cap(b))
atomic.AddUint32(&p.put, 1)
-
- pool := p.pool[p.poolNum(cap(b))]
- select {
- case pool <- b:
- default:
- }
-
-}
-
-func (p *BufferPool) Close() {
- if p == nil {
- return
- }
-
- p.mu.Lock()
- if !p.closed {
- p.closed = true
- p.closeC <- struct{}{}
- }
- p.mu.Unlock()
+ p.pool[poolNum].Put(&b)
}
func (p *BufferPool) String() string {
if p == nil {
return "<nil>"
}
-
- p.mu.Lock()
- defer p.mu.Unlock()
-
- return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v Zh·%v G·%d P·%d H·%d <·%d =·%d >·%d M·%d}",
- p.baseline0, p.size, p.sizeMiss, p.sizeHalf, p.get, p.put, p.half, p.less, p.equal, p.greater, p.miss)
-}
-
-func (p *BufferPool) drain() {
- ticker := time.NewTicker(2 * time.Second)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- for _, ch := range p.pool {
- select {
- case <-ch:
- default:
- }
- }
- case <-p.closeC:
- close(p.closeC)
- for _, ch := range p.pool {
- close(ch)
- }
- return
- }
- }
+ return fmt.Sprintf("BufferPool{B·%d G·%d P·%d <·%d =·%d >·%d M·%d}",
+ p.baseline, p.get, p.put, p.less, p.equal, p.greater, p.miss)
}
// NewBufferPool creates a new initialized 'buffer pool'.
@@ -229,14 +107,29 @@ func NewBufferPool(baseline int) *BufferPool {
if baseline <= 0 {
panic("baseline can't be <= 0")
}
- p := &BufferPool{
- baseline0: baseline,
- baseline: [...]int{baseline / 4, baseline / 2, baseline * 2, baseline * 4},
- closeC: make(chan struct{}, 1),
+ bufPool := &BufferPool{
+ baseline: [...]int{baseline / 4, baseline / 2, baseline, baseline * 2, baseline * 4},
+ pool: [6]sync.Pool{
+ {
+ New: func() interface{} { return new([]byte) },
+ },
+ {
+ New: func() interface{} { return new([]byte) },
+ },
+ {
+ New: func() interface{} { return new([]byte) },
+ },
+ {
+ New: func() interface{} { return new([]byte) },
+ },
+ {
+ New: func() interface{} { return new([]byte) },
+ },
+ {
+ New: func() interface{} { return new([]byte) },
+ },
+ },
}
- for i, cap := range []int{2, 2, 4, 4, 2, 1} {
- p.pool[i] = make(chan []byte, cap)
- }
- go p.drain()
- return p
+
+ return bufPool
}
diff --git a/leveldb/util/buffer_test.go b/leveldb/util/buffer_test.go
index 9487e48..9b44de2 100644
--- a/leveldb/util/buffer_test.go
+++ b/leveldb/util/buffer_test.go
@@ -110,25 +110,32 @@ func TestBasicOperations(t *testing.T) {
}
check(t, "TestBasicOperations (4)", &buf, "a")
- buf.WriteByte(data[1])
+ if err := buf.WriteByte(data[1]); err != nil {
+ t.Fatal(err)
+ }
check(t, "TestBasicOperations (5)", &buf, "ab")
n, err = buf.Write([]byte(data[2:26]))
+ if err != nil {
+ t.Fatal(err)
+ }
if n != 24 {
t.Errorf("wrote 25 bytes, but n == %d", n)
}
- check(t, "TestBasicOperations (6)", &buf, string(data[0:26]))
+ check(t, "TestBasicOperations (6)", &buf, data[0:26])
buf.Truncate(26)
- check(t, "TestBasicOperations (7)", &buf, string(data[0:26]))
+ check(t, "TestBasicOperations (7)", &buf, data[0:26])
buf.Truncate(20)
- check(t, "TestBasicOperations (8)", &buf, string(data[0:20]))
+ check(t, "TestBasicOperations (8)", &buf, data[0:20])
- empty(t, "TestBasicOperations (9)", &buf, string(data[0:20]), make([]byte, 5))
+ empty(t, "TestBasicOperations (9)", &buf, data[0:20], make([]byte, 5))
empty(t, "TestBasicOperations (10)", &buf, "", make([]byte, 100))
- buf.WriteByte(data[1])
+ if err := buf.WriteByte(data[1]); err != nil {
+ t.Fatal(err)
+ }
c, err := buf.ReadByte()
if err != nil {
t.Error("ReadByte unexpected eof")
@@ -136,7 +143,7 @@ func TestBasicOperations(t *testing.T) {
if c != data[1] {
t.Errorf("ReadByte wrong value c=%v", c)
}
- c, err = buf.ReadByte()
+ _, err = buf.ReadByte()
if err == nil {
t.Error("ReadByte unexpected not eof")
}
@@ -191,7 +198,9 @@ func TestReadFrom(t *testing.T) {
for i := 3; i < 30; i += 3 {
s := fillBytes(t, "TestReadFrom (1)", &buf, "", 5, testBytes[0:len(testBytes)/i])
var b Buffer
- b.ReadFrom(&buf)
+ if _, err := b.ReadFrom(&buf); err != nil {
+ t.Fatal(err)
+ }
empty(t, "TestReadFrom (2)", &b, s, make([]byte, len(data)))
}
}
@@ -201,7 +210,9 @@ func TestWriteTo(t *testing.T) {
for i := 3; i < 30; i += 3 {
s := fillBytes(t, "TestWriteTo (1)", &buf, "", 5, testBytes[0:len(testBytes)/i])
var b Buffer
- buf.WriteTo(&b)
+ if _, err := buf.WriteTo(&b); err != nil {
+ t.Fatal(err)
+ }
empty(t, "TestWriteTo (2)", &b, s, make([]byte, len(data)))
}
}
@@ -289,7 +300,9 @@ func TestGrow(t *testing.T) {
// Check no allocation occurs in write, as long as we're single-threaded.
var m1, m2 runtime.MemStats
runtime.ReadMemStats(&m1)
- buf.Write(yBytes)
+ if _, err := buf.Write(yBytes); err != nil {
+ t.Fatal(err)
+ }
runtime.ReadMemStats(&m2)
if runtime.GOMAXPROCS(-1) == 1 && m1.Mallocs != m2.Mallocs {
t.Errorf("allocation occurred during write")
@@ -322,11 +335,17 @@ func TestReadEmptyAtEOF(t *testing.T) {
func TestBufferGrowth(t *testing.T) {
var b Buffer
buf := make([]byte, 1024)
- b.Write(buf[0:1])
+ if _, err := b.Write(buf[0:1]); err != nil {
+ t.Fatal(err)
+ }
var cap0 int
for i := 0; i < 5<<10; i++ {
- b.Write(buf)
- b.Read(buf)
+ if _, err := b.Write(buf); err != nil {
+ t.Fatal(err)
+ }
+ if _, err := b.Read(buf); err != nil {
+ t.Fatal(err)
+ }
if i == 0 {
cap0 = cap(b.buf)
}
@@ -346,7 +365,9 @@ func BenchmarkWriteByte(b *testing.B) {
for i := 0; i < b.N; i++ {
buf.Reset()
for i := 0; i < n; i++ {
- buf.WriteByte('x')
+ if err := buf.WriteByte('x'); err != nil {
+ b.Fatal(err)
+ }
}
}
}
@@ -367,11 +388,17 @@ func BenchmarkAlloc(b *testing.B) {
func BenchmarkBufferNotEmptyWriteRead(b *testing.B) {
buf := make([]byte, 1024)
for i := 0; i < b.N; i++ {
- var b Buffer
- b.Write(buf[0:1])
+ var buf2 Buffer
+ if _, err := buf2.Write(buf[0:1]); err != nil {
+ b.Fatal(err)
+ }
for i := 0; i < 5<<10; i++ {
- b.Write(buf)
- b.Read(buf)
+ if _, err := buf2.Write(buf); err != nil {
+ b.Fatal(err)
+ }
+ if _, err := buf2.Read(buf); err != nil {
+ b.Fatal(err)
+ }
}
}
}
@@ -380,14 +407,22 @@ func BenchmarkBufferNotEmptyWriteRead(b *testing.B) {
func BenchmarkBufferFullSmallReads(b *testing.B) {
buf := make([]byte, 1024)
for i := 0; i < b.N; i++ {
- var b Buffer
- b.Write(buf)
- for b.Len()+20 < cap(b.buf) {
- b.Write(buf[:10])
+ var buf2 Buffer
+ if _, err := buf2.Write(buf); err != nil {
+ b.Fatal(err)
+ }
+ for buf2.Len()+20 < cap(buf2.buf) {
+ if _, err := buf2.Write(buf[:10]); err != nil {
+ b.Fatal(err)
+ }
}
for i := 0; i < 5<<10; i++ {
- b.Read(buf[:1])
- b.Write(buf[:1])
+ if _, err := buf2.Read(buf[:1]); err != nil {
+ b.Fatal(err)
+ }
+ if _, err := buf2.Write(buf[:1]); err != nil {
+ b.Fatal(err)
+ }
}
}
}
diff --git a/leveldb/version.go b/leveldb/version.go
index 9535e35..4672509 100644
--- a/leveldb/version.go
+++ b/leveldb/version.go
@@ -43,7 +43,7 @@ type version struct {
// newVersion creates a new version with an unique monotonous increasing id.
func newVersion(s *session) *version {
- id := atomic.AddInt64(&s.ntVersionId, 1)
+ id := atomic.AddInt64(&s.ntVersionID, 1)
nv := &version{s: s, id: id - 1}
return nv
}
@@ -388,7 +388,7 @@ func (v *version) computeCompaction() {
}
statFiles[level] = len(tables)
- statSizes[level] = shortenb(int(size))
+ statSizes[level] = shortenb(size)
statScore[level] = fmt.Sprintf("%.2f", score)
statTotSize += size
}
@@ -396,7 +396,7 @@ func (v *version) computeCompaction() {
v.cLevel = bestLevel
v.cScore = bestScore
- v.s.logf("version@stat F·%v S·%s%v Sc·%v", statFiles, shortenb(int(statTotSize)), statSizes, statScore)
+ v.s.logf("version@stat F·%v S·%s%v Sc·%v", statFiles, shortenb(statTotSize), statSizes, statScore)
}
func (v *version) needCompaction() bool {
diff --git a/leveldb/version_test.go b/leveldb/version_test.go
index 43e5b1f..adb51b2 100644
--- a/leveldb/version_test.go
+++ b/leveldb/version_test.go
@@ -422,7 +422,7 @@ func benchmarkVersionStaging(b *testing.B, trivial bool, size int) {
cnt := 0
for j := index; j < size && cnt <= 3; j++ {
rec.addTable(1, int64(i), 1, ik, ik)
- cnt += 1
+ cnt++
}
vs := v.newStaging()
vs.commit(rec)
diff --git a/manualtest/dbstress/key.go b/manualtest/dbstress/key.go
index c9f6963..160e08c 100644
--- a/manualtest/dbstress/key.go
+++ b/manualtest/dbstress/key.go
@@ -18,7 +18,7 @@ func (e *ErrIkeyCorrupted) Error() string {
}
func newErrIkeyCorrupted(ikey []byte, reason string) error {
- return errors.NewErrCorrupted(storage.FileDesc{}, &ErrIkeyCorrupted{append([]byte{}, ikey...), reason})
+ return errors.NewErrCorrupted(storage.FileDesc{}, &ErrIkeyCorrupted{append([]byte(nil), ikey...), reason})
}
type kType int
@@ -63,75 +63,15 @@ func init() {
binary.LittleEndian.PutUint64(kMaxNumBytes, kMaxNum)
}
-type iKey []byte
-
-func newIkey(ukey []byte, seq uint64, kt kType) iKey {
- if seq > kMaxSeq {
- panic("leveldb: invalid sequence number")
- } else if kt > ktVal {
- panic("leveldb: invalid type")
- }
-
- ik := make(iKey, len(ukey)+8)
- copy(ik, ukey)
- binary.LittleEndian.PutUint64(ik[len(ukey):], (seq<<8)|uint64(kt))
- return ik
-}
-
func parseIkey(ik []byte) (ukey []byte, seq uint64, kt kType, err error) {
if len(ik) < 8 {
return nil, 0, 0, newErrIkeyCorrupted(ik, "invalid length")
}
num := binary.LittleEndian.Uint64(ik[len(ik)-8:])
- seq, kt = uint64(num>>8), kType(num&0xff)
+ seq, kt = num>>8, kType(num&0xff)
if kt > ktVal {
return nil, 0, 0, newErrIkeyCorrupted(ik, "invalid type")
}
ukey = ik[:len(ik)-8]
return
}
-
-func validIkey(ik []byte) bool {
- _, _, _, err := parseIkey(ik)
- return err == nil
-}
-
-func (ik iKey) assert() {
- if ik == nil {
- panic("leveldb: nil iKey")
- }
- if len(ik) < 8 {
- panic(fmt.Sprintf("leveldb: iKey %q, len=%d: invalid length", ik, len(ik)))
- }
-}
-
-func (ik iKey) ukey() []byte {
- ik.assert()
- return ik[:len(ik)-8]
-}
-
-func (ik iKey) num() uint64 {
- ik.assert()
- return binary.LittleEndian.Uint64(ik[len(ik)-8:])
-}
-
-func (ik iKey) parseNum() (seq uint64, kt kType) {
- num := ik.num()
- seq, kt = uint64(num>>8), kType(num&0xff)
- if kt > ktVal {
- panic(fmt.Sprintf("leveldb: iKey %q, len=%d: invalid type %#x", ik, len(ik), kt))
- }
- return
-}
-
-func (ik iKey) String() string {
- if ik == nil {
- return "<nil>"
- }
-
- if ukey, seq, kt, err := parseIkey(ik); err == nil {
- return fmt.Sprintf("%x,%s%d", ukey, kt, seq)
- } else {
- return "<invalid>"
- }
-}
diff --git a/manualtest/dbstress/main.go b/manualtest/dbstress/main.go
index d2a94f3..3e2c8d7 100644
--- a/manualtest/dbstress/main.go
+++ b/manualtest/dbstress/main.go
@@ -17,6 +17,7 @@ import (
"strings"
"sync"
"sync/atomic"
+ "syscall"
"time"
"github.com/syndtr/goleveldb/leveldb"
@@ -39,6 +40,7 @@ var (
enableBlockCache = false
enableCompression = false
enableBufferPool = false
+ maxManifestFileSize = opt.DefaultMaxManifestFileSize
wg = new(sync.WaitGroup)
done, fail uint32
@@ -86,6 +88,7 @@ func init() {
flag.BoolVar(&enableBufferPool, "enablebufferpool", enableBufferPool, "enable buffer pool")
flag.BoolVar(&enableBlockCache, "enableblockcache", enableBlockCache, "enable block cache")
flag.BoolVar(&enableCompression, "enablecompression", enableCompression, "enable block compression")
+ flag.Int64Var(&maxManifestFileSize, "maxManifestFileSize", maxManifestFileSize, "max manifest file size")
}
func randomData(dst []byte, ns, prefix byte, i uint32, dataLen int) []byte {
@@ -152,16 +155,16 @@ type testingStorage struct {
storage.Storage
}
-func (ts *testingStorage) scanTable(fd storage.FileDesc, checksum bool) (corrupted bool) {
+func (ts *testingStorage) scanTable(fd storage.FileDesc, checksum bool) (corrupted bool, err error) {
r, err := ts.Open(fd)
if err != nil {
- log.Fatal(err)
+ return false, err
}
defer r.Close()
size, err := r.Seek(0, os.SEEK_END)
if err != nil {
- log.Fatal(err)
+ return false, err
}
o := &opt.Options{
@@ -173,7 +176,7 @@ func (ts *testingStorage) scanTable(fd storage.FileDesc, checksum bool) (corrupt
}
tr, err := table.NewReader(r, size, fd, nil, bpool, o)
if err != nil {
- log.Fatal(err)
+ return false, err
}
defer tr.Release()
@@ -208,13 +211,13 @@ func (ts *testingStorage) scanTable(fd storage.FileDesc, checksum bool) (corrupt
corrupted = true
log.Printf("FATAL: [%v] Corrupted ikey i=%d: %v", fd, i, kerr)
- return
+ return corrupted, nil
}
if checkData(i, "key", ukey) {
- return
+ return corrupted, nil
}
if kt == ktVal && checkData(i, "value", iter.Value()) {
- return
+ return corrupted, nil
}
}
if err := iter.Error(); err != nil {
@@ -225,11 +228,11 @@ func (ts *testingStorage) scanTable(fd storage.FileDesc, checksum bool) (corrupt
log.Printf("FATAL: [%v] Corruption detected: %v", fd, err)
} else {
- log.Fatal(err)
+ return false, err
}
}
- return
+ return corrupted, nil
}
func (ts *testingStorage) Remove(fd storage.FileDesc) error {
@@ -238,7 +241,11 @@ func (ts *testingStorage) Remove(fd storage.FileDesc) error {
}
if fd.Type == storage.TypeTable {
- if ts.scanTable(fd, true) {
+ corrupted, err := ts.scanTable(fd, true)
+ if err != nil {
+ return err
+ }
+ if corrupted {
return nil
}
}
@@ -259,7 +266,7 @@ func (s *latencyStats) record(n int) {
if s.mark.IsZero() {
panic("not started")
}
- dur := time.Now().Sub(s.mark)
+ dur := time.Since(s.mark)
dur1 := dur / time.Duration(n)
if dur1 < s.min || s.min == 0 {
s.min = dur1
@@ -334,7 +341,10 @@ func main() {
cerr := err.(*errors.ErrCorrupted)
if !cerr.Fd.Zero() && cerr.Fd.Type == storage.TypeTable {
log.Print("FATAL: corruption detected, scanning...")
- if !tstor.scanTable(storage.FileDesc{Type: storage.TypeTable, Num: cerr.Fd.Num}, false) {
+ corrupted, serr := tstor.scanTable(storage.FileDesc{Type: storage.TypeTable, Num: cerr.Fd.Num}, false)
+ if serr != nil {
+ log.Printf("FATAL: unable to scan table %v", serr)
+ } else if !corrupted {
log.Printf("FATAL: unable to find corrupted key/value pair in table %v", cerr.Fd)
}
}
@@ -352,6 +362,7 @@ func main() {
ErrorIfExist: true,
Compression: opt.NoCompression,
Filter: filter.NewBloomFilter(10),
+ MaxManifestFileSize: maxManifestFileSize,
}
if enableCompression {
o.Compression = opt.DefaultCompression
@@ -359,7 +370,7 @@ func main() {
db, err := leveldb.Open(tstor, o)
if err != nil {
- log.Fatal(err)
+ log.Fatal(err) // nolint: gocritic
}
defer db.Close()
@@ -410,7 +421,7 @@ func main() {
log.Print("------------------------")
- log.Printf("> Elapsed=%v", time.Now().Sub(startTime))
+ log.Printf("> Elapsed=%v", time.Since(startTime))
mu.Lock()
log.Printf("> GetLatencyMin=%v GetLatencyMax=%v GetLatencyAvg=%v GetRatePerSec=%d",
gGetStat.min, gGetStat.max, gGetStat.avg(), gGetStat.ratePerSec())
@@ -612,7 +623,7 @@ func main() {
} else {
writeAckAck <- struct{}{}
}
- log.Printf("[%02d] SCANNER #%d Deleted=%d Time=%v", ns, i, delB.Len(), time.Now().Sub(t))
+ log.Printf("[%02d] SCANNER #%d Deleted=%d Time=%v", ns, i, delB.Len(), time.Since(t))
}
i++
@@ -622,8 +633,8 @@ func main() {
}
go func() {
- sig := make(chan os.Signal)
- signal.Notify(sig, os.Interrupt, os.Kill)
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
log.Printf("Got signal: %v, exiting...", <-sig)
atomic.StoreUint32(&done, 1)
}()
Debdiff
File lists identical (after any substitutions)
No differences were encountered in the control files