Codebase list golang-github-go-kit-kit / a9459ea
ZooKeeper Publisher improved unit tests and passthrough of zk.Event chan Bas van Beek 8 years ago
5 changed file(s) with 159 addition(s) and 105 deletion(s). Raw diff Collapse all Expand all
1313 // DefaultACL is the default ACL to use for creating znodes.
1414 var (
1515 DefaultACL = zk.WorldACL(zk.PermAll)
16 ErrInvalidPath = errors.New("path must start with / character")
1716 ErrInvalidCredentials = errors.New("invalid credentials provided")
1817 ErrClientClosed = errors.New("client service closed")
1918 )
2726 DefaultSessionTimeout = 5 * time.Second
2827 )
2928
30 // Event is a data container for znode watch events returned by GetEntries. It
31 // provides a consistent trigger for watch events to Publishers, while still
32 // allowing the underlying ZooKeeper client library watch payload to be
33 // inspected.
34 type Event struct {
35 payload interface{}
36 }
37
3829 // Client is a wrapper around a lower level ZooKeeper client implementation.
3930 type Client interface {
4031 // GetEntries should query the provided path in ZooKeeper, place a watch on
4132 // it and retrieve data from its current child nodes.
42 GetEntries(path string) ([]string, <-chan Event, error)
33 GetEntries(path string) ([]string, <-chan zk.Event, error)
4334 // CreateParentNodes should try to create the path in case it does not exist
4435 // yet on ZooKeeper.
4536 CreateParentNodes(path string) error
189180 return ErrClientClosed
190181 }
191182 if path[0] != '/' {
192 return ErrInvalidPath
183 return zk.ErrInvalidPath
193184 }
194185 payload := []byte("")
195186 pathString := ""
213204 }
214205
215206 // GetEntries implements the ZooKeeper Client interface.
216 func (c *client) GetEntries(path string) ([]string, <-chan Event, error) {
217 eventc := make(chan Event, 1)
218 if !c.active {
219 close(eventc)
220 return nil, eventc, ErrClientClosed
221 }
207 func (c *client) GetEntries(path string) ([]string, <-chan zk.Event, error) {
222208 // retrieve list of child nodes for given path and add watch to path
223 znodes, _, updateReceived, err := c.ChildrenW(path)
224 go func() {
225 // wait for update and forward over Event channel
226 payload := <-updateReceived
227 eventc <- Event{payload}
228 }()
209 znodes, _, eventc, err := c.ChildrenW(path)
229210
230211 if err != nil {
231212 return nil, eventc, err
4646 t.Errorf("retrieved incorrect Client implementation")
4747 }
4848 if want, have := acl, clientImpl.acl; want[0] != have[0] {
49 t.Errorf("want %q, have %q", want, have)
49 t.Errorf("want %+v, have %+v", want, have)
5050 }
5151 if want, have := connectTimeout, clientImpl.connectTimeout; want != have {
52 t.Errorf("want %q, have %q", want, have)
52 t.Errorf("want %d, have %d", want, have)
5353 }
5454 if want, have := sessionTimeout, clientImpl.sessionTimeout; want != have {
55 t.Errorf("want %q, have %q", want, have)
55 t.Errorf("want %d, have %d", want, have)
5656 }
5757 if want, have := payload, clientImpl.rootNodePayload; bytes.Compare(want[0], have[0]) != 0 || bytes.Compare(want[1], have[1]) != 0 {
58 t.Errorf("want %q, have %q", want, have)
58 t.Errorf("want %s, have %s", want, have)
5959 }
6060 // Allow EventHandler to be called
6161 time.Sleep(1 * time.Millisecond)
6868 func TestOptions(t *testing.T) {
6969 _, err := NewClient([]string{"localhost"}, logger, Credentials("valid", "credentials"))
7070 if err != nil && err != stdzk.ErrNoServer {
71 t.Errorf("unexpected error: %q", err)
71 t.Errorf("unexpected error: %v", err)
7272 }
7373
7474 _, err = NewClient([]string{"localhost"}, logger, Credentials("nopass", ""))
7575 if want, have := err, ErrInvalidCredentials; want != have {
76 t.Errorf("want %q, have %q", want, have)
76 t.Errorf("want %v, have %v", want, have)
7777 }
7878
7979 _, err = NewClient([]string{"localhost"}, logger, ConnectTimeout(0))
8686 t.Errorf("expected connect timeout error")
8787 }
8888 }
89
90 func TestCreateParentNodes(t *testing.T) {
91 payload := [][]byte{[]byte("Payload"), []byte("Test")}
92
93 c, err := NewClient([]string{"localhost:65500"}, logger)
94 if err != nil {
95 t.Errorf("unexpected error: %v", err)
96 }
97 if c == nil {
98 t.Fatalf("expected new Client, got nil")
99 }
100 p, err := NewPublisher(c, "/validpath", newFactory(""), logger)
101 if err != stdzk.ErrNoServer {
102 t.Errorf("unexpected error: %v", err)
103 }
104 if p != nil {
105 t.Errorf("expected failed new Publisher")
106 }
107 p, err = NewPublisher(c, "invalidpath", newFactory(""), logger)
108 if err != stdzk.ErrInvalidPath {
109 t.Errorf("unexpected error: %v", err)
110 }
111 _, _, err = c.GetEntries("/validpath")
112 if err != stdzk.ErrNoServer {
113 t.Errorf("unexpected error: %v", err)
114 }
115 // stopping Client
116 c.Stop()
117 err = c.CreateParentNodes("/validpath")
118 if err != ErrClientClosed {
119 t.Errorf("unexpected error: %v", err)
120 }
121 p, err = NewPublisher(c, "/validpath", newFactory(""), logger)
122 if err != ErrClientClosed {
123 t.Errorf("unexpected error: %v", err)
124 }
125 if p != nil {
126 t.Errorf("expected failed new Publisher")
127 }
128 c, err = NewClient([]string{"localhost:65500"}, logger, Payload(payload))
129 if err != nil {
130 t.Errorf("unexpected error: %v", err)
131 }
132 if c == nil {
133 t.Fatalf("expected new Client, got nil")
134 }
135 p, err = NewPublisher(c, "/validpath", newFactory(""), logger)
136 if err != stdzk.ErrNoServer {
137 t.Errorf("unexpected error: %v", err)
138 }
139 if p != nil {
140 t.Errorf("expected failed new Publisher")
141 }
142 }
1818 func TestMain(m *testing.M) {
1919 fmt.Println("ZooKeeper Integration Test Initializing. Starting ZooKeeper Server...")
2020 ts, err := stdzk.StartTestCluster(1, nil, nil)
21 host = []string{fmt.Sprintf("localhost:%d", ts.Servers[0].Port)}
2221 if err != nil {
23 fmt.Printf("Unable to start ZooKeeper Server: %s", err)
22 fmt.Printf("Unable to start ZooKeeper Server: %v\n", err)
2423 os.Exit(-1)
2524 }
25 defer ts.Stop()
26 host = []string{fmt.Sprintf("localhost:%d", ts.Servers[0].Port)}
2627 code := m.Run()
27 ts.Stop()
2828 os.Exit(code)
2929 }
3030
3232 payload := [][]byte{[]byte("Payload"), []byte("Test")}
3333 c1, err := NewClient(host, logger, Payload(payload))
3434 if err != nil {
35 t.Fatal(err)
35 t.Fatalf("Connect returned error: %v", err)
3636 }
3737 if c1 == nil {
38 t.Error("expected pointer to client, got nil")
38 t.Fatal("Expected pointer to client, got nil")
3939 }
4040 defer c1.Stop()
4141
42 p, err := NewPublisher(c1, path, NewFactory(""), logger)
42 p, err := NewPublisher(c1, path, newFactory(""), logger)
4343 if err != nil {
44 t.Fatal(err)
44 t.Fatalf("Unable to create Publisher: %v", err)
4545 }
4646 defer p.Stop()
4747
5050 t.Fatal(err)
5151 }
5252 if want, have := 0, len(endpoints); want != have {
53 t.Errorf("want %q, have %q", want, have)
53 t.Errorf("want %d, have %d", want, have)
5454 }
5555
5656 c2, err := NewClient(host, logger)
5757 if err != nil {
58 t.Fatalf("Connect returned error: %+v", err)
58 t.Fatalf("Connect returned error: %v", err)
5959 }
6060 defer c2.Stop()
61 c2impl, _ := c2.(*client)
62 data, _, err := c2impl.Get(path)
61 data, _, err := c2.(*client).Get(path)
6362 if err != nil {
6463 t.Fatal(err)
6564 }
6665 // test Client implementation of CreateParentNodes. It should have created
6766 // our payload
6867 if bytes.Compare(data, payload[1]) != 0 {
69 t.Errorf("want %q, have %q", payload[1], data)
68 t.Errorf("want %s, have %s", payload[1], data)
7069 }
7170
7271 }
7574 c, _ := NewClient(host, logger)
7675 defer c.Stop()
7776
78 _, err := NewPublisher(c, "invalid/path", NewFactory(""), logger)
77 _, err := NewPublisher(c, "invalid/path", newFactory(""), logger)
7978
80 if want, have := ErrInvalidPath, err; want != have {
81 t.Errorf("want %q, have %q", want, have)
79 if want, have := stdzk.ErrInvalidPath, err; want != have {
80 t.Errorf("want %v, have %v", want, have)
8281 }
8382 }
8483
8786 c, _ := NewClient(host, logger, ACL(acl), Credentials("user", "secret"))
8887 defer c.Stop()
8988
90 _, err := NewPublisher(c, "/acl-issue-test", NewFactory(""), logger)
89 _, err := NewPublisher(c, "/acl-issue-test", newFactory(""), logger)
9190
9291 if err != nil {
9392 t.Fatal(err)
9998 c, _ := NewClient(host, logger, ACL(acl))
10099 defer c.Stop()
101100
102 _, err := NewPublisher(c, "/acl-issue-test", NewFactory(""), logger)
101 _, err := NewPublisher(c, "/acl-issue-test", newFactory(""), logger)
103102
104103 if err != stdzk.ErrNoAuth {
105 t.Errorf("want %q, have %q", stdzk.ErrNoAuth, err)
104 t.Errorf("want %v, have %v", stdzk.ErrNoAuth, err)
106105 }
107106 }
108107
110109 c, _ := NewClient(host, logger)
111110 c.Stop()
112111
113 _, err := NewPublisher(c, "/acl-issue-test", NewFactory(""), logger)
112 _, err := NewPublisher(c, "/acl-issue-test", newFactory(""), logger)
114113
115114 if err != ErrClientClosed {
116 t.Errorf("want %q, have %q", stdzk.ErrNoAuth, err)
115 t.Errorf("want %v, have %v", ErrClientClosed, err)
117116 }
118117 }
119118
122121
123122 c1, err := NewClient(host, logger)
124123 if err != nil {
125 t.Fatalf("Connect returned error: %+v", err)
124 t.Fatalf("Connect returned error: %v", err)
126125 }
127126
128127 defer c1.Stop()
129128
130129 c2, err := NewClient(host, logger)
131 p, err := NewPublisher(c2, path, NewFactory(""), logger)
130 p, err := NewPublisher(c2, path, newFactory(""), logger)
132131 if err != nil {
133132 t.Fatal(err)
134133 }
142141 stdzk.WorldACL(stdzk.PermAll),
143142 )
144143 if err != nil {
145 t.Fatalf("Unable to create test ephemeral znode 1: %+v", err)
144 t.Fatalf("Unable to create test ephemeral znode 1: %v", err)
146145 }
147146 _, err = c2impl.Create(
148147 path+"/instance2",
151150 stdzk.WorldACL(stdzk.PermAll),
152151 )
153152 if err != nil {
154 t.Fatalf("Unable to create test ephemeral znode 2: %+v", err)
153 t.Fatalf("Unable to create test ephemeral znode 2: %v", err)
155154 }
156155
157 time.Sleep(1 * time.Millisecond)
156 time.Sleep(50 * time.Millisecond)
158157
159158 endpoints, err := p.Endpoints()
160159 if err != nil {
161160 t.Fatal(err)
162161 }
163162 if want, have := 2, len(endpoints); want != have {
164 t.Errorf("want %q, have %q", want, have)
163 t.Errorf("want %d, have %d", want, have)
165164 }
166165 }
167166
168167 func TestGetEntriesPayloadOnServer(t *testing.T) {
169168 c, err := NewClient(host, logger)
170169 if err != nil {
171 t.Fatal(err)
170 t.Fatalf("Connect returned error: %v", err)
172171 }
173172 _, eventc, err := c.GetEntries(path)
174173 if err != nil {
175174 t.Fatal(err)
176175 }
177 cimpl, _ := c.(*client)
178 _, err = cimpl.Create(
176 _, err = c.(*client).Create(
179177 path+"/instance3",
180178 []byte("just some payload"),
181179 stdzk.FlagEphemeral|stdzk.FlagSequence,
182180 stdzk.WorldACL(stdzk.PermAll),
183181 )
182 if err != nil {
183 t.Fatalf("Unable to create test ephemeral znode: %v", err)
184 }
184185 select {
185186 case event := <-eventc:
186 payload, ok := event.payload.(stdzk.Event)
187 if !ok {
188 t.Errorf("expected payload to be of type %s", "zk.Event")
189 }
190 if want, have := stdzk.EventNodeChildrenChanged, payload.Type; want != have {
191 t.Errorf("want %q, have %q", want, have)
187 if want, have := stdzk.EventNodeChildrenChanged.String(), event.Type.String(); want != have {
188 t.Errorf("want %s, have %s", want, have)
192189 }
193190 case <-time.After(20 * time.Millisecond):
194191 t.Errorf("expected incoming watch event, timeout occurred")
33 "github.com/go-kit/kit/endpoint"
44 "github.com/go-kit/kit/loadbalancer"
55 "github.com/go-kit/kit/log"
6 "github.com/samuel/go-zookeeper/zk"
67 )
78
89 // Publisher yield endpoints stored in a certain ZooKeeper path. Any kind of
4647 return p, nil
4748 }
4849
49 func (p *Publisher) loop(eventc <-chan Event) {
50 func (p *Publisher) loop(eventc <-chan zk.Event) {
5051 var (
5152 instances []string
5253 err error
5354 )
5455 for {
5556 select {
56 case _, more := <-eventc:
57 if !more {
58 // channel was closed by client... end this loop
59 return
60 }
57 case <-eventc:
6158 // we received a path update notification, call GetEntries to
6259 // retrieve child node data and set new watch as zk watches are one
6360 // time triggers
1010 "github.com/go-kit/kit/endpoint"
1111 "github.com/go-kit/kit/loadbalancer"
1212 "github.com/go-kit/kit/log"
13 "github.com/samuel/go-zookeeper/zk"
1314 )
1415
1516 var (
2122 func TestPublisher(t *testing.T) {
2223 client := newFakeClient()
2324
24 p, err := NewPublisher(client, path, NewFactory(""), logger)
25 p, err := NewPublisher(client, path, newFactory(""), logger)
2526 if err != nil {
2627 t.Fatalf("failed to create new publisher: %v", err)
2728 }
3536 func TestBadFactory(t *testing.T) {
3637 client := newFakeClient()
3738
38 p, err := NewPublisher(client, path, NewFactory("kaboom"), logger)
39 p, err := NewPublisher(client, path, newFactory("kaboom"), logger)
3940 if err != nil {
4041 t.Fatalf("failed to create new publisher: %v", err)
4142 }
4546 if err != nil {
4647 t.Fatal(err)
4748 }
48
49 if want, have := 0, len(endpoints); want != have {
50 t.Errorf("want %q, have %q", want, have)
51 }
52 }
53
54 func TestServiceUpdate(t *testing.T) {
55 client := newFakeClient()
56
57 p, err := NewPublisher(client, path, NewFactory(""), logger)
58 if err != nil {
59 t.Fatalf("failed to create new publisher: %v", err)
60 }
61 defer p.Stop()
62
63 endpoints, err := p.Endpoints()
64 if err != nil {
65 t.Fatal(err)
66 }
67 if want, have := 0, len(endpoints); want != have {
68 t.Errorf("want %q, have %q", want, have)
69 }
70
7149 // instance1 came online
7250 client.AddService(path+"/instance1", "zookeeper_node_data")
7351
52 if want, have := 0, len(endpoints); want != have {
53 t.Errorf("want %d, have %d", want, have)
54 }
55 }
56
57 func TestServiceUpdate(t *testing.T) {
58 client := newFakeClient()
59
60 p, err := NewPublisher(client, path, newFactory(""), logger)
61 if err != nil {
62 t.Fatalf("failed to create new publisher: %v", err)
63 }
64 defer p.Stop()
65
66 endpoints, err := p.Endpoints()
67 if err != nil {
68 t.Fatal(err)
69 }
70 if want, have := 0, len(endpoints); want != have {
71 t.Errorf("want %d, have %d", want, have)
72 }
73
74 // instance1 came online
75 client.AddService(path+"/instance1", "zookeeper_node_data")
76
7477 // test if we received the instance
7578 endpoints, err = p.Endpoints()
7679 if err != nil {
7780 t.Fatal(err)
7881 }
7982 if want, have := 1, len(endpoints); want != have {
80 t.Errorf("want %q, have %q", want, have)
83 t.Errorf("want %d, have %d", want, have)
8184 }
8285
8386 // instance2 came online
8992 t.Fatal(err)
9093 }
9194 if want, have := 2, len(endpoints); want != have {
92 t.Errorf("want %q, have %q", want, have)
95 t.Errorf("want %d, have %d", want, have)
9396 }
9497
9598 // watch triggers an error...
101104 t.Fatal(err)
102105 }
103106 if want, have := 2, len(endpoints); want != have {
104 t.Errorf("want %q, have %q", want, have)
107 t.Errorf("want %d, have %d", want, have)
105108 }
106109
107110 // instances go offline
113116 t.Fatal(err)
114117 }
115118 if want, have := 0, len(endpoints); want != have {
116 t.Errorf("want %q, have %q", want, have)
119 t.Errorf("want %d, have %d", want, have)
120 }
121 }
122
123 func TestBadPublisherCreate(t *testing.T) {
124 client := newFakeClient()
125 client.SendErrorOnWatch()
126 p, err := NewPublisher(client, path, newFactory(""), logger)
127 if err == nil {
128 t.Errorf("expected error on new publisher")
129 }
130 if p != nil {
131 t.Errorf("expected publisher not to be created")
132 }
133 p, err = NewPublisher(client, "BadPath", newFactory(""), logger)
134 if err == nil {
135 t.Errorf("expected error on new publisher")
136 }
137 if p != nil {
138 t.Errorf("expected publisher not to be created")
117139 }
118140 }
119141
120142 type fakeClient struct {
121 ch chan Event
143 ch chan zk.Event
122144 responses map[string]string
123145 result bool
124146 }
125147
126148 func newFakeClient() *fakeClient {
127149 return &fakeClient{
128 make(chan Event, 1),
150 make(chan zk.Event, 1),
129151 make(map[string]string),
130152 true,
131153 }
132154 }
133155
134156 func (c *fakeClient) CreateParentNodes(path string) error {
157 if path == "BadPath" {
158 return errors.New("Dummy Error")
159 }
135160 return nil
136161 }
137162
138 func (c *fakeClient) GetEntries(path string) ([]string, <-chan Event, error) {
163 func (c *fakeClient) GetEntries(path string) ([]string, <-chan zk.Event, error) {
139164 responses := []string{}
140165 if c.result == false {
141166 c.result = true
164189
165190 func (c *fakeClient) Stop() {}
166191
167 func NewFactory(fakeError string) loadbalancer.Factory {
192 func newFactory(fakeError string) loadbalancer.Factory {
168193 return func(string) (endpoint.Endpoint, io.Closer, error) {
169194 if fakeError == "" {
170195 return e, nil, nil
174199 }
175200
176201 func (c *fakeClient) triggerWatch() {
177 c.ch <- Event{true}
202 c.ch <- zk.Event{}
178203 // watches on ZooKeeper Nodes trigger once, most ZooKeeper libraries also
179204 // implement "fire once" channels for these watches
180205 close(c.ch)
181 c.ch = make(chan Event, 1)
206 c.ch = make(chan zk.Event, 1)
182207
183208 // make sure we allow the Publisher to handle this update
184209 time.Sleep(1 * time.Millisecond)