diff --git a/examples/profilesvc/client/client.go b/examples/profilesvc/client/client.go
index 6251718..03c1dc7 100644
--- a/examples/profilesvc/client/client.go
+++ b/examples/profilesvc/client/client.go
@@ -43,28 +43,6 @@
 		instancer = consul.NewInstancer(sdclient, logger, consulService, consulTags, passingOnly)
 		endpoints profilesvc.Endpoints
 	)
-	// TODO: thought experiment
-	mapping := []struct {
-		factory  func(s profilesvc.Service) endpoint.Endpoint
-		endpoint *endpoint.Endpoint
-	}{
-		{
-			factory:  profilesvc.MakePostProfileEndpoint,
-			endpoint: &endpoints.PostProfileEndpoint,
-		},
-		{
-			factory:  profilesvc.MakeGetProfileEndpoint,
-			endpoint: &endpoints.GetProfileEndpoint,
-		},
-	}
-	for _, m := range mapping {
-		factory := factoryFor(m.factory)
-		endpointer := sd.NewEndpointer(instancer, factory, logger)
-		balancer := lb.NewRoundRobin(endpointer)
-		retry := lb.Retry(retryMax, retryTimeout, balancer)
-		*m.endpoint = retry
-	}
-	// TODO: why not 2 lines per endpoint registration above instead of 7 lines per endpoint below?
 	{
 		factory := factoryFor(profilesvc.MakePostProfileEndpoint)
 		endpointer := sd.NewEndpointer(instancer, factory, logger)
diff --git a/sd/cache.go b/sd/cache.go
index af5514c..ce03206 100644
--- a/sd/cache.go
+++ b/sd/cache.go
@@ -22,6 +22,7 @@
 	endpoints          []endpoint.Endpoint
 	logger             log.Logger
 	invalidateDeadline time.Time
+	timeNow            func() time.Time
 }
 
 type endpointCloser struct {
@@ -36,6 +37,7 @@
 		factory: factory,
 		cache:   map[string]endpointCloser{},
 		logger:  logger,
+		timeNow: time.Now,
 	}
 }
 
@@ -47,27 +49,24 @@
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
 
+	// Happy path.
 	if event.Err == nil {
 		c.updateCache(event.Instances)
-		c.invalidateDeadline = time.Time{}
 		c.err = nil
-	}
-
-	c.logger.Log("err", event.Err)
-
-	if c.options.invalidateOnErrorTimeout == nil {
-		// keep returning the last known endpoints on error
 		return
 	}
 
+	// Sad path. Something's gone wrong in sd.
+	c.logger.Log("err", event.Err)
+	if c.options.invalidateOnErrorTimeout == nil {
+		return // keep returning the last known endpoints on error
+	}
+	if c.err != nil {
+		return // already in the error state, do nothing & keep original error
+	}
 	c.err = event.Err
-
-	if !c.invalidateDeadline.IsZero() {
-		// aleady in the error state, do nothing
-		return
-	}
 	// set new deadline to invalidate Endpoints unless non-error Event is received
-	c.invalidateDeadline = time.Now().Add(*c.options.invalidateOnErrorTimeout)
+	c.invalidateDeadline = c.timeNow().Add(*c.options.invalidateOnErrorTimeout)
 	return
 }
 
@@ -121,7 +120,7 @@
 func (c *endpointCache) Endpoints() ([]endpoint.Endpoint, error) {
 	c.mtx.RLock()
 
-	if c.err == nil || time.Now().Before(c.invalidateDeadline) {
+	if c.err == nil || c.timeNow().Before(c.invalidateDeadline) {
 		defer c.mtx.RUnlock()
 		return c.endpoints, nil
 	}
@@ -130,7 +129,11 @@
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
 
+	// Re-check due to a race between RUnlock() and Lock().
+	if c.err == nil || c.timeNow().Before(c.invalidateDeadline) {
+		return c.endpoints, nil
+	}
+
 	c.updateCache(nil) // close any remaining active endpoints
-
 	return nil, c.err
 }
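
The cache.go changes above pack two reviewable ideas into one file: a `timeNow` hook so tests can control the clock, and a double-checked locking pattern in `Endpoints()`, where the condition checked under the read lock must be re-checked after upgrading to the write lock, because `sync.RWMutex` cannot upgrade atomically and another goroutine may update the state between `RUnlock` and `Lock`. A minimal sketch of the pattern, assuming nothing from go-kit (`cacheLike` and `Values` are invented names):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// cacheLike distills the locking pattern from endpointCache above: a
// read-locked fast path, then a write-locked slow path that re-checks its
// condition, because RUnlock followed by Lock is not atomic.
type cacheLike struct {
	mtx      sync.RWMutex
	err      error
	deadline time.Time
	values   []string
	timeNow  func() time.Time // injected so tests can control the clock
}

func (c *cacheLike) Values() ([]string, error) {
	c.mtx.RLock()
	if c.err == nil || c.timeNow().Before(c.deadline) {
		defer c.mtx.RUnlock()
		return c.values, nil
	}
	c.mtx.RUnlock()

	c.mtx.Lock()
	defer c.mtx.Unlock()

	// Re-check: another goroutine may have recovered the cache between
	// RUnlock and Lock; without this we would wrongly invalidate it.
	if c.err == nil || c.timeNow().Before(c.deadline) {
		return c.values, nil
	}
	c.values = nil // invalidate
	return nil, c.err
}

func main() {
	c := &cacheLike{
		err:      errors.New("sd error"),
		deadline: time.Now().Add(-time.Second), // deadline already passed
		values:   []string{"host-a:8080"},
		timeNow:  time.Now,
	}
	fmt.Println(c.Values()) // [] sd error: the cache has been invalidated
}
```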
diff --git a/sd/cache_test.go b/sd/cache_test.go
index 8ab47aa..eca3f69 100644
--- a/sd/cache_test.go
+++ b/sd/cache_test.go
@@ -43,6 +43,18 @@
 	}
 	assertEndpointsLen(t, cache, 2)
 
+	// Error, should continue returning old endpoints
+	cache.Update(Event{Err: errors.New("sd error")})
+	select {
+	case <-ca:
+		t.Errorf("endpoint a closed, not good")
+	case <-cb:
+		t.Errorf("endpoint b closed, not good")
+	case <-time.After(time.Millisecond):
+		t.Logf("no closures yet, good")
+	}
+	assertEndpointsLen(t, cache, 2)
+
 	// Delete b
 	go cache.Update(Event{Instances: []string{"a"}})
 	select {
@@ -67,6 +79,64 @@
 	assertEndpointsLen(t, cache, 0)
 }
 
+func TestCacheErrorAndTimeout(t *testing.T) {
+	var (
+		ca      = make(closer)
+		cb      = make(closer)
+		c       = map[string]io.Closer{"a": ca, "b": cb}
+		f       = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
+		timeOut = 100 * time.Millisecond
+		cache   = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{invalidateOnErrorTimeout: &timeOut})
+	)
+
+	timeNow := time.Now()
+	cache.timeNow = func() time.Time { return timeNow }
+
+	// Populate
+	cache.Update(Event{Instances: []string{"a"}})
+	select {
+	case <-ca:
+		t.Errorf("endpoint a closed, not good")
+	case <-time.After(time.Millisecond):
+		t.Logf("no closures yet, good")
+	}
+	assertEndpointsLen(t, cache, 1)
+
+	// Send error, keep time still.
+	cache.Update(Event{Err: errors.New("sd error")})
+	select {
+	case <-ca:
+		t.Errorf("endpoint a closed, not good")
+	case <-time.After(time.Millisecond):
+		t.Logf("no closures yet, good")
+	}
+	assertEndpointsLen(t, cache, 1)
+
+	// Move the time, but less than the timeout
+	timeNow = timeNow.Add(timeOut / 2)
+	assertEndpointsLen(t, cache, 1)
+	select {
+	case <-ca:
+		t.Errorf("endpoint a closed, not good")
+	case <-time.After(time.Millisecond):
+		t.Logf("no closures yet, good")
+	}
+
+	// Move the time past the timeout
+	timeNow = timeNow.Add(timeOut)
+	assertEndpointsError(t, cache, "sd error")
+	select {
+	case <-ca:
+		t.Logf("endpoint a closed, good")
+	case <-time.After(time.Millisecond):
+		t.Errorf("didn't close the invalidated endpoint in time")
+	}
+
+	// Send another error
+	cache.Update(Event{Err: errors.New("another sd error")})
+	assertEndpointsError(t, cache, "sd error") // expect original error
+}
+
 func TestBadFactory(t *testing.T) {
 	cache := newEndpointCache(func(string) (endpoint.Endpoint, io.Closer, error) {
 		return nil, nil, errors.New("bad factory")
@@ -87,6 +157,21 @@
 	}
 }
 
+func assertEndpointsError(t *testing.T, cache *endpointCache, wantErr string) {
+	endpoints, err := cache.Endpoints()
+	if err == nil {
+		t.Errorf("expecting error, not good")
+		return
+	}
+	if want, have := wantErr, err.Error(); want != have {
+		t.Errorf("want %s, have %s", want, have)
+		return
+	}
+	if want, have := 0, len(endpoints); want != have {
+		t.Errorf("want %d, have %d", want, have)
+	}
+}
+
 type closer chan struct{}
 
 func (c closer) Close() error { close(c); return nil }
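
TestCacheErrorAndTimeout above shows the payoff of the injected clock: rather than sleeping through real timeouts, the test pins `cache.timeNow` to a captured variable and advances it by hand. A generic sketch of the same fake-clock technique, with `throttle` and `allowed` as illustrative names, not go-kit API:

```go
package main

import (
	"fmt"
	"time"
)

// throttle calls an injected timeNow instead of time.Now directly, so a
// test can substitute a frozen clock and move "time" deterministically.
type throttle struct {
	timeNow func() time.Time // time.Now in production, a stub in tests
	until   time.Time
}

func (t *throttle) allowed() bool { return t.timeNow().After(t.until) }

func main() {
	now := time.Now()
	th := &throttle{
		timeNow: func() time.Time { return now }, // frozen clock
		until:   now.Add(100 * time.Millisecond),
	}
	fmt.Println(th.allowed()) // false: deadline not reached

	now = now.Add(150 * time.Millisecond) // advance the fake clock
	fmt.Println(th.allowed())             // true: past the deadline
}
```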
diff --git a/sd/consul/instancer.go b/sd/consul/instancer.go
index a7e37c0..38b18f0 100644
--- a/sd/consul/instancer.go
+++ b/sd/consul/instancer.go
@@ -15,7 +15,7 @@
 
 // Instancer yields instances for a service in Consul.
 type Instancer struct {
-	instance.Cache
+	cache   *instance.Cache
 	client  Client
 	logger  log.Logger
 	service string
@@ -29,7 +29,7 @@
 // are present.
 func NewInstancer(client Client, logger log.Logger, service string, tags []string, passingOnly bool) *Instancer {
 	s := &Instancer{
-		Cache:   *instance.NewCache(),
+		cache:   instance.NewCache(),
 		client:  client,
 		logger:  log.With(logger, "service", service, "tags", fmt.Sprint(tags)),
 		service: service,
@@ -45,7 +45,7 @@
 		s.logger.Log("err", err)
 	}
 
-	s.Update(sd.Event{Instances: instances, Err: err})
+	s.cache.Update(sd.Event{Instances: instances, Err: err})
 	go s.loop(index)
 	return s
 }
@@ -67,9 +67,9 @@
 			return // stopped via quitc
 		case err != nil:
 			s.logger.Log("err", err)
-			s.Update(sd.Event{Err: err})
+			s.cache.Update(sd.Event{Err: err})
 		default:
-			s.Update(sd.Event{Instances: instances})
+			s.cache.Update(sd.Event{Instances: instances})
 		}
 	}
 }
@@ -123,6 +123,16 @@
 	}
 }
 
+// Register implements Instancer.
+func (s *Instancer) Register(ch chan<- sd.Event) {
+	s.cache.Register(ch)
+}
+
+// Deregister implements Instancer.
+func (s *Instancer) Deregister(ch chan<- sd.Event) {
+	s.cache.Deregister(ch)
+}
+
 func filterEntries(entries []*consul.ServiceEntry, tags ...string) []*consul.ServiceEntry {
 	var es []*consul.ServiceEntry
 
diff --git a/sd/consul/instancer_test.go b/sd/consul/instancer_test.go
index 2ca52f0..5df6b43 100644
--- a/sd/consul/instancer_test.go
+++ b/sd/consul/instancer_test.go
@@ -69,7 +69,7 @@
 	s := NewInstancer(client, logger, "search", []string{"api"}, true)
 	defer s.Stop()
 
-	state := s.State()
+	state := s.cache.State()
 	if want, have := 2, len(state.Instances); want != have {
 		t.Errorf("want %d, have %d", want, have)
 	}
@@ -84,7 +84,7 @@
 	s := NewInstancer(client, logger, "feed", []string{}, true)
 	defer s.Stop()
 
-	state := s.State()
+	state := s.cache.State()
 	if want, have := 0, len(state.Instances); want != have {
 		t.Fatalf("want %d, have %d", want, have)
 	}
@@ -99,7 +99,7 @@
 	s := NewInstancer(client, logger, "search", []string{"api", "v2"}, true)
 	defer s.Stop()
 
-	state := s.State()
+	state := s.cache.State()
 	if want, have := 1, len(state.Instances); want != have {
 		t.Fatalf("want %d, have %d", want, have)
 	}
@@ -109,7 +109,7 @@
 	s := NewInstancer(newTestClient(consulState), log.NewNopLogger(), "search", []string{"db"}, true)
 	defer s.Stop()
 
-	state := s.State()
+	state := s.cache.State()
 	if want, have := 1, len(state.Instances); want != have {
 		t.Fatalf("want %d, have %d", want, have)
 	}
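
The consul diff establishes the pattern the dnssrv, etcd, eureka, and zk instancers repeat below: the embedded `instance.Cache`, whose `Update` and `State` methods were promoted into the exported API, becomes an unexported `cache` field, and only the two methods required by `sd.Instancer` are forwarded. A distilled sketch with illustrative types (not the actual `instance.Cache` signatures):

```go
package sketch

// cache stands in for instance.Cache; only the method set matters here.
type cache struct{}

func (c *cache) Update(instances []string)     {}
func (c *cache) Register(ch chan<- []string)   {}
func (c *cache) Deregister(ch chan<- []string) {}

// Before: embedding promotes every cache method, Update included, into
// the instancer's exported API, so callers could inject bogus events.
type instancerBefore struct {
	cache
}

// After: an unexported field hides the cache, and only the methods the
// sd.Instancer interface needs are forwarded explicitly.
type instancerAfter struct {
	cache *cache
}

func (s *instancerAfter) Register(ch chan<- []string)   { s.cache.Register(ch) }
func (s *instancerAfter) Deregister(ch chan<- []string) { s.cache.Deregister(ch) }
```

The asymmetry is deliberate: subscribers may register and deregister, but only the instancer's own watch loop should call Update.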
diff --git a/sd/dnssrv/instancer.go b/sd/dnssrv/instancer.go
index 33efeda..bed2cb5 100644
--- a/sd/dnssrv/instancer.go
+++ b/sd/dnssrv/instancer.go
@@ -13,7 +13,7 @@
 // Instancer yields instances from the named DNS SRV record. The name is
 // resolved on a fixed schedule. Priorities and weights are ignored.
 type Instancer struct {
-	instance.Cache
+	cache  *instance.Cache
 	name   string
 	logger log.Logger
 	quit   chan struct{}
@@ -38,7 +38,7 @@
 	logger log.Logger,
 ) *Instancer {
 	p := &Instancer{
-		Cache:  *instance.NewCache(),
+		cache:  instance.NewCache(),
 		name:   name,
 		logger: logger,
 		quit:   make(chan struct{}),
@@ -50,7 +50,7 @@
 	} else {
 		logger.Log("name", name, "err", err)
 	}
-	p.Update(sd.Event{Instances: instances, Err: err})
+	p.cache.Update(sd.Event{Instances: instances, Err: err})
 
 	go p.loop(refresh, lookup)
 	return p
@@ -69,10 +69,10 @@
 			instances, err := p.resolve(lookup)
 			if err != nil {
 				p.logger.Log("name", p.name, "err", err)
-				p.Update(sd.Event{Err: err})
+				p.cache.Update(sd.Event{Err: err})
 				continue // don't replace potentially-good with bad
 			}
-			p.Update(sd.Event{Instances: instances})
+			p.cache.Update(sd.Event{Instances: instances})
 
 		case <-p.quit:
 			return
@@ -91,3 +91,13 @@
 	}
 	return instances, nil
 }
+
+// Register implements Instancer.
+func (s *Instancer) Register(ch chan<- sd.Event) {
+	s.cache.Register(ch)
+}
+
+// Deregister implements Instancer.
+func (s *Instancer) Deregister(ch chan<- sd.Event) {
+	s.cache.Deregister(ch)
+}
diff --git a/sd/dnssrv/instancer_test.go b/sd/dnssrv/instancer_test.go
index 3600e6e..c3221bb 100644
--- a/sd/dnssrv/instancer_test.go
+++ b/sd/dnssrv/instancer_test.go
@@ -32,7 +32,7 @@
 	defer instancer.Stop()
 
 	// First lookup, empty
-	state := instancer.State()
+	state := instancer.cache.State()
 	if state.Err != nil {
 		t.Error(state.Err)
 	}
@@ -56,7 +56,7 @@
 	// TODO(pb): solve by running the read through the loop goroutine.
 	time.Sleep(100 * time.Millisecond)
 
-	state = instancer.State()
+	state = instancer.cache.State()
 	if state.Err != nil {
 		t.Error(state.Err)
 	}
diff --git a/sd/endpointer.go b/sd/endpointer.go
index e380077..189ab90 100644
--- a/sd/endpointer.go
+++ b/sd/endpointer.go
@@ -24,16 +24,16 @@
 // NewEndpointer creates an Endpointer that subscribes to updates from Instancer src
 // and uses factory f to create Endpoints. If src notifies of an error, the Endpointer
 // keeps returning previously created Endpoints assuming they are still good, unless
-// this behavior is disabled with ResetOnError option.
-func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) Endpointer {
+// this behavior is disabled via the InvalidateOnError option.
+func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) *DefaultEndpointer {
 	opts := endpointerOptions{}
 	for _, opt := range options {
 		opt(&opts)
 	}
-	se := &simpleEndpointer{
-		endpointCache: *newEndpointCache(f, logger, opts),
-		instancer:     src,
-		ch:            make(chan Event),
+	se := &DefaultEndpointer{
+		cache:     newEndpointCache(f, logger, opts),
+		instancer: src,
+		ch:        make(chan Event),
 	}
 	go se.receive()
 	src.Register(se.ch)
@@ -60,20 +60,29 @@
 	invalidateOnErrorTimeout *time.Duration
 }
 
-type simpleEndpointer struct {
-	endpointCache
-
+// DefaultEndpointer implements the Endpointer interface.
+// When created with the NewEndpointer function, it automatically registers
+// as a subscriber to events from the Instancer and maintains a list
+// of active Endpoints.
+type DefaultEndpointer struct {
+	cache     *endpointCache
 	instancer Instancer
 	ch        chan Event
 }
 
-func (se *simpleEndpointer) receive() {
-	for event := range se.ch {
-		se.Update(event)
+func (de *DefaultEndpointer) receive() {
+	for event := range de.ch {
+		de.cache.Update(event)
 	}
 }
 
-func (se *simpleEndpointer) Close() {
-	se.instancer.Deregister(se.ch)
-	close(se.ch)
+// Close deregisters DefaultEndpointer from the Instancer and stops the internal goroutine.
+func (de *DefaultEndpointer) Close() {
+	de.instancer.Deregister(de.ch)
+	close(de.ch)
 }
+
+// Endpoints implements Endpointer.
+func (de *DefaultEndpointer) Endpoints() ([]endpoint.Endpoint, error) {
+	return de.cache.Endpoints()
+}
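
Note that NewEndpointer now returns the concrete `*DefaultEndpointer` instead of the `Endpointer` interface, so callers reach `Close` without a type assertion while the value still satisfies `sd.Endpointer`. A hedged wiring sketch in the spirit of the profilesvc client at the top of this diff, assuming go-kit's `sd.FixedInstancer` helper and a no-op factory in place of a real client:

```go
package main

import (
	"context"
	"io"
	"time"

	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/lb"
)

func main() {
	var (
		instancer = sd.FixedInstancer{"host-a:8080", "host-b:8080"} // any Instancer works here
		logger    = log.NewNopLogger()
		factory   = func(instance string) (endpoint.Endpoint, io.Closer, error) {
			// A real factory would build a client endpoint for this instance.
			return endpoint.Nop, nil, nil
		}
	)

	// NewEndpointer returns *DefaultEndpointer, so Close is directly
	// available; the value still satisfies sd.Endpointer below.
	endpointer := sd.NewEndpointer(instancer, factory, logger)
	defer endpointer.Close()

	balancer := lb.NewRoundRobin(endpointer)
	retry := lb.Retry(3, 500*time.Millisecond, balancer)

	retry(context.Background(), struct{}{}) // invoke like any endpoint
}
```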
diff --git a/sd/etcd/instancer.go b/sd/etcd/instancer.go
index a34f656..693fc7a 100644
--- a/sd/etcd/instancer.go
+++ b/sd/etcd/instancer.go
@@ -9,7 +9,7 @@
 // Instancer yields instances stored in a certain etcd keyspace. Any kind of
 // change in that keyspace is watched and will update the Instancer's Instancers.
 type Instancer struct {
-	instance.Cache
+	cache  *instance.Cache
 	client Client
 	prefix string
 	logger log.Logger
@@ -22,7 +22,7 @@
 	s := &Instancer{
 		client: c,
 		prefix: prefix,
-		Cache:  *instance.NewCache(),
+		cache:  instance.NewCache(),
 		logger: logger,
 		quitc:  make(chan struct{}),
 	}
@@ -33,7 +33,7 @@
 	} else {
 		logger.Log("prefix", s.prefix, "err", err)
 	}
-	s.Update(sd.Event{Instances: instances, Err: err})
+	s.cache.Update(sd.Event{Instances: instances, Err: err})
 
 	go s.loop()
 	return s, nil
@@ -48,10 +48,10 @@
 			instances, err := s.client.GetEntries(s.prefix)
 			if err != nil {
 				s.logger.Log("msg", "failed to retrieve entries", "err", err)
-				s.Update(sd.Event{Err: err})
+				s.cache.Update(sd.Event{Err: err})
 				continue
 			}
-			s.Update(sd.Event{Instances: instances})
+			s.cache.Update(sd.Event{Instances: instances})
 
 		case <-s.quitc:
 			return
@@ -63,3 +63,13 @@
 func (s *Instancer) Stop() {
 	close(s.quitc)
 }
+
+// Register implements Instancer.
+func (s *Instancer) Register(ch chan<- sd.Event) {
+	s.cache.Register(ch)
+}
+
+// Deregister implements Instancer.
+func (s *Instancer) Deregister(ch chan<- sd.Event) {
+	s.cache.Deregister(ch)
+}
diff --git a/sd/etcd/instancer_test.go b/sd/etcd/instancer_test.go
index f669a0d..9609e28 100644
--- a/sd/etcd/instancer_test.go
+++ b/sd/etcd/instancer_test.go
@@ -36,7 +36,7 @@
 	}
 	defer s.Stop()
 
-	if state := s.State(); state.Err != nil {
+	if state := s.cache.State(); state.Err != nil {
 		t.Fatal(state.Err)
 	}
 }
diff --git a/sd/eureka/instancer.go b/sd/eureka/instancer.go
index ea601b4..fddbbb5 100644
--- a/sd/eureka/instancer.go
+++ b/sd/eureka/instancer.go
@@ -13,7 +13,7 @@
 // Instancer yields instances stored in the Eureka registry for the given app.
 // Changes in that app are watched and will update the subscribers.
 type Instancer struct {
-	instance.Cache
+	cache  *instance.Cache
 	conn   fargoConnection
 	app    string
 	logger log.Logger
@@ -26,7 +26,7 @@
 	logger = log.With(logger, "app", app)
 
 	s := &Instancer{
-		Cache:  *instance.NewCache(),
+		cache:  instance.NewCache(),
 		conn:   conn,
 		app:    app,
 		logger: logger,
@@ -40,7 +40,7 @@
 		s.logger.Log("during", "getInstances", "err", err)
 	}
 
-	s.Update(sd.Event{Instances: instances, Err: err})
+	s.cache.Update(sd.Event{Instances: instances, Err: err})
 	go s.loop()
 	return s
 }
@@ -66,12 +66,12 @@
 		case update := <-updatec:
 			if update.Err != nil {
 				s.logger.Log("during", "Update", "err", update.Err)
-				s.Update(sd.Event{Err: update.Err})
+				s.cache.Update(sd.Event{Err: update.Err})
 				continue
 			}
 			instances := convertFargoAppToInstances(update.App)
 			s.logger.Log("instances", len(instances))
-			s.Update(sd.Event{Instances: instances})
+			s.cache.Update(sd.Event{Instances: instances})
 
 		case q := <-s.quitc:
 			close(q)
@@ -95,3 +95,13 @@
 	}
 	return instances
 }
+
+// Register implements Instancer.
+func (s *Instancer) Register(ch chan<- sd.Event) {
+	s.cache.Register(ch)
+}
+
+// Deregister implements Instancer.
+func (s *Instancer) Deregister(ch chan<- sd.Event) {
+	s.cache.Deregister(ch)
+}
diff --git a/sd/eureka/instancer_test.go b/sd/eureka/instancer_test.go
index 94e40b3..98c83e5 100644
--- a/sd/eureka/instancer_test.go
+++ b/sd/eureka/instancer_test.go
@@ -21,7 +21,7 @@
 	instancer := NewInstancer(connection, appNameTest, loggerTest)
 	defer instancer.Stop()
 
-	state := instancer.State()
+	state := instancer.cache.State()
 	if state.Err != nil {
 		t.Fatal(state.Err)
 	}
@@ -41,14 +41,14 @@
 	instancer := NewInstancer(connection, appNameTest, loggerTest)
 	defer instancer.Stop()
 
-	state := instancer.State()
+	state := instancer.cache.State()
 	if want, have := 1, len(state.Instances); want != have {
 		t.Errorf("want %d, have %d", want, have)
 	}
 
 	time.Sleep(50 * time.Millisecond)
 
-	state = instancer.State()
+	state = instancer.cache.State()
 	if want, have := 2, len(state.Instances); want != have {
 		t.Errorf("want %v, have %v", want, have)
 	}
@@ -65,7 +65,7 @@
 	instancer := NewInstancer(connection, appNameTest, loggerTest)
 	defer instancer.Stop()
 
-	state := instancer.State()
+	state := instancer.cache.State()
 	if state.Err == nil {
 		t.Fatal("expecting error")
 	}
@@ -85,7 +85,7 @@
 	instancer := NewInstancer(connection, appNameTest, loggerTest)
 	defer instancer.Stop()
 
-	state := instancer.State()
+	state := instancer.cache.State()
 	if state.Err != nil {
 		t.Error(state.Err)
 	}
@@ -95,7 +95,7 @@
 
 	time.Sleep(50 * time.Millisecond)
 
-	state = instancer.State()
+	state = instancer.cache.State()
 	if state.Err == nil {
 		t.Fatal("expecting error")
 	}
diff --git a/sd/zk/instancer.go b/sd/zk/instancer.go
index 1983112..c81e01d 100644
--- a/sd/zk/instancer.go
+++ b/sd/zk/instancer.go
@@ -11,7 +11,7 @@
 // Instancer yield instances stored in a certain ZooKeeper path. Any kind of
 // change in that path is watched and will update the subscribers.
 type Instancer struct {
-	instance.Cache
+	cache  *instance.Cache
 	client Client
 	path   string
 	logger log.Logger
@@ -22,7 +22,7 @@
 // the given path for changes and update the Instancer endpoints.
 func NewInstancer(c Client, path string, logger log.Logger) (*Instancer, error) {
 	s := &Instancer{
-		Cache:  *instance.NewCache(),
+		cache:  instance.NewCache(),
 		client: c,
 		path:   path,
 		logger: logger,
@@ -37,11 +37,11 @@
 	instances, eventc, err := s.client.GetEntries(s.path)
 	if err != nil {
 		logger.Log("path", s.path, "msg", "failed to retrieve entries", "err", err)
-		// TODO why zk constructor exits when other implementations continue?
+		// other implementations continue here, but we exit because we don't know if eventc is valid
 		return nil, err
 	}
 	logger.Log("path", s.path, "instances", len(instances))
-	s.Update(sd.Event{Instances: instances})
+	s.cache.Update(sd.Event{Instances: instances})
 
 	go s.loop(eventc)
 
@@ -62,11 +62,11 @@
 			instances, eventc, err = s.client.GetEntries(s.path)
 			if err != nil {
 				s.logger.Log("path", s.path, "msg", "failed to retrieve entries", "err", err)
-				s.Update(sd.Event{Err: err})
+				s.cache.Update(sd.Event{Err: err})
 				continue
 			}
 			s.logger.Log("path", s.path, "instances", len(instances))
-			s.Update(sd.Event{Instances: instances})
+			s.cache.Update(sd.Event{Instances: instances})
 
 		case <-s.quitc:
 			return
@@ -78,3 +78,13 @@
 func (s *Instancer) Stop() {
 	close(s.quitc)
 }
+
+// Register implements Instancer.
+func (s *Instancer) Register(ch chan<- sd.Event) {
+	s.cache.Register(ch)
+}
+
+// Deregister implements Instancer.
+func (s *Instancer) Deregister(ch chan<- sd.Event) {
+	s.cache.Deregister(ch)
+}
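
With Register and Deregister forwarded by every backend, each instancer above can be consumed directly through the `sd.Instancer` interface. A small subscriber sketch, again assuming `sd.FixedInstancer` (which delivers its fixed instance list once, during Register) so it runs without a live registry:

```go
package main

import (
	"fmt"

	"github.com/go-kit/kit/sd"
)

func main() {
	instancer := sd.FixedInstancer{"host-a:8080", "host-b:8080"}

	ch := make(chan sd.Event)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for event := range ch {
			if event.Err != nil {
				fmt.Println("sd error:", event.Err)
				continue
			}
			fmt.Println("instances:", event.Instances)
		}
	}()

	instancer.Register(ch)   // FixedInstancer sends its state during Register
	instancer.Deregister(ch) // no-op for FixedInstancer
	close(ch)                // stop the consumer
	<-done
}
```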