diff --git a/coverage.bash b/coverage.bash index 9f70f31..f4b0524 100755 --- a/coverage.bash +++ b/coverage.bash @@ -7,7 +7,7 @@ set -e function go_files { find . -name '*_test.go' ; } -function filter { grep -v -e '/_' ; } +function filter { grep -v '/_' ; } function remove_relative_prefix { sed -e 's/^\.\///g' ; } function directories { diff --git a/sd/cache.go b/sd/cache.go deleted file mode 100644 index 6a4938f..0000000 --- a/sd/cache.go +++ /dev/null @@ -1,143 +0,0 @@ -package sd - -import ( - "io" - "sort" - "sync" - "time" - - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" -) - -// endpointCache collects the most recent set of instances from a service discovery -// system, creates endpoints for them using a factory function, and makes -// them available to consumers. -type endpointCache struct { - options endpointerOptions - mtx sync.RWMutex - factory Factory - cache map[string]endpointCloser - err error - endpoints []endpoint.Endpoint - logger log.Logger - invalidateDeadline time.Time - timeNow func() time.Time -} - -type endpointCloser struct { - endpoint.Endpoint - io.Closer -} - -// newEndpointCache returns a new, empty endpointCache. -func newEndpointCache(factory Factory, logger log.Logger, options endpointerOptions) *endpointCache { - return &endpointCache{ - options: options, - factory: factory, - cache: map[string]endpointCloser{}, - logger: logger, - timeNow: time.Now, - } -} - -// Update should be invoked by clients with a complete set of current instance -// strings whenever that set changes. The cache manufactures new endpoints via -// the factory, closes old endpoints when they disappear, and persists existing -// endpoints if they survive through an update. -func (c *endpointCache) Update(event Event) { - c.mtx.Lock() - defer c.mtx.Unlock() - - // Happy path. - if event.Err == nil { - c.updateCache(event.Instances) - c.err = nil - return - } - - // Sad path. Something's gone wrong in sd. - c.logger.Log("err", event.Err) - if !c.options.invalidateOnError { - return // keep returning the last known endpoints on error - } - if c.err != nil { - return // already in the error state, do nothing & keep original error - } - c.err = event.Err - // set new deadline to invalidate Endpoints unless non-error Event is received - c.invalidateDeadline = c.timeNow().Add(c.options.invalidateTimeout) - return -} - -func (c *endpointCache) updateCache(instances []string) { - // Deterministic order (for later). - sort.Strings(instances) - - // Produce the current set of services. - cache := make(map[string]endpointCloser, len(instances)) - for _, instance := range instances { - // If it already exists, just copy it over. - if sc, ok := c.cache[instance]; ok { - cache[instance] = sc - delete(c.cache, instance) - continue - } - - // If it doesn't exist, create it. - service, closer, err := c.factory(instance) - if err != nil { - c.logger.Log("instance", instance, "err", err) - continue - } - cache[instance] = endpointCloser{service, closer} - } - - // Close any leftover endpoints. - for _, sc := range c.cache { - if sc.Closer != nil { - sc.Closer.Close() - } - } - - // Populate the slice of endpoints. - endpoints := make([]endpoint.Endpoint, 0, len(cache)) - for _, instance := range instances { - // A bad factory may mean an instance is not present. - if _, ok := cache[instance]; !ok { - continue - } - endpoints = append(endpoints, cache[instance].Endpoint) - } - - // Swap and trigger GC for old copies. 
- c.endpoints = endpoints - c.cache = cache -} - -// Endpoints yields the current set of (presumably identical) endpoints, ordered -// lexicographically by the corresponding instance string. -func (c *endpointCache) Endpoints() ([]endpoint.Endpoint, error) { - // in the steady state we're going to have many goroutines calling Endpoints() - // concurrently, so to minimize contention we use a shared R-lock. - c.mtx.RLock() - - if c.err == nil || c.timeNow().Before(c.invalidateDeadline) { - defer c.mtx.RUnlock() - return c.endpoints, nil - } - - c.mtx.RUnlock() - - // in case of an error, switch to an exclusive lock. - c.mtx.Lock() - defer c.mtx.Unlock() - - // re-check condition due to a race between RUnlock() and Lock(). - if c.err == nil || c.timeNow().Before(c.invalidateDeadline) { - return c.endpoints, nil - } - - c.updateCache(nil) // close any remaining active endpoints - return nil, c.err -} diff --git a/sd/cache_test.go b/sd/cache_test.go deleted file mode 100644 index 75c78cb..0000000 --- a/sd/cache_test.go +++ /dev/null @@ -1,180 +0,0 @@ -package sd - -import ( - "errors" - "io" - "testing" - "time" - - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" -) - -func TestCache(t *testing.T) { - var ( - ca = make(closer) - cb = make(closer) - c = map[string]io.Closer{"a": ca, "b": cb} - f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil } - cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{}) - ) - - // Populate - cache.Update(Event{Instances: []string{"a", "b"}}) - select { - case <-ca: - t.Errorf("endpoint a closed, not good") - case <-cb: - t.Errorf("endpoint b closed, not good") - case <-time.After(time.Millisecond): - t.Logf("no closures yet, good") - } - assertEndpointsLen(t, cache, 2) - - // Duplicate, should be no-op - cache.Update(Event{Instances: []string{"a", "b"}}) - select { - case <-ca: - t.Errorf("endpoint a closed, not good") - case <-cb: - t.Errorf("endpoint b closed, not good") - case <-time.After(time.Millisecond): - t.Logf("no closures yet, good") - } - assertEndpointsLen(t, cache, 2) - - // Error, should continue returning old endpoints - cache.Update(Event{Err: errors.New("sd error")}) - select { - case <-ca: - t.Errorf("endpoint a closed, not good") - case <-cb: - t.Errorf("endpoint b closed, not good") - case <-time.After(time.Millisecond): - t.Logf("no closures yet, good") - } - assertEndpointsLen(t, cache, 2) - - // Delete b - go cache.Update(Event{Instances: []string{"a"}}) - select { - case <-ca: - t.Errorf("endpoint a closed, not good") - case <-cb: - t.Logf("endpoint b closed, good") - case <-time.After(time.Second): - t.Errorf("didn't close the deleted instance in time") - } - assertEndpointsLen(t, cache, 1) - - // Delete a - go cache.Update(Event{Instances: []string{}}) - select { - // case <-cb: will succeed, as it's closed - case <-ca: - t.Logf("endpoint a closed, good") - case <-time.After(time.Second): - t.Errorf("didn't close the deleted instance in time") - } - assertEndpointsLen(t, cache, 0) -} - -func TestCacheErrorAndTimeout(t *testing.T) { - var ( - ca = make(closer) - cb = make(closer) - c = map[string]io.Closer{"a": ca, "b": cb} - f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil } - timeOut = 100 * time.Millisecond - cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{ - invalidateOnError: true, - invalidateTimeout: timeOut, - }) - ) - - timeNow := time.Now() - cache.timeNow = func() time.Time { 
return timeNow } - - // Populate - cache.Update(Event{Instances: []string{"a"}}) - select { - case <-ca: - t.Errorf("endpoint a closed, not good") - case <-time.After(time.Millisecond): - t.Logf("no closures yet, good") - } - assertEndpointsLen(t, cache, 1) - - // Send error, keep time still. - cache.Update(Event{Err: errors.New("sd error")}) - select { - case <-ca: - t.Errorf("endpoint a closed, not good") - case <-time.After(time.Millisecond): - t.Logf("no closures yet, good") - } - assertEndpointsLen(t, cache, 1) - - // Move the time, but less than the timeout - timeNow = timeNow.Add(timeOut / 2) - assertEndpointsLen(t, cache, 1) - select { - case <-ca: - t.Errorf("endpoint a closed, not good") - case <-time.After(time.Millisecond): - t.Logf("no closures yet, good") - } - - // Move the time past the timeout - timeNow = timeNow.Add(timeOut) - assertEndpointsError(t, cache, "sd error") - select { - case <-ca: - t.Logf("endpoint a closed, good") - case <-time.After(time.Millisecond): - t.Errorf("didn't close the deleted instance in time") - } - - // Send another error - cache.Update(Event{Err: errors.New("another sd error")}) - assertEndpointsError(t, cache, "sd error") // expect original error -} - -func TestBadFactory(t *testing.T) { - cache := newEndpointCache(func(string) (endpoint.Endpoint, io.Closer, error) { - return nil, nil, errors.New("bad factory") - }, log.NewNopLogger(), endpointerOptions{}) - - cache.Update(Event{Instances: []string{"foo:1234", "bar:5678"}}) - assertEndpointsLen(t, cache, 0) -} - -func assertEndpointsLen(t *testing.T, cache *endpointCache, l int) { - endpoints, err := cache.Endpoints() - if err != nil { - t.Errorf("unexpected error %v", err) - return - } - if want, have := l, len(endpoints); want != have { - t.Errorf("want %d, have %d", want, have) - } -} - -func assertEndpointsError(t *testing.T, cache *endpointCache, wantErr string) { - endpoints, err := cache.Endpoints() - if err == nil { - t.Errorf("expecting error, not good") - return - } - if want, have := wantErr, err.Error(); want != have { - t.Errorf("want %s, have %s", want, have) - return - } - if want, have := 0, len(endpoints); want != have { - t.Errorf("want %d, have %d", want, have) - } -} - -type closer chan struct{} - -func (c closer) Close() error { close(c); return nil } diff --git a/sd/endpoint_cache.go b/sd/endpoint_cache.go new file mode 100644 index 0000000..6a4938f --- /dev/null +++ b/sd/endpoint_cache.go @@ -0,0 +1,143 @@ +package sd + +import ( + "io" + "sort" + "sync" + "time" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" +) + +// endpointCache collects the most recent set of instances from a service discovery +// system, creates endpoints for them using a factory function, and makes +// them available to consumers. +type endpointCache struct { + options endpointerOptions + mtx sync.RWMutex + factory Factory + cache map[string]endpointCloser + err error + endpoints []endpoint.Endpoint + logger log.Logger + invalidateDeadline time.Time + timeNow func() time.Time +} + +type endpointCloser struct { + endpoint.Endpoint + io.Closer +} + +// newEndpointCache returns a new, empty endpointCache. +func newEndpointCache(factory Factory, logger log.Logger, options endpointerOptions) *endpointCache { + return &endpointCache{ + options: options, + factory: factory, + cache: map[string]endpointCloser{}, + logger: logger, + timeNow: time.Now, + } +} + +// Update should be invoked by clients with a complete set of current instance +// strings whenever that set changes. 
The cache manufactures new endpoints via +// the factory, closes old endpoints when they disappear, and persists existing +// endpoints if they survive through an update. +func (c *endpointCache) Update(event Event) { + c.mtx.Lock() + defer c.mtx.Unlock() + + // Happy path. + if event.Err == nil { + c.updateCache(event.Instances) + c.err = nil + return + } + + // Sad path. Something's gone wrong in sd. + c.logger.Log("err", event.Err) + if !c.options.invalidateOnError { + return // keep returning the last known endpoints on error + } + if c.err != nil { + return // already in the error state, do nothing & keep original error + } + c.err = event.Err + // set new deadline to invalidate Endpoints unless non-error Event is received + c.invalidateDeadline = c.timeNow().Add(c.options.invalidateTimeout) + return +} + +func (c *endpointCache) updateCache(instances []string) { + // Deterministic order (for later). + sort.Strings(instances) + + // Produce the current set of services. + cache := make(map[string]endpointCloser, len(instances)) + for _, instance := range instances { + // If it already exists, just copy it over. + if sc, ok := c.cache[instance]; ok { + cache[instance] = sc + delete(c.cache, instance) + continue + } + + // If it doesn't exist, create it. + service, closer, err := c.factory(instance) + if err != nil { + c.logger.Log("instance", instance, "err", err) + continue + } + cache[instance] = endpointCloser{service, closer} + } + + // Close any leftover endpoints. + for _, sc := range c.cache { + if sc.Closer != nil { + sc.Closer.Close() + } + } + + // Populate the slice of endpoints. + endpoints := make([]endpoint.Endpoint, 0, len(cache)) + for _, instance := range instances { + // A bad factory may mean an instance is not present. + if _, ok := cache[instance]; !ok { + continue + } + endpoints = append(endpoints, cache[instance].Endpoint) + } + + // Swap and trigger GC for old copies. + c.endpoints = endpoints + c.cache = cache +} + +// Endpoints yields the current set of (presumably identical) endpoints, ordered +// lexicographically by the corresponding instance string. +func (c *endpointCache) Endpoints() ([]endpoint.Endpoint, error) { + // in the steady state we're going to have many goroutines calling Endpoints() + // concurrently, so to minimize contention we use a shared R-lock. + c.mtx.RLock() + + if c.err == nil || c.timeNow().Before(c.invalidateDeadline) { + defer c.mtx.RUnlock() + return c.endpoints, nil + } + + c.mtx.RUnlock() + + // in case of an error, switch to an exclusive lock. + c.mtx.Lock() + defer c.mtx.Unlock() + + // re-check condition due to a race between RUnlock() and Lock(). 
+ if c.err == nil || c.timeNow().Before(c.invalidateDeadline) { + return c.endpoints, nil + } + + c.updateCache(nil) // close any remaining active endpoints + return nil, c.err +} diff --git a/sd/endpoint_cache_test.go b/sd/endpoint_cache_test.go new file mode 100644 index 0000000..91eb5be --- /dev/null +++ b/sd/endpoint_cache_test.go @@ -0,0 +1,180 @@ +package sd + +import ( + "errors" + "io" + "testing" + "time" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" +) + +func TestEndpointCache(t *testing.T) { + var ( + ca = make(closer) + cb = make(closer) + c = map[string]io.Closer{"a": ca, "b": cb} + f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil } + cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{}) + ) + + // Populate + cache.Update(Event{Instances: []string{"a", "b"}}) + select { + case <-ca: + t.Errorf("endpoint a closed, not good") + case <-cb: + t.Errorf("endpoint b closed, not good") + case <-time.After(time.Millisecond): + t.Logf("no closures yet, good") + } + assertEndpointsLen(t, cache, 2) + + // Duplicate, should be no-op + cache.Update(Event{Instances: []string{"a", "b"}}) + select { + case <-ca: + t.Errorf("endpoint a closed, not good") + case <-cb: + t.Errorf("endpoint b closed, not good") + case <-time.After(time.Millisecond): + t.Logf("no closures yet, good") + } + assertEndpointsLen(t, cache, 2) + + // Error, should continue returning old endpoints + cache.Update(Event{Err: errors.New("sd error")}) + select { + case <-ca: + t.Errorf("endpoint a closed, not good") + case <-cb: + t.Errorf("endpoint b closed, not good") + case <-time.After(time.Millisecond): + t.Logf("no closures yet, good") + } + assertEndpointsLen(t, cache, 2) + + // Delete b + go cache.Update(Event{Instances: []string{"a"}}) + select { + case <-ca: + t.Errorf("endpoint a closed, not good") + case <-cb: + t.Logf("endpoint b closed, good") + case <-time.After(time.Second): + t.Errorf("didn't close the deleted instance in time") + } + assertEndpointsLen(t, cache, 1) + + // Delete a + go cache.Update(Event{Instances: []string{}}) + select { + // case <-cb: will succeed, as it's closed + case <-ca: + t.Logf("endpoint a closed, good") + case <-time.After(time.Second): + t.Errorf("didn't close the deleted instance in time") + } + assertEndpointsLen(t, cache, 0) +} + +func TestEndpointCacheErrorAndTimeout(t *testing.T) { + var ( + ca = make(closer) + cb = make(closer) + c = map[string]io.Closer{"a": ca, "b": cb} + f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil } + timeOut = 100 * time.Millisecond + cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{ + invalidateOnError: true, + invalidateTimeout: timeOut, + }) + ) + + timeNow := time.Now() + cache.timeNow = func() time.Time { return timeNow } + + // Populate + cache.Update(Event{Instances: []string{"a"}}) + select { + case <-ca: + t.Errorf("endpoint a closed, not good") + case <-time.After(time.Millisecond): + t.Logf("no closures yet, good") + } + assertEndpointsLen(t, cache, 1) + + // Send error, keep time still. 
+	cache.Update(Event{Err: errors.New("sd error")})
+	select {
+	case <-ca:
+		t.Errorf("endpoint a closed, not good")
+	case <-time.After(time.Millisecond):
+		t.Logf("no closures yet, good")
+	}
+	assertEndpointsLen(t, cache, 1)
+
+	// Move the time, but less than the timeout
+	timeNow = timeNow.Add(timeOut / 2)
+	assertEndpointsLen(t, cache, 1)
+	select {
+	case <-ca:
+		t.Errorf("endpoint a closed, not good")
+	case <-time.After(time.Millisecond):
+		t.Logf("no closures yet, good")
+	}
+
+	// Move the time past the timeout
+	timeNow = timeNow.Add(timeOut)
+	assertEndpointsError(t, cache, "sd error")
+	select {
+	case <-ca:
+		t.Logf("endpoint a closed, good")
+	case <-time.After(time.Millisecond):
+		t.Errorf("didn't close the deleted instance in time")
+	}
+
+	// Send another error
+	cache.Update(Event{Err: errors.New("another sd error")})
+	assertEndpointsError(t, cache, "sd error") // expect original error
+}
+
+func TestBadFactory(t *testing.T) {
+	cache := newEndpointCache(func(string) (endpoint.Endpoint, io.Closer, error) {
+		return nil, nil, errors.New("bad factory")
+	}, log.NewNopLogger(), endpointerOptions{})
+
+	cache.Update(Event{Instances: []string{"foo:1234", "bar:5678"}})
+	assertEndpointsLen(t, cache, 0)
+}
+
+func assertEndpointsLen(t *testing.T, cache *endpointCache, l int) {
+	endpoints, err := cache.Endpoints()
+	if err != nil {
+		t.Errorf("unexpected error %v", err)
+		return
+	}
+	if want, have := l, len(endpoints); want != have {
+		t.Errorf("want %d, have %d", want, have)
+	}
+}
+
+func assertEndpointsError(t *testing.T, cache *endpointCache, wantErr string) {
+	endpoints, err := cache.Endpoints()
+	if err == nil {
+		t.Errorf("expecting error, not good")
+		return
+	}
+	if want, have := wantErr, err.Error(); want != have {
+		t.Errorf("want %s, have %s", want, have)
+		return
+	}
+	if want, have := 0, len(endpoints); want != have {
+		t.Errorf("want %d, have %d", want, have)
+	}
+}
+
+type closer chan struct{}
+
+func (c closer) Close() error { close(c); return nil }
diff --git a/sd/endpointer_test.go b/sd/endpointer_test.go
new file mode 100644
index 0000000..bea6605
--- /dev/null
+++ b/sd/endpointer_test.go
@@ -0,0 +1,80 @@
+package sd_test
+
+import (
+	"io"
+	"testing"
+	"time"
+
+	"github.com/go-kit/kit/endpoint"
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/sd"
+	"github.com/go-kit/kit/sd/internal/instance"
+)
+
+func TestDefaultEndpointer(t *testing.T) {
+	var (
+		ca = make(closer)
+		cb = make(closer)
+		c  = map[string]io.Closer{"a": ca, "b": cb}
+		f  = func(instance string) (endpoint.Endpoint, io.Closer, error) {
+			return endpoint.Nop, c[instance], nil
+		}
+		instancer = &mockInstancer{
+			cache: instance.NewCache(),
+		}
+	)
+	// set initial state
+	instancer.Update(sd.Event{Instances: []string{"a", "b"}})
+
+	endpointer := sd.NewEndpointer(instancer, f, log.NewNopLogger(), sd.InvalidateOnError(time.Minute))
+	if endpoints, err := endpointer.Endpoints(); err != nil {
+		t.Errorf("unexpected error %v", err)
+	} else if want, have := 2, len(endpoints); want != have {
+		t.Errorf("want %d, have %d", want, have)
+	}
+
+	instancer.Update(sd.Event{Instances: []string{}})
+	select {
+	case <-ca:
+		t.Logf("endpoint a closed, good")
+	case <-time.After(time.Millisecond):
+		t.Errorf("didn't close the deleted instance in time")
+	}
+	select {
+	case <-cb:
+		t.Logf("endpoint b closed, good")
+	case <-time.After(time.Millisecond):
+		t.Errorf("didn't close the deleted instance in time")
+	}
+	if endpoints, err := endpointer.Endpoints(); err != nil {
+		t.Errorf("unexpected error %v", err)
+	} else if want, have := 0, len(endpoints); want != have {
+		t.Errorf("want %d, have %d", want, have)
+	}
+
+	endpointer.Close()
+	instancer.Update(sd.Event{Instances: []string{"a"}})
+	// TODO verify that on Close the endpointer fully disconnects from the instancer.
+	// Unfortunately, because we use instance.Cache, this test cannot be in the sd package,
+	// and therefore does not have access to the endpointer's private members.
+}
+
+type mockInstancer struct {
+	cache *instance.Cache
+}
+
+func (m *mockInstancer) Update(event sd.Event) {
+	m.cache.Update(event)
+}
+
+func (m *mockInstancer) Register(ch chan<- sd.Event) {
+	m.cache.Register(ch)
+}
+
+func (m *mockInstancer) Deregister(ch chan<- sd.Event) {
+	m.cache.Deregister(ch)
+}
+
+type closer chan struct{}
+
+func (c closer) Close() error { close(c); return nil }
diff --git a/sd/internal/instance/cache_test.go b/sd/internal/instance/cache_test.go
index 5d9847a..bb612d8 100644
--- a/sd/internal/instance/cache_test.go
+++ b/sd/internal/instance/cache_test.go
@@ -1,78 +1,79 @@
 package instance
 
 import (
-	"sync"
+	"reflect"
 	"testing"
+	"time"
 
 	"github.com/go-kit/kit/sd"
 )
 
 var _ sd.Instancer = &Cache{} // API check
 
+// The test verifies the following:
+// registering causes initial notification of the current state
+// notifications delivered to two receivers
+// identical notifications cause no updates
+// different update causes new notification
+// instances are sorted
+// no updates after de-registering
 func TestCache(t *testing.T) {
-	// TODO this test is not finished yet
+	e1 := sd.Event{Instances: []string{"y", "x"}} // not sorted
+	e2 := sd.Event{Instances: []string{"c", "a", "b"}}
 
 	c := NewCache()
-
-	{
-		state := c.State()
-		if want, have := 0, len(state.Instances); want != have {
-			t.Fatalf("want %v instances, have %v", want, have)
-		}
+	if want, have := 0, len(c.State().Instances); want != have {
+		t.Fatalf("want %v instances, have %v", want, have)
 	}
 
-	notification1 := sd.Event{Instances: []string{"x", "y"}}
-	notification2 := sd.Event{Instances: []string{"a", "b", "c"}}
-
-	c.Update(notification1)
-
-	// times 2 because we have two observers
-	expectedInstances := 2 * (len(notification1.Instances) + len(notification2.Instances))
-
-	wg := sync.WaitGroup{}
-	wg.Add(expectedInstances)
-
-	receiver := func(ch chan sd.Event) {
-		for state := range ch {
-			// count total number of instances received
-			for range state.Instances {
-				wg.Done()
-			}
-		}
+	c.Update(e1) // sets initial state
+	if want, have := 2, len(c.State().Instances); want != have {
+		t.Fatalf("want %v instances, have %v", want, have)
 	}
 
-	f1 := make(chan sd.Event)
-	f2 := make(chan sd.Event)
-	go receiver(f1)
-	go receiver(f2)
+	r1 := make(chan sd.Event)
+	go c.Register(r1)
+	expectUpdate(t, r1, []string{"x", "y"})
 
-	c.Register(f1)
-	c.Register(f2)
+	r2 := make(chan sd.Event)
+	go c.Register(r2)
+	expectUpdate(t, r2, []string{"x", "y"})
 
-	c.Update(notification1)
-	c.Update(notification2)
+	// send the same instances but in different order.
+	// because it's a duplicate it should not cause new notification.
+	// if it did, this call would deadlock trying to send to channels with no readers
+	c.Update(sd.Event{Instances: []string{"x", "y"}})
+	expectNoUpdate(t, r1)
+	expectNoUpdate(t, r2)
 
-	// if state := c.State(); instances == nil {
-	// 	if want, have := len(notification2), len(instances); want != have {
-	// 		t.Errorf("want length %v, have %v", want, have)
-	// 	} else {
-	// 		for i := range notification2 {
-	// 			if want, have := notification2[i], instances[i]; want != have {
-	// 				t.Errorf("want instance %v, have %v", want, have)
-	// 			}
-	// 		}
-	// 	}
-	// }
+	go c.Update(e2) // different set
+	expectUpdate(t, r1, []string{"a", "b", "c"})
+	expectUpdate(t, r2, []string{"a", "b", "c"})
 
-	close(f1)
-	close(f2)
+	c.Deregister(r1)
+	c.Deregister(r2)
+	close(r1)
+	close(r2)
+	// if deregister didn't work, Update would panic on closed channels
+	c.Update(e1)
+}
 
-	wg.Wait()
+func expectUpdate(t *testing.T, r chan sd.Event, expect []string) {
+	select {
+	case e := <-r:
+		if want, have := expect, e.Instances; !reflect.DeepEqual(want, have) {
+			t.Fatalf("want: %v, have: %v", want, have)
+		}
+	case <-time.After(time.Second):
+		t.Fatalf("did not receive expected update")
+	}
+}
 
-	// d.Deregister(f1)
-
-	// d.Unregister(f2)
-	// if want, have := 0, len(d.observers); want != have {
-	// 	t.Fatalf("want %v observers, have %v", want, have)
-	// }
+func expectNoUpdate(t *testing.T, r chan sd.Event) {
+	select {
+	case e := <-r:
+		t.Errorf("received unexpected update %v", e)
+	case <-time.After(time.Millisecond):
+		return // as expected
+	}
 }
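
For context on how the pieces in this change fit together from a caller's point of view, the sketch below wires a trivial instancer into sd.NewEndpointer, the same entry point exercised by TestDefaultEndpointer above. It is only an illustration and not part of this diff: the fixedInstancer type is hypothetical, it assumes sd.Instancer at this revision requires only Register/Deregister (as the mockInstancer in endpointer_test.go suggests), and it uses a short sleep rather than real synchronization to let the initial event propagate.

package main

import (
	"fmt"
	"io"
	"time"

	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
)

// fixedInstancer is a hypothetical, minimal instancer: it pushes one static
// Event to every registered channel and never updates it again. Real callers
// would use a service-discovery-backed Instancer instead.
type fixedInstancer struct{ event sd.Event }

func (i fixedInstancer) Register(ch chan<- sd.Event)   { go func() { ch <- i.event }() }
func (i fixedInstancer) Deregister(ch chan<- sd.Event) {}

func main() {
	// The Factory turns an instance string into an endpoint plus an optional
	// io.Closer, exactly as in the tests above (which use endpoint.Nop).
	factory := func(instance string) (endpoint.Endpoint, io.Closer, error) {
		return endpoint.Nop, nil, nil
	}

	instancer := fixedInstancer{event: sd.Event{Instances: []string{"a:8080", "b:8080"}}}

	// NewEndpointer subscribes to the instancer and maintains the cache that
	// now lives in sd/endpoint_cache.go; InvalidateOnError mirrors the
	// invalidateOnError/invalidateTimeout options the tests exercise.
	endpointer := sd.NewEndpointer(instancer, factory, log.NewNopLogger(), sd.InvalidateOnError(time.Minute))
	defer endpointer.Close()

	time.Sleep(100 * time.Millisecond) // crude: wait for the initial event to be consumed

	endpoints, err := endpointer.Endpoints()
	fmt.Printf("endpoints: %d, err: %v\n", len(endpoints), err) // expected: 2, <nil>
}

The sleep is only needed because this toy instancer delivers its event asynchronously; the instance.Cache used by the tests notifies the current state during Register, which appears to be what lets TestDefaultEndpointer assert on the endpoint count right away.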