Package list golang-github-go-kit-kit / f5bc66e
Add more tests Yuri Shkuro 4 years ago
7 changed file(s) with 459 addition(s) and 378 deletion(s). Raw diff Collapse all Expand all
66 set -e
77
88 function go_files { find . -name '*_test.go' ; }
9 function filter { grep -v -e '/_' ; }
9 function filter { grep -v '/_' ; }
1010 function remove_relative_prefix { sed -e 's/^\.\///g' ; }
1111
1212 function directories {
+0
-143
sd/cache.go less more
0 package sd
1
2 import (
3 "io"
4 "sort"
5 "sync"
6 "time"
7
8 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/log"
10 )
11
12 // endpointCache collects the most recent set of instances from a service discovery
13 // system, creates endpoints for them using a factory function, and makes
14 // them available to consumers.
15 type endpointCache struct {
16 options endpointerOptions
17 mtx sync.RWMutex
18 factory Factory
19 cache map[string]endpointCloser
20 err error
21 endpoints []endpoint.Endpoint
22 logger log.Logger
23 invalidateDeadline time.Time
24 timeNow func() time.Time
25 }
26
27 type endpointCloser struct {
28 endpoint.Endpoint
29 io.Closer
30 }
31
32 // newEndpointCache returns a new, empty endpointCache.
33 func newEndpointCache(factory Factory, logger log.Logger, options endpointerOptions) *endpointCache {
34 return &endpointCache{
35 options: options,
36 factory: factory,
37 cache: map[string]endpointCloser{},
38 logger: logger,
39 timeNow: time.Now,
40 }
41 }
42
43 // Update should be invoked by clients with a complete set of current instance
44 // strings whenever that set changes. The cache manufactures new endpoints via
45 // the factory, closes old endpoints when they disappear, and persists existing
46 // endpoints if they survive through an update.
47 func (c *endpointCache) Update(event Event) {
48 c.mtx.Lock()
49 defer c.mtx.Unlock()
50
51 // Happy path.
52 if event.Err == nil {
53 c.updateCache(event.Instances)
54 c.err = nil
55 return
56 }
57
58 // Sad path. Something's gone wrong in sd.
59 c.logger.Log("err", event.Err)
60 if !c.options.invalidateOnError {
61 return // keep returning the last known endpoints on error
62 }
63 if c.err != nil {
64 return // already in the error state, do nothing & keep original error
65 }
66 c.err = event.Err
67 // set new deadline to invalidate Endpoints unless non-error Event is received
68 c.invalidateDeadline = c.timeNow().Add(c.options.invalidateTimeout)
69 return
70 }
71
72 func (c *endpointCache) updateCache(instances []string) {
73 // Deterministic order (for later).
74 sort.Strings(instances)
75
76 // Produce the current set of services.
77 cache := make(map[string]endpointCloser, len(instances))
78 for _, instance := range instances {
79 // If it already exists, just copy it over.
80 if sc, ok := c.cache[instance]; ok {
81 cache[instance] = sc
82 delete(c.cache, instance)
83 continue
84 }
85
86 // If it doesn't exist, create it.
87 service, closer, err := c.factory(instance)
88 if err != nil {
89 c.logger.Log("instance", instance, "err", err)
90 continue
91 }
92 cache[instance] = endpointCloser{service, closer}
93 }
94
95 // Close any leftover endpoints.
96 for _, sc := range c.cache {
97 if sc.Closer != nil {
98 sc.Closer.Close()
99 }
100 }
101
102 // Populate the slice of endpoints.
103 endpoints := make([]endpoint.Endpoint, 0, len(cache))
104 for _, instance := range instances {
105 // A bad factory may mean an instance is not present.
106 if _, ok := cache[instance]; !ok {
107 continue
108 }
109 endpoints = append(endpoints, cache[instance].Endpoint)
110 }
111
112 // Swap and trigger GC for old copies.
113 c.endpoints = endpoints
114 c.cache = cache
115 }
116
117 // Endpoints yields the current set of (presumably identical) endpoints, ordered
118 // lexicographically by the corresponding instance string.
119 func (c *endpointCache) Endpoints() ([]endpoint.Endpoint, error) {
120 // in the steady state we're going to have many goroutines calling Endpoints()
121 // concurrently, so to minimize contention we use a shared R-lock.
122 c.mtx.RLock()
123
124 if c.err == nil || c.timeNow().Before(c.invalidateDeadline) {
125 defer c.mtx.RUnlock()
126 return c.endpoints, nil
127 }
128
129 c.mtx.RUnlock()
130
131 // in case of an error, switch to an exclusive lock.
132 c.mtx.Lock()
133 defer c.mtx.Unlock()
134
135 // re-check condition due to a race between RUnlock() and Lock().
136 if c.err == nil || c.timeNow().Before(c.invalidateDeadline) {
137 return c.endpoints, nil
138 }
139
140 c.updateCache(nil) // close any remaining active endpoints
141 return nil, c.err
142 }
+0
-180
sd/cache_test.go less more
0 package sd
1
2 import (
3 "errors"
4 "io"
5 "testing"
6 "time"
7
8 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/log"
10 )
11
12 func TestCache(t *testing.T) {
13 var (
14 ca = make(closer)
15 cb = make(closer)
16 c = map[string]io.Closer{"a": ca, "b": cb}
17 f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
18 cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{})
19 )
20
21 // Populate
22 cache.Update(Event{Instances: []string{"a", "b"}})
23 select {
24 case <-ca:
25 t.Errorf("endpoint a closed, not good")
26 case <-cb:
27 t.Errorf("endpoint b closed, not good")
28 case <-time.After(time.Millisecond):
29 t.Logf("no closures yet, good")
30 }
31 assertEndpointsLen(t, cache, 2)
32
33 // Duplicate, should be no-op
34 cache.Update(Event{Instances: []string{"a", "b"}})
35 select {
36 case <-ca:
37 t.Errorf("endpoint a closed, not good")
38 case <-cb:
39 t.Errorf("endpoint b closed, not good")
40 case <-time.After(time.Millisecond):
41 t.Logf("no closures yet, good")
42 }
43 assertEndpointsLen(t, cache, 2)
44
45 // Error, should continue returning old endpoints
46 cache.Update(Event{Err: errors.New("sd error")})
47 select {
48 case <-ca:
49 t.Errorf("endpoint a closed, not good")
50 case <-cb:
51 t.Errorf("endpoint b closed, not good")
52 case <-time.After(time.Millisecond):
53 t.Logf("no closures yet, good")
54 }
55 assertEndpointsLen(t, cache, 2)
56
57 // Delete b
58 go cache.Update(Event{Instances: []string{"a"}})
59 select {
60 case <-ca:
61 t.Errorf("endpoint a closed, not good")
62 case <-cb:
63 t.Logf("endpoint b closed, good")
64 case <-time.After(time.Second):
65 t.Errorf("didn't close the deleted instance in time")
66 }
67 assertEndpointsLen(t, cache, 1)
68
69 // Delete a
70 go cache.Update(Event{Instances: []string{}})
71 select {
72 // case <-cb: will succeed, as it's closed
73 case <-ca:
74 t.Logf("endpoint a closed, good")
75 case <-time.After(time.Second):
76 t.Errorf("didn't close the deleted instance in time")
77 }
78 assertEndpointsLen(t, cache, 0)
79 }
80
81 func TestCacheErrorAndTimeout(t *testing.T) {
82 var (
83 ca = make(closer)
84 cb = make(closer)
85 c = map[string]io.Closer{"a": ca, "b": cb}
86 f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
87 timeOut = 100 * time.Millisecond
88 cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{
89 invalidateOnError: true,
90 invalidateTimeout: timeOut,
91 })
92 )
93
94 timeNow := time.Now()
95 cache.timeNow = func() time.Time { return timeNow }
96
97 // Populate
98 cache.Update(Event{Instances: []string{"a"}})
99 select {
100 case <-ca:
101 t.Errorf("endpoint a closed, not good")
102 case <-time.After(time.Millisecond):
103 t.Logf("no closures yet, good")
104 }
105 assertEndpointsLen(t, cache, 1)
106
107 // Send error, keep time still.
108 cache.Update(Event{Err: errors.New("sd error")})
109 select {
110 case <-ca:
111 t.Errorf("endpoint a closed, not good")
112 case <-time.After(time.Millisecond):
113 t.Logf("no closures yet, good")
114 }
115 assertEndpointsLen(t, cache, 1)
116
117 // Move the time, but less than the timeout
118 timeNow = timeNow.Add(timeOut / 2)
119 assertEndpointsLen(t, cache, 1)
120 select {
121 case <-ca:
122 t.Errorf("endpoint a closed, not good")
123 case <-time.After(time.Millisecond):
124 t.Logf("no closures yet, good")
125 }
126
127 // Move the time past the timeout
128 timeNow = timeNow.Add(timeOut)
129 assertEndpointsError(t, cache, "sd error")
130 select {
131 case <-ca:
132 t.Logf("endpoint a closed, good")
133 case <-time.After(time.Millisecond):
134 t.Errorf("didn't close the deleted instance in time")
135 }
136
137 // Send another error
138 cache.Update(Event{Err: errors.New("another sd error")})
139 assertEndpointsError(t, cache, "sd error") // expect original error
140 }
141
142 func TestBadFactory(t *testing.T) {
143 cache := newEndpointCache(func(string) (endpoint.Endpoint, io.Closer, error) {
144 return nil, nil, errors.New("bad factory")
145 }, log.NewNopLogger(), endpointerOptions{})
146
147 cache.Update(Event{Instances: []string{"foo:1234", "bar:5678"}})
148 assertEndpointsLen(t, cache, 0)
149 }
150
151 func assertEndpointsLen(t *testing.T, cache *endpointCache, l int) {
152 endpoints, err := cache.Endpoints()
153 if err != nil {
154 t.Errorf("unexpected error %v", err)
155 return
156 }
157 if want, have := l, len(endpoints); want != have {
158 t.Errorf("want %d, have %d", want, have)
159 }
160 }
161
162 func assertEndpointsError(t *testing.T, cache *endpointCache, wantErr string) {
163 endpoints, err := cache.Endpoints()
164 if err == nil {
165 t.Errorf("expecting error, not good")
166 return
167 }
168 if want, have := wantErr, err.Error(); want != have {
169 t.Errorf("want %s, have %s", want, have)
170 return
171 }
172 if want, have := 0, len(endpoints); want != have {
173 t.Errorf("want %d, have %d", want, have)
174 }
175 }
176
177 type closer chan struct{}
178
179 func (c closer) Close() error { close(c); return nil }
0 package sd
1
2 import (
3 "io"
4 "sort"
5 "sync"
6 "time"
7
8 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/log"
10 )
11
12 // endpointCache collects the most recent set of instances from a service discovery
13 // system, creates endpoints for them using a factory function, and makes
14 // them available to consumers.
15 type endpointCache struct {
16 options endpointerOptions
17 mtx sync.RWMutex
18 factory Factory
19 cache map[string]endpointCloser
20 err error
21 endpoints []endpoint.Endpoint
22 logger log.Logger
23 invalidateDeadline time.Time
24 timeNow func() time.Time
25 }
26
27 type endpointCloser struct {
28 endpoint.Endpoint
29 io.Closer
30 }
31
32 // newEndpointCache returns a new, empty endpointCache.
33 func newEndpointCache(factory Factory, logger log.Logger, options endpointerOptions) *endpointCache {
34 return &endpointCache{
35 options: options,
36 factory: factory,
37 cache: map[string]endpointCloser{},
38 logger: logger,
39 timeNow: time.Now,
40 }
41 }
42
43 // Update should be invoked by clients with a complete set of current instance
44 // strings whenever that set changes. The cache manufactures new endpoints via
45 // the factory, closes old endpoints when they disappear, and persists existing
46 // endpoints if they survive through an update.
47 func (c *endpointCache) Update(event Event) {
48 c.mtx.Lock()
49 defer c.mtx.Unlock()
50
51 // Happy path.
52 if event.Err == nil {
53 c.updateCache(event.Instances)
54 c.err = nil
55 return
56 }
57
58 // Sad path. Something's gone wrong in sd.
59 c.logger.Log("err", event.Err)
60 if !c.options.invalidateOnError {
61 return // keep returning the last known endpoints on error
62 }
63 if c.err != nil {
64 return // already in the error state, do nothing & keep original error
65 }
66 c.err = event.Err
67 // set new deadline to invalidate Endpoints unless non-error Event is received
68 c.invalidateDeadline = c.timeNow().Add(c.options.invalidateTimeout)
69 return
70 }
71
72 func (c *endpointCache) updateCache(instances []string) {
73 // Deterministic order (for later).
74 sort.Strings(instances)
75
76 // Produce the current set of services.
77 cache := make(map[string]endpointCloser, len(instances))
78 for _, instance := range instances {
79 // If it already exists, just copy it over.
80 if sc, ok := c.cache[instance]; ok {
81 cache[instance] = sc
82 delete(c.cache, instance)
83 continue
84 }
85
86 // If it doesn't exist, create it.
87 service, closer, err := c.factory(instance)
88 if err != nil {
89 c.logger.Log("instance", instance, "err", err)
90 continue
91 }
92 cache[instance] = endpointCloser{service, closer}
93 }
94
95 // Close any leftover endpoints.
96 for _, sc := range c.cache {
97 if sc.Closer != nil {
98 sc.Closer.Close()
99 }
100 }
101
102 // Populate the slice of endpoints.
103 endpoints := make([]endpoint.Endpoint, 0, len(cache))
104 for _, instance := range instances {
105 // A bad factory may mean an instance is not present.
106 if _, ok := cache[instance]; !ok {
107 continue
108 }
109 endpoints = append(endpoints, cache[instance].Endpoint)
110 }
111
112 // Swap and trigger GC for old copies.
113 c.endpoints = endpoints
114 c.cache = cache
115 }
116
117 // Endpoints yields the current set of (presumably identical) endpoints, ordered
118 // lexicographically by the corresponding instance string.
119 func (c *endpointCache) Endpoints() ([]endpoint.Endpoint, error) {
120 // in the steady state we're going to have many goroutines calling Endpoints()
121 // concurrently, so to minimize contention we use a shared R-lock.
122 c.mtx.RLock()
123
124 if c.err == nil || c.timeNow().Before(c.invalidateDeadline) {
125 defer c.mtx.RUnlock()
126 return c.endpoints, nil
127 }
128
129 c.mtx.RUnlock()
130
131 // in case of an error, switch to an exclusive lock.
132 c.mtx.Lock()
133 defer c.mtx.Unlock()
134
135 // re-check condition due to a race between RUnlock() and Lock().
136 if c.err == nil || c.timeNow().Before(c.invalidateDeadline) {
137 return c.endpoints, nil
138 }
139
140 c.updateCache(nil) // close any remaining active endpoints
141 return nil, c.err
142 }
0 package sd
1
2 import (
3 "errors"
4 "io"
5 "testing"
6 "time"
7
8 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/log"
10 )
11
12 func TestEndpointCache(t *testing.T) {
13 var (
14 ca = make(closer)
15 cb = make(closer)
16 c = map[string]io.Closer{"a": ca, "b": cb}
17 f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
18 cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{})
19 )
20
21 // Populate
22 cache.Update(Event{Instances: []string{"a", "b"}})
23 select {
24 case <-ca:
25 t.Errorf("endpoint a closed, not good")
26 case <-cb:
27 t.Errorf("endpoint b closed, not good")
28 case <-time.After(time.Millisecond):
29 t.Logf("no closures yet, good")
30 }
31 assertEndpointsLen(t, cache, 2)
32
33 // Duplicate, should be no-op
34 cache.Update(Event{Instances: []string{"a", "b"}})
35 select {
36 case <-ca:
37 t.Errorf("endpoint a closed, not good")
38 case <-cb:
39 t.Errorf("endpoint b closed, not good")
40 case <-time.After(time.Millisecond):
41 t.Logf("no closures yet, good")
42 }
43 assertEndpointsLen(t, cache, 2)
44
45 // Error, should continue returning old endpoints
46 cache.Update(Event{Err: errors.New("sd error")})
47 select {
48 case <-ca:
49 t.Errorf("endpoint a closed, not good")
50 case <-cb:
51 t.Errorf("endpoint b closed, not good")
52 case <-time.After(time.Millisecond):
53 t.Logf("no closures yet, good")
54 }
55 assertEndpointsLen(t, cache, 2)
56
57 // Delete b
58 go cache.Update(Event{Instances: []string{"a"}})
59 select {
60 case <-ca:
61 t.Errorf("endpoint a closed, not good")
62 case <-cb:
63 t.Logf("endpoint b closed, good")
64 case <-time.After(time.Second):
65 t.Errorf("didn't close the deleted instance in time")
66 }
67 assertEndpointsLen(t, cache, 1)
68
69 // Delete a
70 go cache.Update(Event{Instances: []string{}})
71 select {
72 // case <-cb: will succeed, as it's closed
73 case <-ca:
74 t.Logf("endpoint a closed, good")
75 case <-time.After(time.Second):
76 t.Errorf("didn't close the deleted instance in time")
77 }
78 assertEndpointsLen(t, cache, 0)
79 }
80
81 func TestEndpointCacheErrorAndTimeout(t *testing.T) {
82 var (
83 ca = make(closer)
84 cb = make(closer)
85 c = map[string]io.Closer{"a": ca, "b": cb}
86 f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
87 timeOut = 100 * time.Millisecond
88 cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{
89 invalidateOnError: true,
90 invalidateTimeout: timeOut,
91 })
92 )
93
94 timeNow := time.Now()
95 cache.timeNow = func() time.Time { return timeNow }
96
97 // Populate
98 cache.Update(Event{Instances: []string{"a"}})
99 select {
100 case <-ca:
101 t.Errorf("endpoint a closed, not good")
102 case <-time.After(time.Millisecond):
103 t.Logf("no closures yet, good")
104 }
105 assertEndpointsLen(t, cache, 1)
106
107 // Send error, keep time still.
108 cache.Update(Event{Err: errors.New("sd error")})
109 select {
110 case <-ca:
111 t.Errorf("endpoint a closed, not good")
112 case <-time.After(time.Millisecond):
113 t.Logf("no closures yet, good")
114 }
115 assertEndpointsLen(t, cache, 1)
116
117 // Move the time, but less than the timeout
118 timeNow = timeNow.Add(timeOut / 2)
119 assertEndpointsLen(t, cache, 1)
120 select {
121 case <-ca:
122 t.Errorf("endpoint a closed, not good")
123 case <-time.After(time.Millisecond):
124 t.Logf("no closures yet, good")
125 }
126
127 // Move the time past the timeout
128 timeNow = timeNow.Add(timeOut)
129 assertEndpointsError(t, cache, "sd error")
130 select {
131 case <-ca:
132 t.Logf("endpoint a closed, good")
133 case <-time.After(time.Millisecond):
134 t.Errorf("didn't close the deleted instance in time")
135 }
136
137 // Send another error
138 cache.Update(Event{Err: errors.New("another sd error")})
139 assertEndpointsError(t, cache, "sd error") // expect original error
140 }
141
142 func TestBadFactory(t *testing.T) {
143 cache := newEndpointCache(func(string) (endpoint.Endpoint, io.Closer, error) {
144 return nil, nil, errors.New("bad factory")
145 }, log.NewNopLogger(), endpointerOptions{})
146
147 cache.Update(Event{Instances: []string{"foo:1234", "bar:5678"}})
148 assertEndpointsLen(t, cache, 0)
149 }
150
151 func assertEndpointsLen(t *testing.T, cache *endpointCache, l int) {
152 endpoints, err := cache.Endpoints()
153 if err != nil {
154 t.Errorf("unexpected error %v", err)
155 return
156 }
157 if want, have := l, len(endpoints); want != have {
158 t.Errorf("want %d, have %d", want, have)
159 }
160 }
161
162 func assertEndpointsError(t *testing.T, cache *endpointCache, wantErr string) {
163 endpoints, err := cache.Endpoints()
164 if err == nil {
165 t.Errorf("expecting error, not good")
166 return
167 }
168 if want, have := wantErr, err.Error(); want != have {
169 t.Errorf("want %s, have %s", want, have)
170 return
171 }
172 if want, have := 0, len(endpoints); want != have {
173 t.Errorf("want %d, have %d", want, have)
174 }
175 }
176
177 type closer chan struct{}
178
179 func (c closer) Close() error { close(c); return nil }
0 package sd_test
1
2 import (
3 "io"
4 "testing"
5 "time"
6
7 "github.com/go-kit/kit/endpoint"
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/sd"
10 "github.com/go-kit/kit/sd/internal/instance"
11 )
12
13 func TestDefaultEndpointer(t *testing.T) {
14 var (
15 ca = make(closer)
16 cb = make(closer)
17 c = map[string]io.Closer{"a": ca, "b": cb}
18 f = func(instance string) (endpoint.Endpoint, io.Closer, error) {
19 return endpoint.Nop, c[instance], nil
20 }
21 instancer = &mockInstancer{
22 cache: instance.NewCache(),
23 }
24 )
25 // set initial state
26 instancer.Update(sd.Event{Instances: []string{"a", "b"}})
27
28 endpointer := sd.NewEndpointer(instancer, f, log.NewNopLogger(), sd.InvalidateOnError(time.Minute))
29 if endpoints, err := endpointer.Endpoints(); err != nil {
30 t.Errorf("unepected error %v", err)
31 } else if want, have := 2, len(endpoints); want != have {
32 t.Errorf("want %d, have %d", want, have)
33 }
34
35 instancer.Update(sd.Event{Instances: []string{}})
36 select {
37 case <-ca:
38 t.Logf("endpoint a closed, good")
39 case <-time.After(time.Millisecond):
40 t.Errorf("didn't close the deleted instance in time")
41 }
42 select {
43 case <-cb:
44 t.Logf("endpoint b closed, good")
45 case <-time.After(time.Millisecond):
46 t.Errorf("didn't close the deleted instance in time")
47 }
48 if endpoints, err := endpointer.Endpoints(); err != nil {
49 t.Errorf("unepected error %v", err)
50 } else if want, have := 0, len(endpoints); want != have {
51 t.Errorf("want %d, have %d", want, have)
52 }
53
54 endpointer.Close()
55 instancer.Update(sd.Event{Instances: []string{"a"}})
56 // TODO verify that on Close the endpointer fully disconnects from the instancer.
57 // Unfortunately, because we use instance.Cache, this test cannot be in the sd package,
58 // and therefore does not have access to the endpointer's private members.
59 }
60
61 type mockInstancer struct {
62 cache *instance.Cache
63 }
64
65 func (m *mockInstancer) Update(event sd.Event) {
66 m.cache.Update(event)
67 }
68
69 func (m *mockInstancer) Register(ch chan<- sd.Event) {
70 m.cache.Register(ch)
71 }
72
73 func (m *mockInstancer) Deregister(ch chan<- sd.Event) {
74 m.cache.Deregister(ch)
75 }
76
77 type closer chan struct{}
78
79 func (c closer) Close() error { close(c); return nil }
00 package instance
11
22 import (
3 "sync"
3 "reflect"
44 "testing"
5 "time"
56
67 "github.com/go-kit/kit/sd"
78 )
89
910 var _ sd.Instancer = &Cache{} // API check
1011
12 // The test verifies the following:
13 // registering causes initial notification of the current state
14 // notifications delivered to two receivers
15 // identical notifications cause no updates
16 // different update causes new notification
17 // instances are sorted
18 // no updates after de-registering
1119 func TestCache(t *testing.T) {
12 // TODO this test is not finished yet
20 e1 := sd.Event{Instances: []string{"y", "x"}} // not sorted
21 e2 := sd.Event{Instances: []string{"c", "a", "b"}}
1322
1423 c := NewCache()
15
16 {
17 state := c.State()
18 if want, have := 0, len(state.Instances); want != have {
19 t.Fatalf("want %v instances, have %v", want, have)
20 }
24 if want, have := 0, len(c.State().Instances); want != have {
25 t.Fatalf("want %v instances, have %v", want, have)
2126 }
2227
23 notification1 := sd.Event{Instances: []string{"x", "y"}}
24 notification2 := sd.Event{Instances: []string{"a", "b", "c"}}
25
26 c.Update(notification1)
27
28 // times 2 because we have two observers
29 expectedInstances := 2 * (len(notification1.Instances) + len(notification2.Instances))
30
31 wg := sync.WaitGroup{}
32 wg.Add(expectedInstances)
33
34 receiver := func(ch chan sd.Event) {
35 for state := range ch {
36 // count total number of instances received
37 for range state.Instances {
38 wg.Done()
39 }
40 }
28 c.Update(e1) // sets initial state
29 if want, have := 2, len(c.State().Instances); want != have {
30 t.Fatalf("want %v instances, have %v", want, have)
4131 }
4232
43 f1 := make(chan sd.Event)
44 f2 := make(chan sd.Event)
45 go receiver(f1)
46 go receiver(f2)
33 r1 := make(chan sd.Event)
34 go c.Register(r1)
35 expectUpdate(t, r1, []string{"x", "y"})
4736
48 c.Register(f1)
49 c.Register(f2)
37 r2 := make(chan sd.Event)
38 go c.Register(r2)
39 expectUpdate(t, r2, []string{"x", "y"})
5040
51 c.Update(notification1)
52 c.Update(notification2)
41 // send the same instances but in different order.
42 // because it's a duplicate it should not cause new notification.
43 // if it did, this call would deadlock trying to send to channels with no readers
44 c.Update(sd.Event{Instances: []string{"x", "y"}})
45 expectNoUpdate(t, r1)
46 expectNoUpdate(t, r2)
5347
54 // if state := c.State(); instances == nil {
55 // if want, have := len(notification2), len(instances); want != have {
56 // t.Errorf("want length %v, have %v", want, have)
57 // } else {
58 // for i := range notification2 {
59 // if want, have := notification2[i], instances[i]; want != have {
60 // t.Errorf("want instance %v, have %v", want, have)
61 // }
62 // }
63 // }
64 // }
48 go c.Update(e2) // different set
49 expectUpdate(t, r1, []string{"a", "b", "c"})
50 expectUpdate(t, r2, []string{"a", "b", "c"})
6551
66 close(f1)
67 close(f2)
52 c.Deregister(r1)
53 c.Deregister(r2)
54 close(r1)
55 close(r2)
56 // if deregister didn't work, Update would panic on closed channels
57 c.Update(e1)
58 }
6859
69 wg.Wait()
60 func expectUpdate(t *testing.T, r chan sd.Event, expect []string) {
61 select {
62 case e := <-r:
63 if want, have := expect, e.Instances; !reflect.DeepEqual(want, have) {
64 t.Fatalf("want: %v, have: %v", want, have)
65 }
66 case <-time.After(time.Second):
67 t.Fatalf("did not receive expected update")
68 }
69 }
7070
71 // d.Deregister(f1)
72
73 // d.Unregister(f2)
74 // if want, have := 0, len(d.observers); want != have {
75 // t.Fatalf("want %v observers, have %v", want, have)
76 // }
71 func expectNoUpdate(t *testing.T, r chan sd.Event) {
72 select {
73 case e := <-r:
74 t.Errorf("received unexpected update %v", e)
75 case <-time.After(time.Millisecond):
76 return // as expected
77 }
7778 }