Codebase list golang-github-go-kit-kit / e202196
Test Fixes for ZooKeeper Publisher (race conditions and multi channel close panic) Bas van Beek 8 years ago
2 changed file(s) with 120 addition(s) and 88 deletion(s). Raw diff Collapse all Expand all
2424 if err == nil {
2525 t.Errorf("expected error, got nil")
2626 }
27 hasFired := false
2728 calledEventHandler := make(chan struct{})
2829 eventHandler := func(event stdzk.Event) {
29 close(calledEventHandler)
30 if !hasFired {
31 // test is successful if this function has fired at least once
32 hasFired = true
33 close(calledEventHandler)
34 }
3035 }
3136 c, err = NewClient(
3237 []string{"localhost"},
11
22 import (
33 "errors"
4 "fmt"
45 "io"
6 "sync"
57 "testing"
68 "time"
79
4244 }
4345 defer p.Stop()
4446
47 // instance1 came online
48 client.AddService(path+"/instance1", "kaboom")
49
50 // instance2 came online
51 client.AddService(path+"/instance2", "zookeeper_node_data")
52
53 if err = asyncTest(100*time.Millisecond, 1, p); err != nil {
54 t.Error(err)
55 }
56 }
57
58 func TestServiceUpdate(t *testing.T) {
59 client := newFakeClient()
60
61 p, err := NewPublisher(client, path, newFactory(""), logger)
62 if err != nil {
63 t.Fatalf("failed to create new publisher: %v", err)
64 }
65 defer p.Stop()
66
4567 endpoints, err := p.Endpoints()
4668 if err != nil {
4769 t.Fatal(err)
4870 }
71
72 if want, have := 0, len(endpoints); want != have {
73 t.Errorf("want %d, have %d", want, have)
74 }
75
4976 // instance1 came online
5077 client.AddService(path+"/instance1", "zookeeper_node_data")
5178
52 if want, have := 0, len(endpoints); want != have {
53 t.Errorf("want %d, have %d", want, have)
54 }
55 }
56
57 func TestServiceUpdate(t *testing.T) {
58 client := newFakeClient()
59
60 p, err := NewPublisher(client, path, newFactory(""), logger)
61 if err != nil {
62 t.Fatalf("failed to create new publisher: %v", err)
63 }
64 defer p.Stop()
65
66 endpoints, err := p.Endpoints()
67 if err != nil {
68 t.Fatal(err)
69 }
70 if want, have := 0, len(endpoints); want != have {
71 t.Errorf("want %d, have %d", want, have)
72 }
73
74 // instance1 came online
75 client.AddService(path+"/instance1", "zookeeper_node_data")
76
77 // test if we received the instance
78 endpoints, err = p.Endpoints()
79 if err != nil {
80 t.Fatal(err)
81 }
82 if want, have := 1, len(endpoints); want != have {
83 t.Errorf("want %d, have %d", want, have)
84 }
85
8679 // instance2 came online
8780 client.AddService(path+"/instance2", "zookeeper_node_data2")
8881
89 // test if we received the instance
90 endpoints, err = p.Endpoints()
91 if err != nil {
92 t.Fatal(err)
93 }
94 if want, have := 2, len(endpoints); want != have {
95 t.Errorf("want %d, have %d", want, have)
82 // we should have 2 instances
83 if err = asyncTest(100*time.Millisecond, 2, p); err != nil {
84 t.Error(err)
9685 }
9786
9887 // watch triggers an error...
9988 client.SendErrorOnWatch()
10089
101 // test if we ignored the empty instance response due to the error
102 endpoints, err = p.Endpoints()
103 if err != nil {
104 t.Fatal(err)
105 }
106 if want, have := 2, len(endpoints); want != have {
107 t.Errorf("want %d, have %d", want, have)
108 }
109
110 // instances go offline
90 // test if error was consumed
91 if err = client.ErrorIsConsumed(100 * time.Millisecond); err != nil {
92 t.Error(err)
93 }
94
95 // instance3 came online
96 client.AddService(path+"/instance3", "zookeeper_node_data3")
97
98 // we should have 3 instances
99 if err = asyncTest(100*time.Millisecond, 3, p); err != nil {
100 t.Error(err)
101 }
102
103 // instance1 goes offline
111104 client.RemoveService(path + "/instance1")
105
106 // instance2 goes offline
112107 client.RemoveService(path + "/instance2")
113108
114 endpoints, err = p.Endpoints()
115 if err != nil {
116 t.Fatal(err)
117 }
118 if want, have := 0, len(endpoints); want != have {
119 t.Errorf("want %d, have %d", want, have)
109 // we should have 1 instance
110 if err = asyncTest(100*time.Millisecond, 1, p); err != nil {
111 t.Error(err)
120112 }
121113 }
122114
125117 client.SendErrorOnWatch()
126118 p, err := NewPublisher(client, path, newFactory(""), logger)
127119 if err == nil {
128 t.Errorf("expected error on new publisher")
120 t.Error("expected error on new publisher")
129121 }
130122 if p != nil {
131 t.Errorf("expected publisher not to be created")
123 t.Error("expected publisher not to be created")
132124 }
133125 p, err = NewPublisher(client, "BadPath", newFactory(""), logger)
134126 if err == nil {
135 t.Errorf("expected error on new publisher")
127 t.Error("expected error on new publisher")
136128 }
137129 if p != nil {
138 t.Errorf("expected publisher not to be created")
130 t.Error("expected publisher not to be created")
139131 }
140132 }
141133
142134 type fakeClient struct {
135 mtx sync.Mutex
143136 ch chan zk.Event
144137 responses map[string]string
145138 result bool
147140
148141 func newFakeClient() *fakeClient {
149142 return &fakeClient{
150 make(chan zk.Event, 1),
151 make(map[string]string),
152 true,
143 ch: make(chan zk.Event, 5),
144 responses: make(map[string]string),
145 result: true,
153146 }
154147 }
155148
161154 }
162155
163156 func (c *fakeClient) GetEntries(path string) ([]string, <-chan zk.Event, error) {
164 responses := []string{}
157 c.mtx.Lock()
158 defer c.mtx.Unlock()
165159 if c.result == false {
166160 c.result = true
167 return responses, c.ch, errors.New("Dummy Error")
168 }
161 return []string{}, c.ch, errors.New("Dummy Error")
162 }
163 responses := []string{}
169164 for _, data := range c.responses {
170165 responses = append(responses, data)
171166 }
173168 }
174169
175170 func (c *fakeClient) AddService(node, data string) {
171 c.mtx.Lock()
172 defer c.mtx.Unlock()
176173 c.responses[node] = data
177 c.triggerWatch()
174 c.ch <- zk.Event{}
178175 }
179176
180177 func (c *fakeClient) RemoveService(node string) {
178 c.mtx.Lock()
179 defer c.mtx.Unlock()
181180 delete(c.responses, node)
182 c.triggerWatch()
181 c.ch <- zk.Event{}
183182 }
184183
185184 func (c *fakeClient) SendErrorOnWatch() {
185 c.mtx.Lock()
186 defer c.mtx.Unlock()
186187 c.result = false
187 c.triggerWatch()
188 c.ch <- zk.Event{}
189 }
190
191 func (c *fakeClient) ErrorIsConsumed(t time.Duration) error {
192 timeout := time.After(t)
193 for {
194 select {
195 case <-timeout:
196 return fmt.Errorf("expected error not consumed after timeout %s", t.String())
197 default:
198 c.mtx.Lock()
199 if c.result == false {
200 c.mtx.Unlock()
201 return nil
202 }
203 c.mtx.Unlock()
204 }
205 }
188206 }
189207
190208 func (c *fakeClient) Stop() {}
191209
192210 func newFactory(fakeError string) loadbalancer.Factory {
193 return func(string) (endpoint.Endpoint, io.Closer, error) {
194 if fakeError == "" {
195 return e, nil, nil
211 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
212 if fakeError == instance {
213 return nil, nil, errors.New(fakeError)
196214 }
197 return nil, nil, errors.New(fakeError)
198 }
199 }
200
201 func (c *fakeClient) triggerWatch() {
202 c.ch <- zk.Event{}
203 // watches on ZooKeeper Nodes trigger once, most ZooKeeper libraries also
204 // implement "fire once" channels for these watches
205 close(c.ch)
206 c.ch = make(chan zk.Event, 1)
207
208 // make sure we allow the Publisher to handle this update
209 time.Sleep(1 * time.Millisecond)
210 }
215 return e, nil, nil
216 }
217 }
218
219 func asyncTest(timeout time.Duration, want int, p *Publisher) (err error) {
220 var endpoints []endpoint.Endpoint
221 // want can never be -1
222 have := -1
223 t := time.After(timeout)
224 for {
225 select {
226 case <-t:
227 return fmt.Errorf("want %d, have %d after timeout %s", want, have, timeout.String())
228 default:
229 endpoints, err = p.Endpoints()
230 have = len(endpoints)
231 if err != nil || want == have {
232 return
233 }
234 time.Sleep(time.Millisecond)
235 }
236 }
237 }