New Upstream Snapshot - golang-github-samuel-go-zookeeper
Ready changes
Summary
Merged new upstream version: 0.0~git20201211.7117e9e (was: 0.0~git20180130.c4fab1a).
Resulting package
Built on 2022-10-21T16:18 (took 11m25s)
The resulting binary packages can be installed (if you have the apt repository enabled) by running one of:
apt install -t fresh-snapshots golang-github-samuel-go-zookeeper-dev
Lintian Result
- golang-github-samuel-go-zookeeper-dev_0.0~git20201211.7117e9e-1~jan+nus2_all.deb
- golang-github-samuel-go-zookeeper_0.0~git20201211.7117e9e-1~jan+nus2.dsc
- golang-github-samuel-go-zookeeper_0.0~git20201211.7117e9e-1~jan+nus2_amd64.buildinfo
- golang-github-samuel-go-zookeeper_0.0~git20201211.7117e9e-1~jan+nus2_amd64.changes
Diff
diff --git a/.deepsource.toml b/.deepsource.toml
new file mode 100644
index 0000000..45a99e7
--- /dev/null
+++ b/.deepsource.toml
@@ -0,0 +1,17 @@
+version = 1
+
+test_patterns = [
+ '*_test.go'
+]
+
+exclude_patterns = [
+
+]
+
+[[analyzers]]
+name = 'go'
+enabled = true
+
+
+ [analyzers.meta]
+ import_path = 'github.com/samuel/go-zookeeper/zk'
diff --git a/.gitignore b/.gitignore
index e43b0f9..ca41152 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,6 @@
+.vscode/
.DS_Store
+profile.cov
+zookeeper
+zookeeper-*/
+zookeeper-*.tar.gz
diff --git a/.travis.yml b/.travis.yml
index 65b27a8..70f0ecf 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,6 +1,13 @@
+arch:
+ - amd64
+ - ppc64le
language: go
go:
- - 1.9
+ - "1.11"
+ - "1.x"
+ - tip
+
+go_import_path: github.com/samuel/go-zookeeper
jdk:
- oraclejdk9
@@ -12,22 +19,23 @@ branches:
- master
before_install:
- - wget http://apache.cs.utah.edu/zookeeper/zookeeper-${zk_version}/zookeeper-${zk_version}.tar.gz
- - tar -zxvf zookeeper*tar.gz && zip -d zookeeper-${zk_version}/contrib/fatjar/zookeeper-${zk_version}-fatjar.jar 'META-INF/*.SF' 'META-INF/*.DSA'
- - go get github.com/mattn/goveralls
- - go get golang.org/x/tools/cmd/cover
+ - make setup ZK_VERSION=${zk_version}
+
+before_script:
+ - make lint
script:
- - jdk_switcher use oraclejdk9
- - go build ./...
- - go fmt ./...
- - go vet ./...
- - go test -i -race ./...
- - go test -race -covermode atomic -coverprofile=profile.cov ./zk
- - goveralls -coverprofile=profile.cov -service=travis-ci
+ - jdk_switcher use oraclejdk9 || true
+ - make
+
+matrix:
+ allow_failures:
+ - go: tip
+ fast_finish: true
env:
global:
secure: Coha3DDcXmsekrHCZlKvRAc+pMBaQU1QS/3++3YCCUXVDBWgVsC1ZIc9df4RLdZ/ncGd86eoRq/S+zyn1XbnqK5+ePqwJoUnJ59BE8ZyHLWI9ajVn3fND1MTduu/ksGsS79+IYbdVI5wgjSgjD3Ktp6Y5uPl+BPosjYBGdNcHS4=
matrix:
- - zk_version=3.4.10
+ - zk_version=3.5.4-beta
+ - zk_version=3.4.12
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..895a319
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,39 @@
+# make file to hold the logic of build and test setup
+ZK_VERSION ?= 3.4.12
+
+ZK = zookeeper-$(ZK_VERSION)
+ZK_URL = "https://archive.apache.org/dist/zookeeper/$(ZK)/$(ZK).tar.gz"
+
+PACKAGES := $(shell go list ./... | grep -v examples)
+
+.DEFAULT_GOAL := test
+
+$(ZK):
+ wget $(ZK_URL)
+ tar -zxf $(ZK).tar.gz
+ # we link to a standard directory path so then the tests dont need to find based on version
+ # in the test code. this allows backward compatable testing.
+ ln -s $(ZK) zookeeper
+
+.PHONY: install-covertools
+install-covertools:
+ go get github.com/mattn/goveralls
+ go get golang.org/x/tools/cmd/cover
+
+.PHONY: setup
+setup: $(ZK) install-covertools
+
+.PHONY: lint
+lint:
+ go fmt ./...
+ go vet ./...
+
+.PHONY: build
+build:
+ go build ./...
+
+.PHONY: test
+test: build
+ go test -timeout 500s -v -race -covermode atomic -coverprofile=profile.cov $(PACKAGES)
+ # ignore if we fail to publish coverage
+ -goveralls -coverprofile=profile.cov -service=travis-ci
diff --git a/README.md b/README.md
index afc1d08..d19e58d 100644
--- a/README.md
+++ b/README.md
@@ -9,3 +9,8 @@ License
-------
3-clause BSD. See LICENSE file.
+
+This Repository is No Longer Maintained
+=======================================
+
+Please use https://github.com/go-zookeeper/zk for an actively maintained fork.
diff --git a/debian/changelog b/debian/changelog
index 0ceacde..9685785 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,4 +1,4 @@
-golang-github-samuel-go-zookeeper (0.0~git20180130.c4fab1a-2) UNRELEASED; urgency=medium
+golang-github-samuel-go-zookeeper (0.0~git20201211.7117e9e-1) UNRELEASED; urgency=medium
[ Alexandre Viau ]
* Point Vcs-* urls to salsa.debian.org.
@@ -6,8 +6,9 @@ golang-github-samuel-go-zookeeper (0.0~git20180130.c4fab1a-2) UNRELEASED; urgenc
[ Debian Janitor ]
* Remove constraints unnecessary since stretch:
+ Build-Depends: Drop versioned constraint on dh-golang.
+ * New upstream snapshot.
- -- Alexandre Viau <aviau@debian.org> Mon, 02 Apr 2018 20:27:12 -0400
+ -- Alexandre Viau <aviau@debian.org> Fri, 21 Oct 2022 16:10:47 -0000
golang-github-samuel-go-zookeeper (0.0~git20180130.c4fab1a-1) unstable; urgency=medium
diff --git a/zk/cluster_test.go b/zk/cluster_test.go
index dcceaa4..6460a97 100644
--- a/zk/cluster_test.go
+++ b/zk/cluster_test.go
@@ -17,17 +17,17 @@ func (lw logWriter) Write(b []byte) (int, error) {
}
func TestBasicCluster(t *testing.T) {
- ts, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
- zk1, err := ts.Connect(0)
+ zk1, _, err := ts.Connect(0)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk1.Close()
- zk2, err := ts.Connect(1)
+ zk2, _, err := ts.Connect(1)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
@@ -38,6 +38,7 @@ func TestBasicCluster(t *testing.T) {
if _, err := zk1.Create("/gozk-test", []byte("foo-cluster"), 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create failed on node 1: %+v", err)
}
+
if by, _, err := zk2.Get("/gozk-test"); err != nil {
t.Fatalf("Get failed on node 2: %+v", err)
} else if string(by) != "foo-cluster" {
@@ -47,7 +48,7 @@ func TestBasicCluster(t *testing.T) {
// If the current leader dies, then the session is reestablished with the new one.
func TestClientClusterFailover(t *testing.T) {
- tc, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
+ tc, err := StartTestCluster(t, 3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -89,7 +90,7 @@ func TestClientClusterFailover(t *testing.T) {
// If a ZooKeeper cluster looses quorum then a session is reconnected as soon
// as the quorum is restored.
func TestNoQuorum(t *testing.T) {
- tc, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
+ tc, err := StartTestCluster(t, 3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -149,7 +150,7 @@ func TestNoQuorum(t *testing.T) {
DefaultLogger.Printf(" Retrying no luck...")
var firstDisconnect *Event
begin := time.Now()
- for time.Now().Sub(begin) < 6*time.Second {
+ for time.Since(begin) < 6*time.Second {
disconnectedEvent := sl.NewWatcher(sessionStateMatcher(StateDisconnected)).Wait(4 * time.Second)
if disconnectedEvent == nil {
t.Fatalf("Disconnected event expected")
@@ -185,12 +186,12 @@ func TestNoQuorum(t *testing.T) {
}
func TestWaitForClose(t *testing.T) {
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
- zk, err := ts.Connect(0)
+ zk, _, err := ts.Connect(0)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
@@ -221,7 +222,7 @@ CONNECTED:
}
func TestBadSession(t *testing.T) {
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
diff --git a/zk/conn.go b/zk/conn.go
index f79a51b..da9503a 100644
--- a/zk/conn.go
+++ b/zk/conn.go
@@ -409,13 +409,11 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
defer close(reauthReadyChan)
if c.logInfo {
- c.logger.Printf("Re-submitting `%d` credentials after reconnect",
- len(c.creds))
+ c.logger.Printf("re-submitting `%d` credentials after reconnect", len(c.creds))
}
for _, cred := range c.creds {
if shouldCancel() {
- c.logger.Printf("Cancel rer-submitting credentials")
return
}
resChan, err := c.sendRequest(
@@ -428,7 +426,7 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
nil)
if err != nil {
- c.logger.Printf("Call to sendRequest failed during credential resubmit: %s", err)
+ c.logger.Printf("call to sendRequest failed during credential resubmit: %s", err)
// FIXME(prozlach): lets ignore errors for now
continue
}
@@ -437,14 +435,14 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
select {
case res = <-resChan:
case <-c.closeChan:
- c.logger.Printf("Recv closed, cancel re-submitting credentials")
+ c.logger.Printf("recv closed, cancel re-submitting credentials")
return
case <-c.shouldQuit:
- c.logger.Printf("Should quit, cancel re-submitting credentials")
+ c.logger.Printf("should quit, cancel re-submitting credentials")
return
}
if res.err != nil {
- c.logger.Printf("Credential re-submit failed: %s", res.err)
+ c.logger.Printf("credential re-submit failed: %s", res.err)
// FIXME(prozlach): lets ignore errors for now
continue
}
@@ -486,14 +484,14 @@ func (c *Conn) loop() {
err := c.authenticate()
switch {
case err == ErrSessionExpired:
- c.logger.Printf("Authentication failed: %s", err)
+ c.logger.Printf("authentication failed: %s", err)
c.invalidateWatches(err)
case err != nil && c.conn != nil:
- c.logger.Printf("Authentication failed: %s", err)
+ c.logger.Printf("authentication failed: %s", err)
c.conn.Close()
case err == nil:
if c.logInfo {
- c.logger.Printf("Authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs)
+ c.logger.Printf("authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs)
}
c.hostProvider.Connected() // mark success
c.closeChan = make(chan struct{}) // channel to tell send loop stop
@@ -508,7 +506,7 @@ func (c *Conn) loop() {
}
err := c.sendLoop()
if err != nil || c.logInfo {
- c.logger.Printf("Send loop terminated: err=%v", err)
+ c.logger.Printf("send loop terminated: err=%v", err)
}
c.conn.Close() // causes recv loop to EOF/exit
wg.Done()
@@ -523,7 +521,7 @@ func (c *Conn) loop() {
err = c.recvLoop(c.conn)
}
if err != io.EOF || c.logInfo {
- c.logger.Printf("Recv loop terminated: err=%v", err)
+ c.logger.Printf("recv loop terminated: err=%v", err)
}
if err == nil {
panic("zk: recvLoop should never return nil error")
@@ -697,20 +695,28 @@ func (c *Conn) authenticate() error {
binary.BigEndian.PutUint32(buf[:4], uint32(n))
- c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout * 10))
+ if err := c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout * 10)); err != nil {
+ return err
+ }
_, err = c.conn.Write(buf[:n+4])
- c.conn.SetWriteDeadline(time.Time{})
if err != nil {
return err
}
+ if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
+ return err
+ }
// Receive and decode a connect response.
- c.conn.SetReadDeadline(time.Now().Add(c.recvTimeout * 10))
+ if err := c.conn.SetReadDeadline(time.Now().Add(c.recvTimeout * 10)); err != nil {
+ return err
+ }
_, err = io.ReadFull(c.conn, buf[:4])
- c.conn.SetReadDeadline(time.Time{})
if err != nil {
return err
}
+ if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
+ return err
+ }
blen := int(binary.BigEndian.Uint32(buf[:4]))
if cap(buf) < blen {
@@ -772,14 +778,18 @@ func (c *Conn) sendData(req *request) error {
c.requests[req.xid] = req
c.requestsLock.Unlock()
- c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
+ if err := c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout)); err != nil {
+ return err
+ }
_, err = c.conn.Write(c.buf[:n+4])
- c.conn.SetWriteDeadline(time.Time{})
if err != nil {
req.recvChan <- response{-1, err}
c.conn.Close()
return err
}
+ if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
+ return err
+ }
return nil
}
@@ -802,13 +812,17 @@ func (c *Conn) sendLoop() error {
binary.BigEndian.PutUint32(c.buf[:4], uint32(n))
- c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
+ if err := c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout)); err != nil {
+ return err
+ }
_, err = c.conn.Write(c.buf[:n+4])
- c.conn.SetWriteDeadline(time.Time{})
if err != nil {
c.conn.Close()
return err
}
+ if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
+ return err
+ }
case <-c.closeChan:
return nil
}
@@ -823,10 +837,12 @@ func (c *Conn) recvLoop(conn net.Conn) error {
buf := make([]byte, sz)
for {
// package length
- conn.SetReadDeadline(time.Now().Add(c.recvTimeout))
+ if err := conn.SetReadDeadline(time.Now().Add(c.recvTimeout)); err != nil {
+ c.logger.Printf("failed to set connection deadline: %v", err)
+ }
_, err := io.ReadFull(conn, buf[:4])
if err != nil {
- return err
+ return fmt.Errorf("failed to read from connection: %v", err)
}
blen := int(binary.BigEndian.Uint32(buf[:4]))
@@ -838,10 +854,12 @@ func (c *Conn) recvLoop(conn net.Conn) error {
}
_, err = io.ReadFull(conn, buf[:blen])
- conn.SetReadDeadline(time.Time{})
if err != nil {
return err
}
+ if err := conn.SetReadDeadline(time.Time{}); err != nil {
+ return err
+ }
res := responseHeader{}
_, err = decodePacket(buf[:16], &res)
@@ -874,7 +892,7 @@ func (c *Conn) recvLoop(conn net.Conn) error {
c.watchersLock.Lock()
for _, t := range wTypes {
wpt := watchPathType{res.Path, t}
- if watchers := c.watchers[wpt]; watchers != nil && len(watchers) > 0 {
+ if watchers, ok := c.watchers[wpt]; ok {
for _, ch := range watchers {
ch <- ev
close(ch)
@@ -1220,6 +1238,38 @@ func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error) {
return mr, err
}
+// IncrementalReconfig is the zookeeper reconfiguration api that allows adding and removing servers
+// by lists of members.
+// Return the new configuration stats.
+func (c *Conn) IncrementalReconfig(joining, leaving []string, version int64) (*Stat, error) {
+ // TODO: validate the shape of the member string to give early feedback.
+ request := &reconfigRequest{
+ JoiningServers: []byte(strings.Join(joining, ",")),
+ LeavingServers: []byte(strings.Join(leaving, ",")),
+ CurConfigId: version,
+ }
+
+ return c.internalReconfig(request)
+}
+
+// Reconfig is the non-incremental update functionality for Zookeeper where the list preovided
+// is the entire new member list.
+// the optional version allows for conditional reconfigurations, -1 ignores the condition.
+func (c *Conn) Reconfig(members []string, version int64) (*Stat, error) {
+ request := &reconfigRequest{
+ NewMembers: []byte(strings.Join(members, ",")),
+ CurConfigId: version,
+ }
+
+ return c.internalReconfig(request)
+}
+
+func (c *Conn) internalReconfig(request *reconfigRequest) (*Stat, error) {
+ response := &reconfigReponse{}
+ _, err := c.request(opReconfig, request, response, nil)
+ return &response.Stat, err
+}
+
// Server returns the current or last-connected server name.
func (c *Conn) Server() string {
c.serverMu.Lock()
diff --git a/zk/conn_test.go b/zk/conn_test.go
index 94206d9..ee67093 100644
--- a/zk/conn_test.go
+++ b/zk/conn_test.go
@@ -22,7 +22,7 @@ func TestRecurringReAuthHang(t *testing.T) {
}
}()
- zkC, err := StartTestCluster(2, ioutil.Discard, ioutil.Discard)
+ zkC, err := StartTestCluster(t, 2, ioutil.Discard, ioutil.Discard)
if err != nil {
panic(err)
}
diff --git a/zk/constants.go b/zk/constants.go
index 33b5563..ccafcfc 100644
--- a/zk/constants.go
+++ b/zk/constants.go
@@ -2,6 +2,7 @@ package zk
import (
"errors"
+ "fmt"
)
const (
@@ -25,6 +26,7 @@ const (
opGetChildren2 = 12
opCheck = 13
opMulti = 14
+ opReconfig = 16
opClose = -11
opSetAuth = 100
opSetWatches = 101
@@ -92,7 +94,7 @@ func (s State) String() string {
if name := stateNames[s]; name != "" {
return name
}
- return "Unknown"
+ return "unknown state"
}
type ErrCode int32
@@ -113,8 +115,10 @@ var (
ErrClosing = errors.New("zk: zookeeper is closing")
ErrNothing = errors.New("zk: no server responsees to process")
ErrSessionMoved = errors.New("zk: session moved to another server, so operation is ignored")
-
+ ErrReconfigDisabled = errors.New("attempts to perform a reconfiguration operation when reconfiguration feature is disabled")
+ ErrBadArguments = errors.New("invalid arguments")
// ErrInvalidCallback = errors.New("zk: invalid callback specified")
+
errCodeToError = map[ErrCode]error{
0: nil,
errAPIError: ErrAPIError,
@@ -126,11 +130,13 @@ var (
errNotEmpty: ErrNotEmpty,
errSessionExpired: ErrSessionExpired,
// errInvalidCallback: ErrInvalidCallback,
- errInvalidAcl: ErrInvalidACL,
- errAuthFailed: ErrAuthFailed,
- errClosing: ErrClosing,
- errNothing: ErrNothing,
- errSessionMoved: ErrSessionMoved,
+ errInvalidAcl: ErrInvalidACL,
+ errAuthFailed: ErrAuthFailed,
+ errClosing: ErrClosing,
+ errNothing: ErrNothing,
+ errSessionMoved: ErrSessionMoved,
+ errZReconfigDisabled: ErrReconfigDisabled,
+ errBadArguments: ErrBadArguments,
}
)
@@ -138,7 +144,7 @@ func (e ErrCode) toError() error {
if err, ok := errCodeToError[e]; ok {
return err
}
- return ErrUnknown
+ return fmt.Errorf("unknown error: %v", e)
}
const (
@@ -168,6 +174,8 @@ const (
errClosing ErrCode = -116
errNothing ErrCode = -117
errSessionMoved ErrCode = -118
+ // Attempts to perform a reconfiguration operation when reconfiguration feature is disabled
+ errZReconfigDisabled ErrCode = -123
)
// Constants for ACL permissions
@@ -197,6 +205,7 @@ var (
opGetChildren2: "getChildren2",
opCheck: "check",
opMulti: "multi",
+ opReconfig: "reconfig",
opClose: "close",
opSetAuth: "setAuth",
opSetWatches: "setWatches",
diff --git a/zk/dnshostprovider_test.go b/zk/dnshostprovider_test.go
index 77a6065..48000a5 100644
--- a/zk/dnshostprovider_test.go
+++ b/zk/dnshostprovider_test.go
@@ -16,7 +16,7 @@ func localhostLookupHost(host string) ([]string, error) {
// TestDNSHostProviderCreate is just like TestCreate, but with an
// overridden HostProvider that ignores the provided hostname.
func TestDNSHostProviderCreate(t *testing.T) {
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -97,7 +97,7 @@ var _ HostProvider = &localHostPortsFacade{}
// remaps addresses to localhost:$PORT combinations corresponding to
// the test ZooKeeper instances.
func TestDNSHostProviderReconnect(t *testing.T) {
- ts, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
diff --git a/zk/flw.go b/zk/flw.go
index 3e97f96..1fb8b2a 100644
--- a/zk/flw.go
+++ b/zk/flw.go
@@ -255,12 +255,16 @@ func fourLetterWord(server, command string, timeout time.Duration) ([]byte, erro
// once the command has been processed, but better safe than sorry
defer conn.Close()
- conn.SetWriteDeadline(time.Now().Add(timeout))
+ if err := conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
+ return nil, err
+ }
_, err = conn.Write([]byte(command))
if err != nil {
return nil, err
}
- conn.SetReadDeadline(time.Now().Add(timeout))
+ if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
+ return nil, err
+ }
return ioutil.ReadAll(conn)
}
diff --git a/zk/lock_test.go b/zk/lock_test.go
index 8a3478a..77dce9b 100644
--- a/zk/lock_test.go
+++ b/zk/lock_test.go
@@ -6,7 +6,7 @@ import (
)
func TestLock(t *testing.T) {
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -64,11 +64,12 @@ func TestLock(t *testing.T) {
// This tests creating a lock with a path that's more than 1 node deep (e.g. "/test-multi-level/lock"),
// when a part of that path already exists (i.e. "/test-multi-level" node already exists).
func TestMultiLevelLock(t *testing.T) {
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
+
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
diff --git a/zk/server_help.go b/zk/server_help_test.go
similarity index 65%
rename from zk/server_help.go
rename to zk/server_help_test.go
index 3663064..e595fb4 100644
--- a/zk/server_help.go
+++ b/zk/server_help_test.go
@@ -8,100 +8,113 @@ import (
"os"
"path/filepath"
"strings"
+ "testing"
"time"
)
+const (
+ _testConfigName = "zoo.cfg"
+ _testMyIDFileName = "myid"
+)
+
func init() {
rand.Seed(time.Now().UnixNano())
}
type TestServer struct {
- Port int
- Path string
- Srv *Server
+ Port int
+ Path string
+ Srv *server
+ Config ServerConfigServer
}
type TestCluster struct {
Path string
+ Config ServerConfig
Servers []TestServer
}
-func StartTestCluster(size int, stdout, stderr io.Writer) (*TestCluster, error) {
- tmpPath, err := ioutil.TempDir("", "gozk")
- if err != nil {
- return nil, err
+func StartTestCluster(t *testing.T, size int, stdout, stderr io.Writer) (*TestCluster, error) {
+ if testing.Short() {
+ t.Skip("ZK cluster tests skipped in short case.")
}
+ tmpPath, err := ioutil.TempDir("", "gozk")
+ requireNoError(t, err, "failed to create tmp dir for test server setup")
+
success := false
startPort := int(rand.Int31n(6000) + 10000)
cluster := &TestCluster{Path: tmpPath}
+
defer func() {
if !success {
cluster.Stop()
}
}()
+
for serverN := 0; serverN < size; serverN++ {
- srvPath := filepath.Join(tmpPath, fmt.Sprintf("srv%d", serverN))
+ srvPath := filepath.Join(tmpPath, fmt.Sprintf("srv%d", serverN+1))
if err := os.Mkdir(srvPath, 0700); err != nil {
- return nil, err
+ requireNoError(t, err, "failed to make server path")
}
+
port := startPort + serverN*3
cfg := ServerConfig{
ClientPort: port,
DataDir: srvPath,
}
+
for i := 0; i < size; i++ {
- cfg.Servers = append(cfg.Servers, ServerConfigServer{
+ serverNConfig := ServerConfigServer{
ID: i + 1,
Host: "127.0.0.1",
PeerPort: startPort + i*3 + 1,
LeaderElectionPort: startPort + i*3 + 2,
- })
+ }
+
+ cfg.Servers = append(cfg.Servers, serverNConfig)
}
- cfgPath := filepath.Join(srvPath, "zoo.cfg")
+
+ cfgPath := filepath.Join(srvPath, _testConfigName)
fi, err := os.Create(cfgPath)
- if err != nil {
- return nil, err
- }
- err = cfg.Marshall(fi)
+ requireNoError(t, err)
+
+ requireNoError(t, cfg.Marshall(fi))
fi.Close()
- if err != nil {
- return nil, err
- }
- fi, err = os.Create(filepath.Join(srvPath, "myid"))
- if err != nil {
- return nil, err
- }
+ fi, err = os.Create(filepath.Join(srvPath, _testMyIDFileName))
+ requireNoError(t, err)
+
_, err = fmt.Fprintf(fi, "%d\n", serverN+1)
fi.Close()
- if err != nil {
- return nil, err
- }
+ requireNoError(t, err)
+
+ srv, err := NewIntegrationTestServer(t, cfgPath, stdout, stderr)
+ requireNoError(t, err)
- srv := &Server{
- ConfigPath: cfgPath,
- Stdout: stdout,
- Stderr: stderr,
- }
if err := srv.Start(); err != nil {
return nil, err
}
+
cluster.Servers = append(cluster.Servers, TestServer{
- Path: srvPath,
- Port: cfg.ClientPort,
- Srv: srv,
+ Path: srvPath,
+ Port: cfg.ClientPort,
+ Srv: srv,
+ Config: cfg.Servers[serverN],
})
+ cluster.Config = cfg
}
- if err := cluster.waitForStart(10, time.Second); err != nil {
+
+ if err := cluster.waitForStart(30, time.Second); err != nil {
return nil, err
}
+
success = true
+
return cluster, nil
}
-func (tc *TestCluster) Connect(idx int) (*Conn, error) {
- zk, _, err := Connect([]string{fmt.Sprintf("127.0.0.1:%d", tc.Servers[idx].Port)}, time.Second*15)
- return zk, err
+func (tc *TestCluster) Connect(idx int) (*Conn, <-chan Event, error) {
+ return Connect([]string{fmt.Sprintf("127.0.0.1:%d", tc.Servers[idx].Port)}, time.Second*15)
}
func (tc *TestCluster) ConnectAll() (*Conn, <-chan Event, error) {
@@ -144,6 +157,7 @@ func (tc *TestCluster) waitForStart(maxRetry int, interval time.Duration) error
}
time.Sleep(interval)
}
+
return fmt.Errorf("unable to verify health of servers")
}
@@ -180,7 +194,7 @@ func (tc *TestCluster) StartServer(server string) {
return
}
}
- panic(fmt.Sprintf("Unknown server: %s", server))
+ panic(fmt.Sprintf("unknown server: %s", server))
}
func (tc *TestCluster) StopServer(server string) {
@@ -190,27 +204,44 @@ func (tc *TestCluster) StopServer(server string) {
return
}
}
- panic(fmt.Sprintf("Unknown server: %s", server))
+ panic(fmt.Sprintf("unknown server: %s", server))
}
func (tc *TestCluster) StartAllServers() error {
for _, s := range tc.Servers {
if err := s.Srv.Start(); err != nil {
- return fmt.Errorf(
- "Failed to start server listening on port `%d` : %+v", s.Port, err)
+ return fmt.Errorf("failed to start server listening on port `%d` : %+v", s.Port, err)
}
}
+ if err := tc.waitForStart(10, time.Second*2); err != nil {
+ return fmt.Errorf("failed to wait to startup zk servers: %v", err)
+ }
+
return nil
}
func (tc *TestCluster) StopAllServers() error {
+ var err error
for _, s := range tc.Servers {
if err := s.Srv.Stop(); err != nil {
- return fmt.Errorf(
- "Failed to stop server listening on port `%d` : %+v", s.Port, err)
+ err = fmt.Errorf("failed to stop server listening on port `%d` : %v", s.Port, err)
}
}
+ if err != nil {
+ return err
+ }
+
+ if err := tc.waitForStop(5, time.Second); err != nil {
+ return fmt.Errorf("failed to wait to startup zk servers: %v", err)
+ }
return nil
}
+
+func requireNoError(t *testing.T, err error, msgAndArgs ...interface{}) {
+ if err != nil {
+ t.Logf("received unexpected error: %v", err)
+ t.Fatal(msgAndArgs...)
+ }
+}
diff --git a/zk/server_java.go b/zk/server_java.go
deleted file mode 100644
index e553ec1..0000000
--- a/zk/server_java.go
+++ /dev/null
@@ -1,136 +0,0 @@
-package zk
-
-import (
- "fmt"
- "io"
- "os"
- "os/exec"
- "path/filepath"
-)
-
-type ErrMissingServerConfigField string
-
-func (e ErrMissingServerConfigField) Error() string {
- return fmt.Sprintf("zk: missing server config field '%s'", string(e))
-}
-
-const (
- DefaultServerTickTime = 2000
- DefaultServerInitLimit = 10
- DefaultServerSyncLimit = 5
- DefaultServerAutoPurgeSnapRetainCount = 3
- DefaultPeerPort = 2888
- DefaultLeaderElectionPort = 3888
-)
-
-type ServerConfigServer struct {
- ID int
- Host string
- PeerPort int
- LeaderElectionPort int
-}
-
-type ServerConfig struct {
- TickTime int // Number of milliseconds of each tick
- InitLimit int // Number of ticks that the initial synchronization phase can take
- SyncLimit int // Number of ticks that can pass between sending a request and getting an acknowledgement
- DataDir string // Direcrory where the snapshot is stored
- ClientPort int // Port at which clients will connect
- AutoPurgeSnapRetainCount int // Number of snapshots to retain in dataDir
- AutoPurgePurgeInterval int // Purge task internal in hours (0 to disable auto purge)
- Servers []ServerConfigServer
-}
-
-func (sc ServerConfig) Marshall(w io.Writer) error {
- if sc.DataDir == "" {
- return ErrMissingServerConfigField("dataDir")
- }
- fmt.Fprintf(w, "dataDir=%s\n", sc.DataDir)
- if sc.TickTime <= 0 {
- sc.TickTime = DefaultServerTickTime
- }
- fmt.Fprintf(w, "tickTime=%d\n", sc.TickTime)
- if sc.InitLimit <= 0 {
- sc.InitLimit = DefaultServerInitLimit
- }
- fmt.Fprintf(w, "initLimit=%d\n", sc.InitLimit)
- if sc.SyncLimit <= 0 {
- sc.SyncLimit = DefaultServerSyncLimit
- }
- fmt.Fprintf(w, "syncLimit=%d\n", sc.SyncLimit)
- if sc.ClientPort <= 0 {
- sc.ClientPort = DefaultPort
- }
- fmt.Fprintf(w, "clientPort=%d\n", sc.ClientPort)
- if sc.AutoPurgePurgeInterval > 0 {
- if sc.AutoPurgeSnapRetainCount <= 0 {
- sc.AutoPurgeSnapRetainCount = DefaultServerAutoPurgeSnapRetainCount
- }
- fmt.Fprintf(w, "autopurge.snapRetainCount=%d\n", sc.AutoPurgeSnapRetainCount)
- fmt.Fprintf(w, "autopurge.purgeInterval=%d\n", sc.AutoPurgePurgeInterval)
- }
- if len(sc.Servers) > 0 {
- for _, srv := range sc.Servers {
- if srv.PeerPort <= 0 {
- srv.PeerPort = DefaultPeerPort
- }
- if srv.LeaderElectionPort <= 0 {
- srv.LeaderElectionPort = DefaultLeaderElectionPort
- }
- fmt.Fprintf(w, "server.%d=%s:%d:%d\n", srv.ID, srv.Host, srv.PeerPort, srv.LeaderElectionPort)
- }
- }
- return nil
-}
-
-var jarSearchPaths = []string{
- "zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
- "../zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
- "/usr/share/java/zookeeper-*.jar",
- "/usr/local/zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
- "/usr/local/Cellar/zookeeper/*/libexec/contrib/fatjar/zookeeper-*-fatjar.jar",
-}
-
-func findZookeeperFatJar() string {
- var paths []string
- zkPath := os.Getenv("ZOOKEEPER_PATH")
- if zkPath == "" {
- paths = jarSearchPaths
- } else {
- paths = []string{filepath.Join(zkPath, "contrib/fatjar/zookeeper-*-fatjar.jar")}
- }
- for _, path := range paths {
- matches, _ := filepath.Glob(path)
- // TODO: could sort by version and pick latest
- if len(matches) > 0 {
- return matches[0]
- }
- }
- return ""
-}
-
-type Server struct {
- JarPath string
- ConfigPath string
- Stdout, Stderr io.Writer
-
- cmd *exec.Cmd
-}
-
-func (srv *Server) Start() error {
- if srv.JarPath == "" {
- srv.JarPath = findZookeeperFatJar()
- if srv.JarPath == "" {
- return fmt.Errorf("zk: unable to find server jar")
- }
- }
- srv.cmd = exec.Command("java", "-jar", srv.JarPath, "server", srv.ConfigPath)
- srv.cmd.Stdout = srv.Stdout
- srv.cmd.Stderr = srv.Stderr
- return srv.cmd.Start()
-}
-
-func (srv *Server) Stop() error {
- srv.cmd.Process.Signal(os.Kill)
- return srv.cmd.Wait()
-}
diff --git a/zk/server_java_test.go b/zk/server_java_test.go
new file mode 100644
index 0000000..2a6c400
--- /dev/null
+++ b/zk/server_java_test.go
@@ -0,0 +1,169 @@
+package zk
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "testing"
+)
+
+type ErrMissingServerConfigField string
+
+func (e ErrMissingServerConfigField) Error() string {
+ return fmt.Sprintf("zk: missing server config field '%s'", string(e))
+}
+
+const (
+ DefaultServerTickTime = 500
+ DefaultServerInitLimit = 10
+ DefaultServerSyncLimit = 5
+ DefaultServerAutoPurgeSnapRetainCount = 3
+ DefaultPeerPort = 2888
+ DefaultLeaderElectionPort = 3888
+)
+
+type server struct {
+ stdout, stderr io.Writer
+ cmdString string
+ cmdArgs []string
+ cmdEnv []string
+ cmd *exec.Cmd
+ // this cancel will kill the command being run in this case the server itself.
+ cancelFunc context.CancelFunc
+}
+
+func NewIntegrationTestServer(t *testing.T, configPath string, stdout, stderr io.Writer) (*server, error) {
+ // allow external systems to configure this zk server bin path.
+ zkPath := os.Getenv("ZOOKEEPER_BIN_PATH")
+ if zkPath == "" {
+ // default to a static reletive path that can be setup with a build system
+ zkPath = "../zookeeper/bin"
+ }
+ if _, err := os.Stat(zkPath); err != nil {
+ if os.IsNotExist(err) {
+ return nil, fmt.Errorf("zk: could not find testing zookeeper bin path at %q: %v ", zkPath, err)
+ }
+ }
+ // password is 'test'
+ superString := `SERVER_JVMFLAGS=-Dzookeeper.DigestAuthenticationProvider.superDigest=super:D/InIHSb7yEEbrWz8b9l71RjZJU=`
+
+ return &server{
+ cmdString: filepath.Join(zkPath, "zkServer.sh"),
+ cmdArgs: []string{"start-foreground", configPath},
+ cmdEnv: []string{superString},
+ stdout: stdout, stderr: stderr,
+ }, nil
+}
+
+func (srv *server) Start() error {
+ ctx, cancel := context.WithCancel(context.Background())
+ srv.cancelFunc = cancel
+
+ srv.cmd = exec.CommandContext(ctx, srv.cmdString, srv.cmdArgs...)
+ srv.cmd.Stdout = srv.stdout
+ srv.cmd.Stderr = srv.stderr
+
+ srv.cmd.Env = srv.cmdEnv
+ return srv.cmd.Start()
+}
+
+func (srv *server) Stop() error {
+ srv.cancelFunc()
+ return srv.cmd.Wait()
+}
+
+type ServerConfigServer struct {
+ ID int
+ Host string
+ PeerPort int
+ LeaderElectionPort int
+}
+
+type ServerConfig struct {
+ TickTime int // Number of milliseconds of each tick
+ InitLimit int // Number of ticks that the initial synchronization phase can take
+ SyncLimit int // Number of ticks that can pass between sending a request and getting an acknowledgement
+ DataDir string // Direcrory where the snapshot is stored
+ ClientPort int // Port at which clients will connect
+ AutoPurgeSnapRetainCount int // Number of snapshots to retain in dataDir
+ AutoPurgePurgeInterval int // Purge task internal in hours (0 to disable auto purge)
+ Servers []ServerConfigServer
+}
+
+func (sc ServerConfig) Marshall(w io.Writer) error {
+ // the admin server is not wanted in test cases as it slows the startup process and is
+ // of little unit test value.
+ fmt.Fprintln(w, "admin.enableServer=false")
+ if sc.DataDir == "" {
+ return ErrMissingServerConfigField("dataDir")
+ }
+ fmt.Fprintf(w, "dataDir=%s\n", sc.DataDir)
+ if sc.TickTime <= 0 {
+ sc.TickTime = DefaultServerTickTime
+ }
+ fmt.Fprintf(w, "tickTime=%d\n", sc.TickTime)
+ if sc.InitLimit <= 0 {
+ sc.InitLimit = DefaultServerInitLimit
+ }
+ fmt.Fprintf(w, "initLimit=%d\n", sc.InitLimit)
+ if sc.SyncLimit <= 0 {
+ sc.SyncLimit = DefaultServerSyncLimit
+ }
+ fmt.Fprintf(w, "syncLimit=%d\n", sc.SyncLimit)
+ if sc.ClientPort <= 0 {
+ sc.ClientPort = DefaultPort
+ }
+ fmt.Fprintf(w, "clientPort=%d\n", sc.ClientPort)
+ if sc.AutoPurgePurgeInterval > 0 {
+ if sc.AutoPurgeSnapRetainCount <= 0 {
+ sc.AutoPurgeSnapRetainCount = DefaultServerAutoPurgeSnapRetainCount
+ }
+ fmt.Fprintf(w, "autopurge.snapRetainCount=%d\n", sc.AutoPurgeSnapRetainCount)
+ fmt.Fprintf(w, "autopurge.purgeInterval=%d\n", sc.AutoPurgePurgeInterval)
+ }
+ // enable reconfig.
+ // TODO: allow setting this
+ fmt.Fprintln(w, "reconfigEnabled=true")
+ fmt.Fprintln(w, "4lw.commands.whitelist=*")
+
+ if len(sc.Servers) < 2 {
+ // if we dont have more than 2 servers we just dont specify server list to start in standalone mode
+ // see https://zookeeper.apache.org/doc/current/zookeeperStarted.html#sc_InstallingSingleMode for more details.
+ return nil
+ }
+ // if we then have more than one server force it to be distributed
+ fmt.Fprintln(w, "standaloneEnabled=false")
+
+ for _, srv := range sc.Servers {
+ if srv.PeerPort <= 0 {
+ srv.PeerPort = DefaultPeerPort
+ }
+ if srv.LeaderElectionPort <= 0 {
+ srv.LeaderElectionPort = DefaultLeaderElectionPort
+ }
+ fmt.Fprintf(w, "server.%d=%s:%d:%d\n", srv.ID, srv.Host, srv.PeerPort, srv.LeaderElectionPort)
+ }
+ return nil
+}
+
+// this is a helper to wait for the zk connection to at least get to the HasSession state
+func waitForSession(ctx context.Context, eventChan <-chan Event) error {
+ select {
+ case event, ok := <-eventChan:
+ // The eventChan is used solely to determine when the ZK conn has
+ // stopped.
+ if !ok {
+ return fmt.Errorf("connection closed before state reached")
+ }
+ if event.State == StateHasSession {
+ return nil
+ }
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+
+ return nil
+}
diff --git a/zk/structs.go b/zk/structs.go
index d4af27d..9400c3c 100644
--- a/zk/structs.go
+++ b/zk/structs.go
@@ -6,6 +6,7 @@ import (
"log"
"reflect"
"runtime"
+ "strings"
"time"
)
@@ -277,6 +278,18 @@ type multiResponse struct {
DoneHeader multiHeader
}
+// zk version 3.5 reconfig API
+type reconfigRequest struct {
+ JoiningServers []byte
+ LeavingServers []byte
+ NewMembers []byte
+ // curConfigId version of the current configuration
+ // optional - causes reconfiguration to return an error if configuration is no longer current
+ CurConfigId int64
+}
+
+type reconfigReponse getDataResponse
+
func (r *multiRequest) Encode(buf []byte) (int, error) {
total := 0
for _, op := range r.Ops {
@@ -392,7 +405,7 @@ type encoder interface {
func decodePacket(buf []byte, st interface{}) (n int, err error) {
defer func() {
if r := recover(); r != nil {
- if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" {
+ if e, ok := r.(runtime.Error); ok && strings.HasPrefix(e.Error(), "runtime error: slice bounds out of range") {
err = ErrShortBuffer
} else {
panic(r)
@@ -483,7 +496,7 @@ func decodePacketValue(buf []byte, v reflect.Value) (int, error) {
func encodePacket(buf []byte, st interface{}) (n int, err error) {
defer func() {
if r := recover(); r != nil {
- if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" {
+ if e, ok := r.(runtime.Error); ok && strings.HasPrefix(e.Error(), "runtime error: slice bounds out of range") {
err = ErrShortBuffer
} else {
panic(r)
@@ -604,6 +617,8 @@ func requestStructForOp(op int32) interface{} {
return &CheckVersionRequest{}
case opMulti:
return &multiRequest{}
+ case opReconfig:
+ return &reconfigRequest{}
}
return nil
}
diff --git a/zk/structs_test.go b/zk/structs_test.go
index a3f2797..3a38ab4 100644
--- a/zk/structs_test.go
+++ b/zk/structs_test.go
@@ -15,6 +15,7 @@ func TestEncodeDecodePacket(t *testing.T) {
encodeDecodeTest(t, &pathWatchRequest{"path", true})
encodeDecodeTest(t, &pathWatchRequest{"path", false})
encodeDecodeTest(t, &CheckVersionRequest{"/", -1})
+ encodeDecodeTest(t, &reconfigRequest{nil, nil, nil, -1})
encodeDecodeTest(t, &multiRequest{Ops: []multiRequestOp{{multiHeader{opCheck, false, -1}, &CheckVersionRequest{"/", -1}}}})
}
diff --git a/zk/throttle_test.go b/zk/throttle_test.go
index 633ce05..241f1d0 100644
--- a/zk/throttle_test.go
+++ b/zk/throttle_test.go
@@ -85,7 +85,7 @@ func (c *conn) start() {
func (c *conn) writeLoop() {
for req := range c.wchan {
- time.Sleep(req.writeAt.Sub(time.Now()))
+ time.Sleep(time.Until(req.writeAt))
var res nErr
for len(req.p) > 0 && res.err == nil {
writep := req.p
diff --git a/zk/zk_test.go b/zk/zk_test.go
index c81ef9f..bfed1e7 100644
--- a/zk/zk_test.go
+++ b/zk/zk_test.go
@@ -1,11 +1,15 @@
package zk
import (
- "crypto/rand"
+ "context"
"encoding/hex"
"fmt"
"io"
+ "io/ioutil"
+ "math/rand"
"net"
+ "os"
+ "path/filepath"
"reflect"
"regexp"
"sort"
@@ -17,7 +21,7 @@ import (
)
func TestStateChanges(t *testing.T) {
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -63,7 +67,7 @@ func TestStateChanges(t *testing.T) {
}
func TestCreate(t *testing.T) {
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -93,8 +97,155 @@ func TestCreate(t *testing.T) {
}
}
+func TestIncrementalReconfig(t *testing.T) {
+ if val, ok := os.LookupEnv("zk_version"); ok {
+ if !strings.HasPrefix(val, "3.5") {
+ t.Skip("running with zookeeper that does not support this api")
+ }
+ } else {
+ t.Skip("did not detect zk_version from env. skipping reconfig test")
+ }
+ ts, err := StartTestCluster(t, 3, nil, logWriter{t: t, p: "[ZKERR] "})
+ requireNoError(t, err, "failed to setup test cluster")
+ defer ts.Stop()
+
+ // start and add a new server.
+ tmpPath, err := ioutil.TempDir("", "gozk")
+ requireNoError(t, err, "failed to create tmp dir for test server setup")
+ defer os.RemoveAll(tmpPath)
+
+ startPort := int(rand.Int31n(6000) + 10000)
+
+ srvPath := filepath.Join(tmpPath, fmt.Sprintf("srv4"))
+ if err := os.Mkdir(srvPath, 0700); err != nil {
+ requireNoError(t, err, "failed to make server path")
+ }
+ testSrvConfig := ServerConfigServer{
+ ID: 4,
+ Host: "127.0.0.1",
+ PeerPort: startPort + 1,
+ LeaderElectionPort: startPort + 2,
+ }
+ cfg := ServerConfig{
+ ClientPort: startPort,
+ DataDir: srvPath,
+ Servers: []ServerConfigServer{testSrvConfig},
+ }
+
+ // TODO: clean all this server creating up to a better helper method
+ cfgPath := filepath.Join(srvPath, _testConfigName)
+ fi, err := os.Create(cfgPath)
+ requireNoError(t, err)
+
+ requireNoError(t, cfg.Marshall(fi))
+ fi.Close()
+
+ fi, err = os.Create(filepath.Join(srvPath, _testMyIDFileName))
+ requireNoError(t, err)
+
+ _, err = fmt.Fprintln(fi, "4")
+ fi.Close()
+ requireNoError(t, err)
+
+ testServer, err := NewIntegrationTestServer(t, cfgPath, nil, nil)
+ requireNoError(t, err)
+ requireNoError(t, testServer.Start())
+ defer testServer.Stop()
+
+ zk, events, err := ts.ConnectAll()
+ requireNoError(t, err, "failed to connect to cluster")
+ defer zk.Close()
+
+ err = zk.AddAuth("digest", []byte("super:test"))
+ requireNoError(t, err, "failed to auth to cluster")
+
+ waitCtx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+
+ err = waitForSession(waitCtx, events)
+ requireNoError(t, err, "failed to wail for session")
+
+ _, _, err = zk.Get("/zookeeper/config")
+ if err != nil {
+ t.Fatalf("get config returned error: %+v", err)
+ }
+
+ // initially should be 1<<32, which is 0x100000000. This is the zxid
+ // of the first NEWLEADER message, used as the inital version
+ // reflect.DeepEqual(bytes.Split(data, []byte("\n")), []byte("version=100000000"))
+
+ // remove node 3.
+ _, err = zk.IncrementalReconfig(nil, []string{"3"}, -1)
+ if err != nil && err == ErrConnectionClosed {
+ t.Log("conneciton closed is fine since the cluster re-elects and we dont reconnect")
+ } else {
+ requireNoError(t, err, "failed to remove node from cluster")
+ }
+
+ // add node a new 4th node
+ server := fmt.Sprintf("server.%d=%s:%d:%d;%d", testSrvConfig.ID, testSrvConfig.Host, testSrvConfig.PeerPort, testSrvConfig.LeaderElectionPort, cfg.ClientPort)
+ _, err = zk.IncrementalReconfig([]string{server}, nil, -1)
+ if err != nil && err == ErrConnectionClosed {
+ t.Log("conneciton closed is fine since the cluster re-elects and we dont reconnect")
+ } else {
+ requireNoError(t, err, "failed to add new server to cluster")
+ }
+}
+
+func TestReconfig(t *testing.T) {
+ if val, ok := os.LookupEnv("zk_version"); ok {
+ if !strings.HasPrefix(val, "3.5") {
+ t.Skip("running with zookeeper that does not support this api")
+ }
+ } else {
+ t.Skip("did not detect zk_version from env. skipping reconfig test")
+ }
+
+ // This test enures we can do an non-incremental reconfig
+ ts, err := StartTestCluster(t, 3, nil, logWriter{t: t, p: "[ZKERR] "})
+ requireNoError(t, err, "failed to setup test cluster")
+ defer ts.Stop()
+
+ zk, events, err := ts.ConnectAll()
+ requireNoError(t, err, "failed to connect to cluster")
+ defer zk.Close()
+
+ err = zk.AddAuth("digest", []byte("super:test"))
+ requireNoError(t, err, "failed to auth to cluster")
+
+ waitCtx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+
+ err = waitForSession(waitCtx, events)
+ requireNoError(t, err, "failed to wail for session")
+
+ _, _, err = zk.Get("/zookeeper/config")
+ if err != nil {
+ t.Fatalf("get config returned error: %+v", err)
+ }
+
+ // essentially remove the first node
+ var s []string
+ for _, host := range ts.Config.Servers[1:] {
+ s = append(s, fmt.Sprintf("server.%d=%s:%d:%d;%d\n", host.ID, host.Host, host.PeerPort, host.LeaderElectionPort, ts.Config.ClientPort))
+ }
+
+ _, err = zk.Reconfig(s, -1)
+ requireNoError(t, err, "failed to reconfig cluster")
+
+ // reconfig to all the hosts again
+ s = []string{}
+ for _, host := range ts.Config.Servers {
+ s = append(s, fmt.Sprintf("server.%d=%s:%d:%d;%d\n", host.ID, host.Host, host.PeerPort, host.LeaderElectionPort, ts.Config.ClientPort))
+ }
+
+ _, err = zk.Reconfig(s, -1)
+ requireNoError(t, err, "failed to reconfig cluster")
+
+}
+
func TestMulti(t *testing.T) {
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -135,7 +286,7 @@ func TestIfAuthdataSurvivesReconnect(t *testing.T) {
// reconnect.
testNode := "/auth-testnode"
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -187,7 +338,7 @@ func TestMultiFailures(t *testing.T) {
const firstPath = "/gozk-test-first"
const secondPath = "/gozk-test-second"
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -230,7 +381,7 @@ func TestMultiFailures(t *testing.T) {
}
func TestGetSetACL(t *testing.T) {
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -284,7 +435,7 @@ func TestGetSetACL(t *testing.T) {
}
func TestAuth(t *testing.T) {
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -334,7 +485,7 @@ func TestAuth(t *testing.T) {
}
func TestChildren(t *testing.T) {
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -387,7 +538,7 @@ func TestChildren(t *testing.T) {
}
func TestChildWatch(t *testing.T) {
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -425,7 +576,7 @@ func TestChildWatch(t *testing.T) {
if ev.Path != "/" {
t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/")
}
- case _ = <-time.After(time.Second * 2):
+ case <-time.After(time.Second * 2):
t.Fatal("Child watcher timed out")
}
@@ -452,13 +603,13 @@ func TestChildWatch(t *testing.T) {
if ev.Path != "/gozk-test" {
t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/")
}
- case _ = <-time.After(time.Second * 2):
+ case <-time.After(time.Second * 2):
t.Fatal("Child watcher timed out")
}
}
func TestSetWatchers(t *testing.T) {
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -601,7 +752,7 @@ func TestSetWatchers(t *testing.T) {
}
func TestExpiringWatch(t *testing.T) {
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -667,7 +818,7 @@ func TestRequestFail(t *testing.T) {
}
func TestSlowServer(t *testing.T) {
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -725,66 +876,8 @@ func TestSlowServer(t *testing.T) {
}
}
-func startSlowProxy(t *testing.T, up, down Rate, upstream string, adj func(ln *Listener)) (string, chan bool, error) {
- ln, err := net.Listen("tcp", "127.0.0.1:0")
- if err != nil {
- return "", nil, err
- }
- tln := &Listener{
- Listener: ln,
- Up: up,
- Down: down,
- }
- stopCh := make(chan bool)
- go func() {
- <-stopCh
- tln.Close()
- }()
- go func() {
- for {
- cn, err := tln.Accept()
- if err != nil {
- if !strings.Contains(err.Error(), "use of closed network connection") {
- t.Fatalf("Accept failed: %s", err.Error())
- }
- return
- }
- if adj != nil {
- adj(tln)
- }
- go func(cn net.Conn) {
- defer cn.Close()
- upcn, err := net.Dial("tcp", upstream)
- if err != nil {
- t.Log(err)
- return
- }
- // This will leave hanging goroutines util stopCh is closed
- // but it doesn't matter in the context of running tests.
- go func() {
- <-stopCh
- upcn.Close()
- }()
- go func() {
- if _, err := io.Copy(upcn, cn); err != nil {
- if !strings.Contains(err.Error(), "use of closed network connection") {
- // log.Printf("Upstream write failed: %s", err.Error())
- }
- }
- }()
- if _, err := io.Copy(cn, upcn); err != nil {
- if !strings.Contains(err.Error(), "use of closed network connection") {
- // log.Printf("Upstream read failed: %s", err.Error())
- }
- }
- }(cn)
- }
- }()
- return ln.Addr().String(), stopCh, nil
-}
-
func TestMaxBufferSize(t *testing.T) {
- ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
+ ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
@@ -889,6 +982,64 @@ func TestMaxBufferSize(t *testing.T) {
}
}
+func startSlowProxy(t *testing.T, up, down Rate, upstream string, adj func(ln *Listener)) (string, chan bool, error) {
+ ln, err := net.Listen("tcp", "127.0.0.1:0")
+ if err != nil {
+ return "", nil, err
+ }
+ tln := &Listener{
+ Listener: ln,
+ Up: up,
+ Down: down,
+ }
+ stopCh := make(chan bool)
+ go func() {
+ <-stopCh
+ tln.Close()
+ }()
+ go func() {
+ for {
+ cn, err := tln.Accept()
+ if err != nil {
+ if !strings.Contains(err.Error(), "use of closed network connection") {
+ t.Fatalf("Accept failed: %s", err.Error())
+ }
+ return
+ }
+ if adj != nil {
+ adj(tln)
+ }
+ go func(cn net.Conn) {
+ defer cn.Close()
+ upcn, err := net.Dial("tcp", upstream)
+ if err != nil {
+ t.Log(err)
+ return
+ }
+ // This will leave hanging goroutines util stopCh is closed
+ // but it doesn't matter in the context of running tests.
+ go func() {
+ <-stopCh
+ upcn.Close()
+ }()
+ go func() {
+ if _, err := io.Copy(upcn, cn); err != nil {
+ if !strings.Contains(err.Error(), "use of closed network connection") {
+ // log.Printf("Upstream write failed: %s", err.Error())
+ }
+ }
+ }()
+ if _, err := io.Copy(cn, upcn); err != nil {
+ if !strings.Contains(err.Error(), "use of closed network connection") {
+ // log.Printf("Upstream read failed: %s", err.Error())
+ }
+ }
+ }(cn)
+ }
+ }()
+ return ln.Addr().String(), stopCh, nil
+}
+
func expectErr(t *testing.T, err error, expected error) {
if err == nil {
t.Fatalf("Get for node that is too large should have returned error!")
Debdiff
[The following lists of changes regard files as different if they have different names, permissions or owners.]
Files in second set of .debs but not in first
-rw-r--r-- root/root /usr/share/gocode/src/github.com/samuel/go-zookeeper/zk/server_help_test.go -rw-r--r-- root/root /usr/share/gocode/src/github.com/samuel/go-zookeeper/zk/server_java_test.go
Files in first set of .debs but not in second
-rw-r--r-- root/root /usr/share/gocode/src/github.com/samuel/go-zookeeper/zk/server_help.go -rw-r--r-- root/root /usr/share/gocode/src/github.com/samuel/go-zookeeper/zk/server_java.go
No differences were encountered in the control files