ZooKeeper Publisher Refactoring and Cleanup
- Removed dependency on channels package for path watches
- Separated unit tests and integration tests
- Removed CircleCI and reverted back the original Travis configuration.
- Proper handler support for Client Session Events
- Renamed configOption to Option
- Fixed error strings
- Renamed Option functions
- Properly handle errors on CreateParentNodes
- Integration tests can be run using tags=integration. Install ZooKeeper inside the loadbalancer/zk/ subdirectory to run:
Bas van Beek
8 years ago
3 | 3 | - 1.4.2 |
4 | 4 | - 1.5 |
5 | 5 | - tip |
6 | ||
7 | before_install: | |
8 | - wget http://archive.apache.org/dist/zookeeper/zookeeper-3.3.3/zookeeper-3.3.3.tar.gz | |
9 | ||
10 | before_script: | |
11 | - tar -C loadbalancer/zk -xzf zookeeper*tar.gz |
0 | dependencies: | |
1 | pre: | |
2 | - wget http://archive.apache.org/dist/zookeeper/zookeeper-3.3.3/zookeeper-3.3.3.tar.gz | |
3 | - tar -C loadbalancer/zk -xzf zookeeper*tar.gz |
5 | 5 | "strings" |
6 | 6 | "time" |
7 | 7 | |
8 | "github.com/eapache/channels" | |
9 | 8 | "github.com/samuel/go-zookeeper/zk" |
10 | 9 | |
11 | 10 | "github.com/go-kit/kit/log" |
12 | 11 | ) |
13 | 12 | |
14 | // DefaultACL is the default ACL to use for creating znodes | |
15 | var DefaultACL = zk.WorldACL(zk.PermAll) | |
13 | // DefaultACL is the default ACL to use for creating znodes. | |
14 | var ( | |
15 | DefaultACL = zk.WorldACL(zk.PermAll) | |
16 | ErrInvalidPath = errors.New("path must start with / character") | |
17 | ErrInvalidCredentials = errors.New("invalid credentials provided") | |
18 | ErrClientClosed = errors.New("client service closed") | |
19 | ) | |
16 | 20 | |
17 | 21 | const ( |
18 | // ConnectTimeout is the default timeout to establish a connection to a | |
19 | // ZooKeeper node. | |
20 | ConnectTimeout = 2 * time.Second | |
21 | // SessionTimeout is the default timeout to keep the current ZooKeeper | |
22 | // session alive during a temporary disconnect. | |
23 | SessionTimeout = 5 * time.Second | |
22 | // DefaultConnectTimeout is the default timeout to establish a connection to | |
23 | // a ZooKeeper node. | |
24 | DefaultConnectTimeout = 2 * time.Second | |
25 | // DefaultSessionTimeout is the default timeout to keep the current | |
26 | // ZooKeeper session alive during a temporary disconnect. | |
27 | DefaultSessionTimeout = 5 * time.Second | |
24 | 28 | ) |
29 | ||
30 | // Event is a data container for znode watch events returned by GetEntries. It | |
31 | // provides a consistent trigger for watch events to Publishers, while still | |
32 | // allowing the underlying ZooKeeper client library watch payload to be | |
33 | // inspected. | |
34 | type Event struct { | |
35 | payload interface{} | |
36 | } | |
25 | 37 | |
26 | 38 | // Client is a wrapper around a lower level ZooKeeper client implementation. |
27 | 39 | type Client interface { |
28 | 40 | // GetEntries should query the provided path in ZooKeeper, place a watch on |
29 | 41 | // it and retrieve data from its current child nodes. |
30 | GetEntries(path string) ([]string, channels.SimpleOutChannel, error) | |
42 | GetEntries(path string) ([]string, <-chan Event, error) | |
31 | 43 | // CreateParentNodes should try to create the path in case it does not exist |
32 | 44 | // yet on ZooKeeper. |
33 | 45 | CreateParentNodes(path string) error |
46 | // Stop should properly shutdown the client implementation | |
47 | Stop() | |
34 | 48 | } |
35 | 49 | |
36 | 50 | type clientConfig struct { |
51 | logger log.Logger | |
37 | 52 | acl []zk.ACL |
53 | credentials []byte | |
38 | 54 | connectTimeout time.Duration |
39 | 55 | sessionTimeout time.Duration |
40 | 56 | rootNodePayload [][]byte |
41 | } | |
42 | ||
43 | type configOption func(c *clientConfig) error | |
57 | eventHandler func(zk.Event) | |
58 | } | |
59 | ||
60 | // Option functions enable friendly APIs. | |
61 | type Option func(*clientConfig) error | |
44 | 62 | |
45 | 63 | type client struct { |
46 | 64 | *zk.Conn |
47 | 65 | clientConfig |
48 | Eventc <-chan zk.Event | |
66 | active bool | |
49 | 67 | quit chan struct{} |
50 | 68 | } |
51 | 69 | |
52 | // WithACL returns a configOption specifying a non-default ACL. | |
53 | func WithACL(acl []zk.ACL) configOption { | |
70 | // ACL returns an Option specifying a non-default ACL for creating parent nodes. | |
71 | func ACL(acl []zk.ACL) Option { | |
54 | 72 | return func(c *clientConfig) error { |
55 | 73 | c.acl = acl |
56 | 74 | return nil |
57 | 75 | } |
58 | 76 | } |
59 | 77 | |
60 | // WithConnectTimeout returns a configOption specifying a non-default connection | |
61 | // timeout when we try to establish a connection to a ZooKeeper server. | |
62 | func WithConnectTimeout(t time.Duration) configOption { | |
78 | // Credentials returns an Option specifying a user/password combination which | |
79 | // the client will use to authenticate itself with. | |
80 | func Credentials(user, pass string) Option { | |
81 | return func(c *clientConfig) error { | |
82 | if user == "" || pass == "" { | |
83 | return ErrInvalidCredentials | |
84 | } | |
85 | c.credentials = []byte(user + ":" + pass) | |
86 | return nil | |
87 | } | |
88 | } | |
89 | ||
90 | // ConnectTimeout returns an Option specifying a non-default connection timeout | |
91 | // when we try to establish a connection to a ZooKeeper server. | |
92 | func ConnectTimeout(t time.Duration) Option { | |
63 | 93 | return func(c *clientConfig) error { |
64 | 94 | if t.Seconds() < 1 { |
65 | return errors.New("Invalid Connect Timeout. Minimum value is 1 second") | |
95 | return errors.New("invalid connect timeout (minimum value is 1 second)") | |
66 | 96 | } |
67 | 97 | c.connectTimeout = t |
68 | 98 | return nil |
69 | 99 | } |
70 | 100 | } |
71 | 101 | |
72 | // WithSessionTimeout returns a configOption specifying a non-default session | |
73 | // timeout. | |
74 | func WithSessionTimeout(t time.Duration) configOption { | |
102 | // SessionTimeout returns an Option specifying a non-default session timeout. | |
103 | func SessionTimeout(t time.Duration) Option { | |
75 | 104 | return func(c *clientConfig) error { |
76 | 105 | if t.Seconds() < 1 { |
77 | return errors.New("Invalid Session Timeout. Minimum value is 1 second") | |
106 | return errors.New("invalid session timeout (minimum value is 1 second)") | |
78 | 107 | } |
79 | 108 | c.sessionTimeout = t |
80 | 109 | return nil |
81 | 110 | } |
82 | 111 | } |
83 | 112 | |
84 | // WithPayload returns a configOption specifying non-default data values for | |
85 | // each znode created by CreateParentNodes. | |
86 | func WithPayload(payload [][]byte) configOption { | |
113 | // Payload returns an Option specifying non-default data values for each znode | |
114 | // created by CreateParentNodes. | |
115 | func Payload(payload [][]byte) Option { | |
87 | 116 | return func(c *clientConfig) error { |
88 | 117 | c.rootNodePayload = payload |
118 | return nil | |
119 | } | |
120 | } | |
121 | ||
122 | // EventHandler returns an Option specifying a callback function to handle | |
123 | // incoming zk.Event payloads (ZooKeeper connection events). | |
124 | func EventHandler(handler func(zk.Event)) Option { | |
125 | return func(c *clientConfig) error { | |
126 | c.eventHandler = handler | |
89 | 127 | return nil |
90 | 128 | } |
91 | 129 | } |
92 | 130 | |
93 | 131 | // NewClient returns a ZooKeeper client with a connection to the server cluster. |
94 | 132 | // It will return an error if the server cluster cannot be resolved. |
95 | func NewClient(servers []string, logger log.Logger, options ...configOption) (Client, error) { | |
133 | func NewClient(servers []string, logger log.Logger, options ...Option) (Client, error) { | |
134 | defaultEventHandler := func(event zk.Event) { | |
135 | logger.Log("eventtype", event.Type.String(), "server", event.Server, "state", event.State.String(), "err", event.Err) | |
136 | } | |
96 | 137 | config := clientConfig{ |
97 | 138 | acl: DefaultACL, |
98 | connectTimeout: ConnectTimeout, | |
99 | sessionTimeout: SessionTimeout, | |
139 | connectTimeout: DefaultConnectTimeout, | |
140 | sessionTimeout: DefaultSessionTimeout, | |
141 | eventHandler: defaultEventHandler, | |
142 | logger: logger, | |
100 | 143 | } |
101 | 144 | for _, option := range options { |
102 | 145 | if err := option(&config); err != nil { |
105 | 148 | } |
106 | 149 | // dialer overrides the default ZooKeeper library Dialer so we can configure |
107 | 150 | // the connectTimeout. The current library has a hardcoded value of 1 second |
108 | // and there are reports of race conditions, due to slow DNS resolvers | |
109 | // and other network latency issues. | |
151 | // and there are reports of race conditions, due to slow DNS resolvers and | |
152 | // other network latency issues. | |
110 | 153 | dialer := func(network, address string, _ time.Duration) (net.Conn, error) { |
111 | 154 | return net.DialTimeout(network, address, config.connectTimeout) |
112 | 155 | } |
113 | 156 | conn, eventc, err := zk.Connect(servers, config.sessionTimeout, withLogger(logger), zk.WithDialer(dialer)) |
157 | ||
114 | 158 | if err != nil { |
115 | 159 | return nil, err |
116 | 160 | } |
117 | return &client{conn, config, eventc, make(chan struct{})}, nil | |
161 | ||
162 | if len(config.credentials) > 0 { | |
163 | err = conn.AddAuth("digest", config.credentials) | |
164 | if err != nil { | |
165 | return nil, err | |
166 | } | |
167 | } | |
168 | ||
169 | c := &client{conn, config, true, make(chan struct{})} | |
170 | ||
171 | // Start listening for incoming Event payloads and callback the set | |
172 | // eventHandler. | |
173 | go func() { | |
174 | for { | |
175 | select { | |
176 | case event := <-eventc: | |
177 | config.eventHandler(event) | |
178 | case <-c.quit: | |
179 | return | |
180 | } | |
181 | } | |
182 | }() | |
183 | return c, nil | |
118 | 184 | } |
119 | 185 | |
120 | 186 | // CreateParentNodes implements the ZooKeeper Client interface. |
121 | 187 | func (c *client) CreateParentNodes(path string) error { |
188 | if !c.active { | |
189 | return ErrClientClosed | |
190 | } | |
191 | if path[0] != '/' { | |
192 | return ErrInvalidPath | |
193 | } | |
122 | 194 | payload := []byte("") |
123 | 195 | pathString := "" |
124 | 196 | pathNodes := strings.Split(path, "/") |
130 | 202 | } |
131 | 203 | pathString += "/" + pathNodes[i] |
132 | 204 | _, err := c.Create(pathString, payload, 0, c.acl) |
205 | // not being able to create the node because it exists or not having | |
206 | // sufficient rights is not an issue. It is ok for the node to already | |
207 | // exist and/or us to only have read rights | |
133 | 208 | if err != nil && err != zk.ErrNodeExists && err != zk.ErrNoAuth { |
134 | 209 | return err |
135 | 210 | } |
138 | 213 | } |
139 | 214 | |
140 | 215 | // GetEntries implements the ZooKeeper Client interface. |
141 | func (c *client) GetEntries(path string) ([]string, channels.SimpleOutChannel, error) { | |
216 | func (c *client) GetEntries(path string) ([]string, <-chan Event, error) { | |
217 | eventc := make(chan Event, 1) | |
218 | if !c.active { | |
219 | close(eventc) | |
220 | return nil, eventc, ErrClientClosed | |
221 | } | |
142 | 222 | // retrieve list of child nodes for given path and add watch to path |
143 | 223 | znodes, _, updateReceived, err := c.ChildrenW(path) |
224 | go func() { | |
225 | // wait for update and forward over Event channel | |
226 | payload := <-updateReceived | |
227 | eventc <- Event{payload} | |
228 | }() | |
229 | ||
144 | 230 | if err != nil { |
145 | return nil, channels.Wrap(updateReceived), err | |
231 | return nil, eventc, err | |
146 | 232 | } |
147 | 233 | |
148 | 234 | var resp []string |
152 | 238 | resp = append(resp, string(data)) |
153 | 239 | } |
154 | 240 | } |
155 | return resp, channels.Wrap(updateReceived), nil | |
156 | } | |
241 | return resp, eventc, nil | |
242 | } | |
243 | ||
244 | // Stop implements the ZooKeeper Client interface. | |
245 | func (c *client) Stop() { | |
246 | c.active = false | |
247 | close(c.quit) | |
248 | c.Close() | |
249 | } |
1 | 1 | |
2 | 2 | import ( |
3 | 3 | "bytes" |
4 | "fmt" | |
5 | 4 | "testing" |
6 | 5 | "time" |
7 | 6 | |
8 | 7 | stdzk "github.com/samuel/go-zookeeper/zk" |
9 | 8 | ) |
10 | ||
11 | type logWriter struct { | |
12 | t *testing.T | |
13 | p string | |
14 | } | |
15 | ||
16 | func (lw logWriter) Write(b []byte) (int, error) { | |
17 | lw.t.Logf("%s%s", lw.p, string(b)) | |
18 | return len(b), nil | |
19 | } | |
20 | 9 | |
21 | 10 | func TestNewClient(t *testing.T) { |
22 | 11 | var ( |
26 | 15 | payload = [][]byte{[]byte("Payload"), []byte("Test")} |
27 | 16 | ) |
28 | 17 | |
29 | ts, host := startCluster(t) | |
30 | defer ts.Stop() | |
18 | c, err := NewClient( | |
19 | []string{"FailThisInvalidHost!!!"}, | |
20 | logger, | |
21 | ) | |
31 | 22 | |
32 | c, err := NewClient( | |
33 | host, | |
23 | time.Sleep(1 * time.Millisecond) | |
24 | if err == nil { | |
25 | t.Errorf("expected error, got nil") | |
26 | } | |
27 | calledEventHandler := false | |
28 | eventHandler := func(event stdzk.Event) { | |
29 | calledEventHandler = true | |
30 | } | |
31 | c, err = NewClient( | |
32 | []string{"localhost"}, | |
34 | 33 | logger, |
35 | WithACL(acl), | |
36 | WithConnectTimeout(connectTimeout), | |
37 | WithSessionTimeout(sessionTimeout), | |
38 | WithPayload(payload), | |
34 | ACL(acl), | |
35 | ConnectTimeout(connectTimeout), | |
36 | SessionTimeout(sessionTimeout), | |
37 | Payload(payload), | |
38 | EventHandler(eventHandler), | |
39 | 39 | ) |
40 | 40 | if err != nil { |
41 | 41 | t.Fatal(err) |
42 | 42 | } |
43 | defer c.Stop() | |
43 | 44 | clientImpl, ok := c.(*client) |
44 | 45 | if !ok { |
45 | 46 | t.Errorf("retrieved incorrect Client implementation") |
56 | 57 | if want, have := payload, clientImpl.rootNodePayload; bytes.Compare(want[0], have[0]) != 0 || bytes.Compare(want[1], have[1]) != 0 { |
57 | 58 | t.Errorf("want %q, have %q", want, have) |
58 | 59 | } |
60 | // Allow EventHandler to be called | |
61 | time.Sleep(1 * time.Millisecond) | |
62 | ||
63 | if want, have := true, calledEventHandler; want != have { | |
64 | t.Errorf("want %t, have %t", want, have) | |
65 | } | |
59 | 66 | } |
60 | 67 | |
61 | func TestCreateParentNodes(t *testing.T) { | |
62 | ts, host := startCluster(t) | |
63 | defer ts.Stop() | |
64 | ||
65 | payload := [][]byte{[]byte("Payload"), []byte("Test")} | |
66 | client, err := NewClient(host, logger, WithPayload(payload)) | |
67 | if err != nil { | |
68 | t.Fatal(err) | |
69 | } | |
70 | if client == nil { | |
71 | t.Error("expected pointer to client, got nil") | |
68 | func TestOptions(t *testing.T) { | |
69 | _, err := NewClient([]string{"localhost"}, logger, Credentials("valid", "credentials")) | |
70 | if err != nil && err != stdzk.ErrNoServer { | |
71 | t.Errorf("unexpected error: %q", err) | |
72 | 72 | } |
73 | 73 | |
74 | p, err := NewPublisher(client, path, NewFactory(""), logger) | |
75 | if err != nil { | |
76 | t.Fatal(err) | |
77 | } | |
78 | endpoints, err := p.Endpoints() | |
79 | if err != nil { | |
80 | t.Fatal(err) | |
81 | } | |
82 | if want, have := 0, len(endpoints); want != have { | |
74 | _, err = NewClient([]string{"localhost"}, logger, Credentials("nopass", "")) | |
75 | if want, have := err, ErrInvalidCredentials; want != have { | |
83 | 76 | t.Errorf("want %q, have %q", want, have) |
84 | 77 | } |
85 | 78 | |
86 | zk1, err := ts.Connect(0) | |
87 | if err != nil { | |
88 | t.Fatalf("Connect returned error: %+v", err) | |
89 | } | |
90 | data, _, err := zk1.Get(path) | |
91 | if err != nil { | |
92 | t.Fatal(err) | |
93 | } | |
94 | // test Client implementation of CreateParentNodes. It should have created | |
95 | // our payload | |
96 | if bytes.Compare(data, payload[1]) != 0 { | |
97 | t.Errorf("want %q, have %q", payload[1], data) | |
79 | _, err = NewClient([]string{"localhost"}, logger, ConnectTimeout(0)) | |
80 | if err == nil { | |
81 | t.Errorf("expected connect timeout error") | |
98 | 82 | } |
99 | 83 | |
100 | } | |
101 | ||
102 | func TestGetEntries(t *testing.T) { | |
103 | var instancePayload = "protocol://hostname:port/routing" | |
104 | ts, host := startCluster(t) | |
105 | defer ts.Stop() | |
106 | ||
107 | zk1, err := ts.Connect(0) | |
108 | if err != nil { | |
109 | t.Fatalf("Connect returned error: %+v", err) | |
110 | } | |
111 | defer zk1.Close() | |
112 | ||
113 | c, err := NewClient(host, logger) | |
114 | p, err := NewPublisher(c, path, NewFactory(""), logger) | |
115 | if err != nil { | |
116 | t.Fatal(err) | |
117 | } | |
118 | ||
119 | _, err = zk1.Create( | |
120 | path+"/instance1", | |
121 | []byte(instancePayload), | |
122 | stdzk.FlagEphemeral|stdzk.FlagSequence, | |
123 | stdzk.WorldACL(stdzk.PermAll), | |
124 | ) | |
125 | if err != nil { | |
126 | t.Fatalf("Unable to create test ephemeral znode: %+v", err) | |
127 | } | |
128 | ||
129 | time.Sleep(1 * time.Second) | |
130 | ||
131 | endpoints, err := p.Endpoints() | |
132 | if err != nil { | |
133 | t.Fatal(err) | |
134 | } | |
135 | if want, have := 1, len(endpoints); want != have { | |
136 | t.Errorf("want %q, have %q", want, have) | |
84 | _, err = NewClient([]string{"localhost"}, logger, SessionTimeout(0)) | |
85 | if err == nil { | |
86 | t.Errorf("expected connect timeout error") | |
137 | 87 | } |
138 | 88 | } |
139 | ||
140 | func startCluster(t *testing.T) (*stdzk.TestCluster, []string) { | |
141 | // Start ZooKeeper Test Cluster | |
142 | ts, err := stdzk.StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) | |
143 | if err != nil { | |
144 | t.Fatal(err) | |
145 | } | |
146 | host := fmt.Sprintf("localhost:%d", ts.Servers[0].Port) | |
147 | return ts, []string{host} | |
148 | } |
0 | // +build integration | |
1 | ||
2 | package zk | |
3 | ||
4 | import ( | |
5 | "bytes" | |
6 | "fmt" | |
7 | "os" | |
8 | "testing" | |
9 | "time" | |
10 | ||
11 | stdzk "github.com/samuel/go-zookeeper/zk" | |
12 | ) | |
13 | ||
14 | var ( | |
15 | host []string | |
16 | ) | |
17 | ||
18 | func TestMain(m *testing.M) { | |
19 | fmt.Println("ZooKeeper Integration Test Initializing. Starting ZooKeeper Server...") | |
20 | ts, err := stdzk.StartTestCluster(1, nil, nil) | |
21 | host = []string{fmt.Sprintf("localhost:%d", ts.Servers[0].Port)} | |
22 | if err != nil { | |
23 | fmt.Printf("Unable to start ZooKeeper Server: %s", err) | |
24 | os.Exit(-1) | |
25 | } | |
26 | code := m.Run() | |
27 | ts.Stop() | |
28 | os.Exit(code) | |
29 | } | |
30 | ||
31 | func TestCreateParentNodesOnServer(t *testing.T) { | |
32 | payload := [][]byte{[]byte("Payload"), []byte("Test")} | |
33 | c1, err := NewClient(host, logger, Payload(payload)) | |
34 | if err != nil { | |
35 | t.Fatal(err) | |
36 | } | |
37 | if c1 == nil { | |
38 | t.Error("expected pointer to client, got nil") | |
39 | } | |
40 | defer c1.Stop() | |
41 | ||
42 | p, err := NewPublisher(c1, path, NewFactory(""), logger) | |
43 | if err != nil { | |
44 | t.Fatal(err) | |
45 | } | |
46 | defer p.Stop() | |
47 | ||
48 | endpoints, err := p.Endpoints() | |
49 | if err != nil { | |
50 | t.Fatal(err) | |
51 | } | |
52 | if want, have := 0, len(endpoints); want != have { | |
53 | t.Errorf("want %q, have %q", want, have) | |
54 | } | |
55 | ||
56 | c2, err := NewClient(host, logger) | |
57 | if err != nil { | |
58 | t.Fatalf("Connect returned error: %+v", err) | |
59 | } | |
60 | defer c2.Stop() | |
61 | c2impl, _ := c2.(*client) | |
62 | data, _, err := c2impl.Get(path) | |
63 | if err != nil { | |
64 | t.Fatal(err) | |
65 | } | |
66 | // test Client implementation of CreateParentNodes. It should have created | |
67 | // our payload | |
68 | if bytes.Compare(data, payload[1]) != 0 { | |
69 | t.Errorf("want %q, have %q", payload[1], data) | |
70 | } | |
71 | ||
72 | } | |
73 | ||
74 | func TestCreateBadParentNodesOnServer(t *testing.T) { | |
75 | c, _ := NewClient(host, logger) | |
76 | defer c.Stop() | |
77 | ||
78 | _, err := NewPublisher(c, "invalid/path", NewFactory(""), logger) | |
79 | ||
80 | if want, have := ErrInvalidPath, err; want != have { | |
81 | t.Errorf("want %q, have %q", want, have) | |
82 | } | |
83 | } | |
84 | ||
85 | func TestCredentials1(t *testing.T) { | |
86 | acl := stdzk.DigestACL(stdzk.PermAll, "user", "secret") | |
87 | c, _ := NewClient(host, logger, ACL(acl), Credentials("user", "secret")) | |
88 | defer c.Stop() | |
89 | ||
90 | _, err := NewPublisher(c, "/acl-issue-test", NewFactory(""), logger) | |
91 | ||
92 | if err != nil { | |
93 | t.Fatal(err) | |
94 | } | |
95 | } | |
96 | ||
97 | func TestCredentials2(t *testing.T) { | |
98 | acl := stdzk.DigestACL(stdzk.PermAll, "user", "secret") | |
99 | c, _ := NewClient(host, logger, ACL(acl)) | |
100 | defer c.Stop() | |
101 | ||
102 | _, err := NewPublisher(c, "/acl-issue-test", NewFactory(""), logger) | |
103 | ||
104 | if err != stdzk.ErrNoAuth { | |
105 | t.Errorf("want %q, have %q", stdzk.ErrNoAuth, err) | |
106 | } | |
107 | } | |
108 | ||
109 | func TestConnection(t *testing.T) { | |
110 | c, _ := NewClient(host, logger) | |
111 | c.Stop() | |
112 | ||
113 | _, err := NewPublisher(c, "/acl-issue-test", NewFactory(""), logger) | |
114 | ||
115 | if err != ErrClientClosed { | |
116 | t.Errorf("want %q, have %q", stdzk.ErrNoAuth, err) | |
117 | } | |
118 | } | |
119 | ||
120 | func TestGetEntriesOnServer(t *testing.T) { | |
121 | var instancePayload = "protocol://hostname:port/routing" | |
122 | ||
123 | c1, err := NewClient(host, logger) | |
124 | if err != nil { | |
125 | t.Fatalf("Connect returned error: %+v", err) | |
126 | } | |
127 | ||
128 | defer c1.Stop() | |
129 | ||
130 | c2, err := NewClient(host, logger) | |
131 | p, err := NewPublisher(c2, path, NewFactory(""), logger) | |
132 | if err != nil { | |
133 | t.Fatal(err) | |
134 | } | |
135 | defer c2.Stop() | |
136 | ||
137 | c2impl, _ := c2.(*client) | |
138 | _, err = c2impl.Create( | |
139 | path+"/instance1", | |
140 | []byte(instancePayload), | |
141 | stdzk.FlagEphemeral|stdzk.FlagSequence, | |
142 | stdzk.WorldACL(stdzk.PermAll), | |
143 | ) | |
144 | if err != nil { | |
145 | t.Fatalf("Unable to create test ephemeral znode 1: %+v", err) | |
146 | } | |
147 | _, err = c2impl.Create( | |
148 | path+"/instance2", | |
149 | []byte(instancePayload+"2"), | |
150 | stdzk.FlagEphemeral|stdzk.FlagSequence, | |
151 | stdzk.WorldACL(stdzk.PermAll), | |
152 | ) | |
153 | if err != nil { | |
154 | t.Fatalf("Unable to create test ephemeral znode 2: %+v", err) | |
155 | } | |
156 | ||
157 | time.Sleep(1 * time.Millisecond) | |
158 | ||
159 | endpoints, err := p.Endpoints() | |
160 | if err != nil { | |
161 | t.Fatal(err) | |
162 | } | |
163 | if want, have := 2, len(endpoints); want != have { | |
164 | t.Errorf("want %q, have %q", want, have) | |
165 | } | |
166 | } | |
167 | ||
168 | func TestGetEntriesPayloadOnServer(t *testing.T) { | |
169 | c, err := NewClient(host, logger) | |
170 | if err != nil { | |
171 | t.Fatal(err) | |
172 | } | |
173 | _, eventc, err := c.GetEntries(path) | |
174 | if err != nil { | |
175 | t.Fatal(err) | |
176 | } | |
177 | cimpl, _ := c.(*client) | |
178 | _, err = cimpl.Create( | |
179 | path+"/instance3", | |
180 | []byte("just some payload"), | |
181 | stdzk.FlagEphemeral|stdzk.FlagSequence, | |
182 | stdzk.WorldACL(stdzk.PermAll), | |
183 | ) | |
184 | select { | |
185 | case event := <-eventc: | |
186 | payload, ok := event.payload.(stdzk.Event) | |
187 | if !ok { | |
188 | t.Errorf("expected payload to be of type %s", "zk.Event") | |
189 | } | |
190 | if want, have := stdzk.EventNodeChildrenChanged, payload.Type; want != have { | |
191 | t.Errorf("want %q, have %q", want, have) | |
192 | } | |
193 | case <-time.After(20 * time.Millisecond): | |
194 | t.Errorf("expected incoming watch event, timeout occurred") | |
195 | } | |
196 | ||
197 | } |
0 | 0 | package zk |
1 | 1 | |
2 | 2 | import ( |
3 | "github.com/eapache/channels" | |
4 | ||
5 | 3 | "github.com/go-kit/kit/endpoint" |
6 | 4 | "github.com/go-kit/kit/loadbalancer" |
7 | 5 | "github.com/go-kit/kit/log" |
28 | 26 | quit: make(chan struct{}), |
29 | 27 | } |
30 | 28 | |
31 | // try to create path nodes if they are not available | |
32 | p.client.CreateParentNodes(p.path) | |
29 | err := p.client.CreateParentNodes(p.path) | |
30 | if err != nil { | |
31 | return nil, err | |
32 | } | |
33 | 33 | |
34 | 34 | // intial node retrieval and cache fill |
35 | instances, simpleOutChannel, err := p.client.GetEntries(p.path) | |
35 | instances, eventc, err := p.client.GetEntries(p.path) | |
36 | 36 | if err != nil { |
37 | 37 | logger.Log("path", p.path, "msg", "failed to retrieve entries", "err", err) |
38 | } else { | |
39 | logger.Log("path", p.path, "instances", len(instances)) | |
38 | return nil, err | |
40 | 39 | } |
40 | logger.Log("path", p.path, "instances", len(instances)) | |
41 | 41 | p.cache.Replace(instances) |
42 | 42 | |
43 | 43 | // handle incoming path updates |
44 | go p.loop(simpleOutChannel) | |
44 | go p.loop(eventc) | |
45 | 45 | |
46 | 46 | return p, nil |
47 | 47 | } |
48 | 48 | |
49 | func (p *Publisher) loop(simpleOutChannel channels.SimpleOutChannel) { | |
49 | func (p *Publisher) loop(eventc <-chan Event) { | |
50 | 50 | var ( |
51 | 51 | instances []string |
52 | 52 | err error |
53 | 53 | ) |
54 | 54 | for { |
55 | responseChan := simpleOutChannel.Out() | |
56 | 55 | select { |
57 | case <-responseChan: | |
56 | case _, more := <-eventc: | |
57 | if !more { | |
58 | // channel was closed by client... end this loop | |
59 | return | |
60 | } | |
58 | 61 | // we received a path update notification, call GetEntries to |
59 | 62 | // retrieve child node data and set new watch as zk watches are one |
60 | 63 | // time triggers |
61 | instances, simpleOutChannel, err = p.client.GetEntries(p.path) | |
64 | instances, eventc, err = p.client.GetEntries(p.path) | |
62 | 65 | if err != nil { |
63 | 66 | p.logger.Log("path", p.path, "msg", "failed to retrieve entries", "err", err) |
64 | 67 | continue |
5 | 5 | "testing" |
6 | 6 | "time" |
7 | 7 | |
8 | "github.com/eapache/channels" | |
9 | 8 | "golang.org/x/net/context" |
10 | 9 | |
11 | 10 | "github.com/go-kit/kit/endpoint" |
119 | 118 | } |
120 | 119 | |
121 | 120 | type fakeClient struct { |
122 | ch chan bool | |
121 | ch chan Event | |
123 | 122 | responses map[string]string |
124 | 123 | result bool |
125 | 124 | } |
126 | 125 | |
127 | 126 | func newFakeClient() *fakeClient { |
128 | 127 | return &fakeClient{ |
129 | make(chan bool, 1), | |
128 | make(chan Event, 1), | |
130 | 129 | make(map[string]string), |
131 | 130 | true, |
132 | 131 | } |
136 | 135 | return nil |
137 | 136 | } |
138 | 137 | |
139 | func (c *fakeClient) GetEntries(path string) ([]string, channels.SimpleOutChannel, error) { | |
138 | func (c *fakeClient) GetEntries(path string) ([]string, <-chan Event, error) { | |
140 | 139 | responses := []string{} |
141 | 140 | if c.result == false { |
142 | 141 | c.result = true |
143 | return responses, channels.Wrap(c.ch), errors.New("Dummy Error") | |
142 | return responses, c.ch, errors.New("Dummy Error") | |
144 | 143 | } |
145 | 144 | for _, data := range c.responses { |
146 | 145 | responses = append(responses, data) |
147 | 146 | } |
148 | return responses, channels.Wrap(c.ch), nil | |
147 | return responses, c.ch, nil | |
149 | 148 | } |
150 | 149 | |
151 | 150 | func (c *fakeClient) AddService(node, data string) { |
163 | 162 | c.triggerWatch() |
164 | 163 | } |
165 | 164 | |
165 | func (c *fakeClient) Stop() {} | |
166 | ||
166 | 167 | func NewFactory(fakeError string) loadbalancer.Factory { |
167 | 168 | return func(string) (endpoint.Endpoint, io.Closer, error) { |
168 | 169 | if fakeError == "" { |
173 | 174 | } |
174 | 175 | |
175 | 176 | func (c *fakeClient) triggerWatch() { |
176 | c.ch <- true | |
177 | c.ch <- Event{true} | |
177 | 178 | // watches on ZooKeeper Nodes trigger once, most ZooKeeper libraries also |
178 | 179 | // implement "fire once" channels for these watches |
179 | 180 | close(c.ch) |
180 | c.ch = make(chan bool, 1) | |
181 | c.ch = make(chan Event, 1) | |
181 | 182 | |
182 | 183 | // make sure we allow the Publisher to handle this update |
183 | time.Sleep(50 * time.Millisecond) | |
184 | time.Sleep(1 * time.Millisecond) | |
184 | 185 | } |