diff --git a/.gitignore b/.gitignore index 6062401..cb871af 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,8 @@ # auto-generated tag files tags +# dependency management files +glide.lock +glide.yaml +vendor/ + diff --git a/coverage.bash b/coverage.bash index f4b0524..da12ef9 100755 --- a/coverage.bash +++ b/coverage.bash @@ -7,7 +7,7 @@ set -e function go_files { find . -name '*_test.go' ; } -function filter { grep -v '/_' ; } +function filter { grep -v -e '/_' -e vendor ; } function remove_relative_prefix { sed -e 's/^\.\///g' ; } function directories { diff --git a/examples/apigateway/main.go b/examples/apigateway/main.go index bedb368..bba6178 100644 --- a/examples/apigateway/main.go +++ b/examples/apigateway/main.go @@ -84,18 +84,19 @@ tags = []string{} passingOnly = true endpoints = addsvc.Endpoints{} + instancer = consulsd.NewInstancer(client, logger, "addsvc", tags, passingOnly) ) { factory := addsvcFactory(addsvc.MakeSumEndpoint, tracer, logger) - subscriber := consulsd.NewSubscriber(client, factory, logger, "addsvc", tags, passingOnly) - balancer := lb.NewRoundRobin(subscriber) + endpointer := sd.NewEndpointer(instancer, factory, logger) + balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(*retryMax, *retryTimeout, balancer) endpoints.SumEndpoint = retry } { factory := addsvcFactory(addsvc.MakeConcatEndpoint, tracer, logger) - subscriber := consulsd.NewSubscriber(client, factory, logger, "addsvc", tags, passingOnly) - balancer := lb.NewRoundRobin(subscriber) + endpointer := sd.NewEndpointer(instancer, factory, logger) + balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(*retryMax, *retryTimeout, balancer) endpoints.ConcatEndpoint = retry } @@ -120,18 +121,19 @@ passingOnly = true uppercase endpoint.Endpoint count endpoint.Endpoint + instancer = consulsd.NewInstancer(client, logger, "stringsvc", tags, passingOnly) ) { factory := stringsvcFactory(ctx, "GET", "/uppercase") - subscriber := consulsd.NewSubscriber(client, factory, logger, "stringsvc", tags, passingOnly) - balancer := lb.NewRoundRobin(subscriber) + endpointer := sd.NewEndpointer(instancer, factory, logger) + balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(*retryMax, *retryTimeout, balancer) uppercase = retry } { factory := stringsvcFactory(ctx, "GET", "/count") - subscriber := consulsd.NewSubscriber(client, factory, logger, "stringsvc", tags, passingOnly) - balancer := lb.NewRoundRobin(subscriber) + endpointer := sd.NewEndpointer(instancer, factory, logger) + balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(*retryMax, *retryTimeout, balancer) count = retry } diff --git a/examples/profilesvc/client/client.go b/examples/profilesvc/client/client.go index 6b1dff0..6251718 100644 --- a/examples/profilesvc/client/client.go +++ b/examples/profilesvc/client/client.go @@ -40,68 +40,91 @@ var ( sdclient = consul.NewClient(apiclient) + instancer = consul.NewInstancer(sdclient, logger, consulService, consulTags, passingOnly) endpoints profilesvc.Endpoints ) + // TODO: thought experiment + mapping := []struct { + factory func(s profilesvc.Service) endpoint.Endpoint + endpoint *endpoint.Endpoint + }{ + { + factory: profilesvc.MakePostProfileEndpoint, + endpoint: &endpoints.PostProfileEndpoint, + }, + { + factory: profilesvc.MakeGetProfileEndpoint, + endpoint: &endpoints.GetProfileEndpoint, + }, + } + for _, m := range mapping { + factory := factoryFor(m.factory) + endpointer := sd.NewEndpointer(instancer, factory, logger) + balancer := lb.NewRoundRobin(endpointer) + retry := 
lb.Retry(retryMax, retryTimeout, balancer) + *m.endpoint = retry + } + // TODO: why not 2 lines per endpoint registration above instead of 7 lines per endpoint below? { factory := factoryFor(profilesvc.MakePostProfileEndpoint) - subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly) - balancer := lb.NewRoundRobin(subscriber) + endpointer := sd.NewEndpointer(instancer, factory, logger) + balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(retryMax, retryTimeout, balancer) endpoints.PostProfileEndpoint = retry } { factory := factoryFor(profilesvc.MakeGetProfileEndpoint) - subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly) - balancer := lb.NewRoundRobin(subscriber) + endpointer := sd.NewEndpointer(instancer, factory, logger) + balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(retryMax, retryTimeout, balancer) endpoints.GetProfileEndpoint = retry } { factory := factoryFor(profilesvc.MakePutProfileEndpoint) - subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly) - balancer := lb.NewRoundRobin(subscriber) + endpointer := sd.NewEndpointer(instancer, factory, logger) + balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(retryMax, retryTimeout, balancer) endpoints.PutProfileEndpoint = retry } { factory := factoryFor(profilesvc.MakePatchProfileEndpoint) - subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly) - balancer := lb.NewRoundRobin(subscriber) + endpointer := sd.NewEndpointer(instancer, factory, logger) + balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(retryMax, retryTimeout, balancer) endpoints.PatchProfileEndpoint = retry } { factory := factoryFor(profilesvc.MakeDeleteProfileEndpoint) - subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly) - balancer := lb.NewRoundRobin(subscriber) + endpointer := sd.NewEndpointer(instancer, factory, logger) + balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(retryMax, retryTimeout, balancer) endpoints.DeleteProfileEndpoint = retry } { factory := factoryFor(profilesvc.MakeGetAddressesEndpoint) - subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly) - balancer := lb.NewRoundRobin(subscriber) + endpointer := sd.NewEndpointer(instancer, factory, logger) + balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(retryMax, retryTimeout, balancer) endpoints.GetAddressesEndpoint = retry } { factory := factoryFor(profilesvc.MakeGetAddressEndpoint) - subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly) - balancer := lb.NewRoundRobin(subscriber) + endpointer := sd.NewEndpointer(instancer, factory, logger) + balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(retryMax, retryTimeout, balancer) endpoints.GetAddressEndpoint = retry } { factory := factoryFor(profilesvc.MakePostAddressEndpoint) - subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly) - balancer := lb.NewRoundRobin(subscriber) + endpointer := sd.NewEndpointer(instancer, factory, logger) + balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(retryMax, retryTimeout, balancer) endpoints.PostAddressEndpoint = retry } { factory := factoryFor(profilesvc.MakeDeleteAddressEndpoint) - subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, 
passingOnly) - balancer := lb.NewRoundRobin(subscriber) + endpointer := sd.NewEndpointer(instancer, factory, logger) + balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(retryMax, retryTimeout, balancer) endpoints.DeleteAddressEndpoint = retry } diff --git a/examples/stringsvc3/proxying.go b/examples/stringsvc3/proxying.go index 8d67d38..8b1013f 100644 --- a/examples/stringsvc3/proxying.go +++ b/examples/stringsvc3/proxying.go @@ -40,7 +40,7 @@ // discovery system. var ( instanceList = split(instances) - subscriber sd.FixedSubscriber + endpointer sd.FixedEndpointer ) logger.Log("proxy_to", fmt.Sprint(instanceList)) for _, instance := range instanceList { @@ -48,12 +48,12 @@ e = makeUppercaseProxy(ctx, instance) e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e) e = ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e) - subscriber = append(subscriber, e) + endpointer = append(endpointer, e) } // Now, build a single, retrying, load-balancing endpoint out of all of // those individual endpoints. - balancer := lb.NewRoundRobin(subscriber) + balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(maxAttempts, maxTime, balancer) // And finally, return the ServiceMiddleware, implemented by proxymw. diff --git a/sd/benchmark_test.go b/sd/benchmark_test.go new file mode 100644 index 0000000..b548e1a --- /dev/null +++ b/sd/benchmark_test.go @@ -0,0 +1,29 @@ +package sd + +import ( + "io" + "testing" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" +) + +func BenchmarkEndpoints(b *testing.B) { + var ( + ca = make(closer) + cb = make(closer) + cmap = map[string]io.Closer{"a": ca, "b": cb} + factory = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, cmap[instance], nil } + c = newEndpointCache(factory, log.NewNopLogger(), endpointerOptions{}) + ) + + b.ReportAllocs() + + c.Update(Event{Instances: []string{"a", "b"}}) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + c.Endpoints() + } + }) +} diff --git a/sd/cache/benchmark_test.go b/sd/cache/benchmark_test.go deleted file mode 100644 index 41f1821..0000000 --- a/sd/cache/benchmark_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package cache - -import ( - "io" - "testing" - - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" -) - -func BenchmarkEndpoints(b *testing.B) { - var ( - ca = make(closer) - cb = make(closer) - cmap = map[string]io.Closer{"a": ca, "b": cb} - factory = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, cmap[instance], nil } - c = New(factory, log.NewNopLogger()) - ) - - b.ReportAllocs() - - c.Update([]string{"a", "b"}) - - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - c.Endpoints() - } - }) -} diff --git a/sd/cache/cache.go b/sd/cache/cache.go deleted file mode 100644 index 82af86b..0000000 --- a/sd/cache/cache.go +++ /dev/null @@ -1,96 +0,0 @@ -package cache - -import ( - "io" - "sort" - "sync" - "sync/atomic" - - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/sd" -) - -// Cache collects the most recent set of endpoints from a service discovery -// system via a subscriber, and makes them available to consumers. Cache is -// meant to be embedded inside of a concrete subscriber, and can serve Service -// invocations directly. 
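// Illustrative sketch, not part of this change: the consumer-side pattern the
// updated examples follow. Discovery (Instancer) is now separate from endpoint
// construction (Endpointer), so one Instancer per service can be shared by many
// Endpointers. The helper name buildEndpoint and the literal retry parameters
// are assumptions; everything else mirrors the profilesvc client above.
func buildEndpoint(sdclient consul.Client, factory sd.Factory, logger log.Logger,
	consulService string, consulTags []string, passingOnly bool) endpoint.Endpoint {
	// Before: subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
	instancer := consul.NewInstancer(sdclient, logger, consulService, consulTags, passingOnly)
	endpointer := sd.NewEndpointer(instancer, factory, logger)
	balancer := lb.NewRoundRobin(endpointer)
	return lb.Retry(3, 500*time.Millisecond, balancer)
}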
-type Cache struct { - mtx sync.RWMutex - factory sd.Factory - cache map[string]endpointCloser - slice atomic.Value // []endpoint.Endpoint - logger log.Logger -} - -type endpointCloser struct { - endpoint.Endpoint - io.Closer -} - -// New returns a new, empty endpoint cache. -func New(factory sd.Factory, logger log.Logger) *Cache { - return &Cache{ - factory: factory, - cache: map[string]endpointCloser{}, - logger: logger, - } -} - -// Update should be invoked by clients with a complete set of current instance -// strings whenever that set changes. The cache manufactures new endpoints via -// the factory, closes old endpoints when they disappear, and persists existing -// endpoints if they survive through an update. -func (c *Cache) Update(instances []string) { - c.mtx.Lock() - defer c.mtx.Unlock() - - // Deterministic order (for later). - sort.Strings(instances) - - // Produce the current set of services. - cache := make(map[string]endpointCloser, len(instances)) - for _, instance := range instances { - // If it already exists, just copy it over. - if sc, ok := c.cache[instance]; ok { - cache[instance] = sc - delete(c.cache, instance) - continue - } - - // If it doesn't exist, create it. - service, closer, err := c.factory(instance) - if err != nil { - c.logger.Log("instance", instance, "err", err) - continue - } - cache[instance] = endpointCloser{service, closer} - } - - // Close any leftover endpoints. - for _, sc := range c.cache { - if sc.Closer != nil { - sc.Closer.Close() - } - } - - // Populate the slice of endpoints. - slice := make([]endpoint.Endpoint, 0, len(cache)) - for _, instance := range instances { - // A bad factory may mean an instance is not present. - if _, ok := cache[instance]; !ok { - continue - } - slice = append(slice, cache[instance].Endpoint) - } - - // Swap and trigger GC for old copies. - c.slice.Store(slice) - c.cache = cache -} - -// Endpoints yields the current set of (presumably identical) endpoints, ordered -// lexicographically by the corresponding instance string. 
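// Illustrative sketch, not part of this change: the sd.Factory contract that
// both the old Cache and the new endpointCache depend on. A factory turns one
// discovered instance string (host:port) into an endpoint plus an optional
// io.Closer, which the cache invokes when that instance disappears. The name
// makeProxyFactory is an assumption.
func makeProxyFactory(logger log.Logger) sd.Factory {
	return func(instance string) (endpoint.Endpoint, io.Closer, error) {
		// A real factory would construct an HTTP or gRPC client for `instance`.
		logger.Log("msg", "building endpoint", "instance", instance)
		return endpoint.Nop, nil, nil
	}
}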
-func (c *Cache) Endpoints() []endpoint.Endpoint { - return c.slice.Load().([]endpoint.Endpoint) -} diff --git a/sd/cache/cache_test.go b/sd/cache/cache_test.go deleted file mode 100644 index be9abaf..0000000 --- a/sd/cache/cache_test.go +++ /dev/null @@ -1,91 +0,0 @@ -package cache - -import ( - "errors" - "io" - "testing" - "time" - - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" -) - -func TestCache(t *testing.T) { - var ( - ca = make(closer) - cb = make(closer) - c = map[string]io.Closer{"a": ca, "b": cb} - f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil } - cache = New(f, log.NewNopLogger()) - ) - - // Populate - cache.Update([]string{"a", "b"}) - select { - case <-ca: - t.Errorf("endpoint a closed, not good") - case <-cb: - t.Errorf("endpoint b closed, not good") - case <-time.After(time.Millisecond): - t.Logf("no closures yet, good") - } - if want, have := 2, len(cache.Endpoints()); want != have { - t.Errorf("want %d, have %d", want, have) - } - - // Duplicate, should be no-op - cache.Update([]string{"a", "b"}) - select { - case <-ca: - t.Errorf("endpoint a closed, not good") - case <-cb: - t.Errorf("endpoint b closed, not good") - case <-time.After(time.Millisecond): - t.Logf("no closures yet, good") - } - if want, have := 2, len(cache.Endpoints()); want != have { - t.Errorf("want %d, have %d", want, have) - } - - // Delete b - go cache.Update([]string{"a"}) - select { - case <-ca: - t.Errorf("endpoint a closed, not good") - case <-cb: - t.Logf("endpoint b closed, good") - case <-time.After(time.Second): - t.Errorf("didn't close the deleted instance in time") - } - if want, have := 1, len(cache.Endpoints()); want != have { - t.Errorf("want %d, have %d", want, have) - } - - // Delete a - go cache.Update([]string{}) - select { - // case <-cb: will succeed, as it's closed - case <-ca: - t.Logf("endpoint a closed, good") - case <-time.After(time.Second): - t.Errorf("didn't close the deleted instance in time") - } - if want, have := 0, len(cache.Endpoints()); want != have { - t.Errorf("want %d, have %d", want, have) - } -} - -func TestBadFactory(t *testing.T) { - cache := New(func(string) (endpoint.Endpoint, io.Closer, error) { - return nil, nil, errors.New("bad factory") - }, log.NewNopLogger()) - - cache.Update([]string{"foo:1234", "bar:5678"}) - if want, have := 0, len(cache.Endpoints()); want != have { - t.Errorf("want %d, have %d", want, have) - } -} - -type closer chan struct{} - -func (c closer) Close() error { close(c); return nil } diff --git a/sd/cache.go b/sd/cache.go new file mode 100644 index 0000000..af5514c --- /dev/null +++ b/sd/cache.go @@ -0,0 +1,136 @@ +package sd + +import ( + "io" + "sort" + "sync" + "time" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" +) + +// endpointCache collects the most recent set of instances from a service discovery +// system, creates endpoints for them using a factory function, and makes +// them available to consumers. +type endpointCache struct { + options endpointerOptions + mtx sync.RWMutex + factory Factory + cache map[string]endpointCloser + err error + endpoints []endpoint.Endpoint + logger log.Logger + invalidateDeadline time.Time +} + +type endpointCloser struct { + endpoint.Endpoint + io.Closer +} + +// newEndpointCache returns a new, empty endpointCache. 
+func newEndpointCache(factory Factory, logger log.Logger, options endpointerOptions) *endpointCache {
+	return &endpointCache{
+		options: options,
+		factory: factory,
+		cache:   map[string]endpointCloser{},
+		logger:  logger,
+	}
+}
+
+// Update should be invoked by clients with a complete set of current instance
+// strings whenever that set changes. The cache manufactures new endpoints via
+// the factory, closes old endpoints when they disappear, and persists existing
+// endpoints if they survive through an update.
+func (c *endpointCache) Update(event Event) {
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+
+	if event.Err == nil {
+		c.updateCache(event.Instances)
+		c.invalidateDeadline = time.Time{}
+		c.err = nil
+	}
+
+	c.logger.Log("err", event.Err)
+
+	if c.options.invalidateOnErrorTimeout == nil {
+		// keep returning the last known endpoints on error
+		return
+	}
+
+	c.err = event.Err
+
+	if !c.invalidateDeadline.IsZero() {
+		// already in the error state, do nothing
+		return
+	}
+	// set new deadline to invalidate Endpoints unless non-error Event is received
+	c.invalidateDeadline = time.Now().Add(*c.options.invalidateOnErrorTimeout)
+	return
+}
+
+func (c *endpointCache) updateCache(instances []string) {
+	// Deterministic order (for later).
+	sort.Strings(instances)
+
+	// Produce the current set of services.
+	cache := make(map[string]endpointCloser, len(instances))
+	for _, instance := range instances {
+		// If it already exists, just copy it over.
+		if sc, ok := c.cache[instance]; ok {
+			cache[instance] = sc
+			delete(c.cache, instance)
+			continue
+		}
+
+		// If it doesn't exist, create it.
+		service, closer, err := c.factory(instance)
+		if err != nil {
+			c.logger.Log("instance", instance, "err", err)
+			continue
+		}
+		cache[instance] = endpointCloser{service, closer}
+	}
+
+	// Close any leftover endpoints.
+	for _, sc := range c.cache {
+		if sc.Closer != nil {
+			sc.Closer.Close()
+		}
+	}
+
+	// Populate the slice of endpoints.
+	endpoints := make([]endpoint.Endpoint, 0, len(cache))
+	for _, instance := range instances {
+		// A bad factory may mean an instance is not present.
+		if _, ok := cache[instance]; !ok {
+			continue
+		}
+		endpoints = append(endpoints, cache[instance].Endpoint)
+	}
+
+	// Swap and trigger GC for old copies.
+	c.endpoints = endpoints
+	c.cache = cache
+}
+
+// Endpoints yields the current set of (presumably identical) endpoints, ordered
+// lexicographically by the corresponding instance string.
+func (c *endpointCache) Endpoints() ([]endpoint.Endpoint, error) { + c.mtx.RLock() + + if c.err == nil || time.Now().Before(c.invalidateDeadline) { + defer c.mtx.RUnlock() + return c.endpoints, nil + } + + c.mtx.RUnlock() + c.mtx.Lock() + defer c.mtx.Unlock() + + c.updateCache(nil) // close any remaining active endpoints + + return nil, c.err +} diff --git a/sd/cache_test.go b/sd/cache_test.go new file mode 100644 index 0000000..8ab47aa --- /dev/null +++ b/sd/cache_test.go @@ -0,0 +1,92 @@ +package sd + +import ( + "errors" + "io" + "testing" + "time" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" +) + +func TestCache(t *testing.T) { + var ( + ca = make(closer) + cb = make(closer) + c = map[string]io.Closer{"a": ca, "b": cb} + f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil } + cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{}) + ) + + // Populate + cache.Update(Event{Instances: []string{"a", "b"}}) + select { + case <-ca: + t.Errorf("endpoint a closed, not good") + case <-cb: + t.Errorf("endpoint b closed, not good") + case <-time.After(time.Millisecond): + t.Logf("no closures yet, good") + } + assertEndpointsLen(t, cache, 2) + + // Duplicate, should be no-op + cache.Update(Event{Instances: []string{"a", "b"}}) + select { + case <-ca: + t.Errorf("endpoint a closed, not good") + case <-cb: + t.Errorf("endpoint b closed, not good") + case <-time.After(time.Millisecond): + t.Logf("no closures yet, good") + } + assertEndpointsLen(t, cache, 2) + + // Delete b + go cache.Update(Event{Instances: []string{"a"}}) + select { + case <-ca: + t.Errorf("endpoint a closed, not good") + case <-cb: + t.Logf("endpoint b closed, good") + case <-time.After(time.Second): + t.Errorf("didn't close the deleted instance in time") + } + assertEndpointsLen(t, cache, 1) + + // Delete a + go cache.Update(Event{Instances: []string{}}) + select { + // case <-cb: will succeed, as it's closed + case <-ca: + t.Logf("endpoint a closed, good") + case <-time.After(time.Second): + t.Errorf("didn't close the deleted instance in time") + } + assertEndpointsLen(t, cache, 0) +} + +func TestBadFactory(t *testing.T) { + cache := newEndpointCache(func(string) (endpoint.Endpoint, io.Closer, error) { + return nil, nil, errors.New("bad factory") + }, log.NewNopLogger(), endpointerOptions{}) + + cache.Update(Event{Instances: []string{"foo:1234", "bar:5678"}}) + assertEndpointsLen(t, cache, 0) +} + +func assertEndpointsLen(t *testing.T, cache *endpointCache, l int) { + endpoints, err := cache.Endpoints() + if err != nil { + t.Errorf("unexpected error %v", err) + return + } + if want, have := l, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +type closer chan struct{} + +func (c closer) Close() error { close(c); return nil } diff --git a/sd/consul/doc.go b/sd/consul/doc.go index ae51c44..b3b475f 100644 --- a/sd/consul/doc.go +++ b/sd/consul/doc.go @@ -1,2 +1,2 @@ -// Package consul provides subscriber and registrar implementations for Consul. +// Package consul provides Instancer and Registrar implementations for Consul. 
package consul diff --git a/sd/consul/instancer.go b/sd/consul/instancer.go new file mode 100644 index 0000000..a7e37c0 --- /dev/null +++ b/sd/consul/instancer.go @@ -0,0 +1,157 @@ +package consul + +import ( + "fmt" + "io" + + consul "github.com/hashicorp/consul/api" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" + "github.com/go-kit/kit/sd/internal/instance" +) + +const defaultIndex = 0 + +// Instancer yields instances for a service in Consul. +type Instancer struct { + instance.Cache + client Client + logger log.Logger + service string + tags []string + passingOnly bool + quitc chan struct{} +} + +// NewInstancer returns a Consul instancer that publishes instances for the +// requested service. It only returns instances for which all of the passed tags +// are present. +func NewInstancer(client Client, logger log.Logger, service string, tags []string, passingOnly bool) *Instancer { + s := &Instancer{ + Cache: *instance.NewCache(), + client: client, + logger: log.With(logger, "service", service, "tags", fmt.Sprint(tags)), + service: service, + tags: tags, + passingOnly: passingOnly, + quitc: make(chan struct{}), + } + + instances, index, err := s.getInstances(defaultIndex, nil) + if err == nil { + s.logger.Log("instances", len(instances)) + } else { + s.logger.Log("err", err) + } + + s.Update(sd.Event{Instances: instances, Err: err}) + go s.loop(index) + return s +} + +// Stop terminates the instancer. +func (s *Instancer) Stop() { + close(s.quitc) +} + +func (s *Instancer) loop(lastIndex uint64) { + var ( + instances []string + err error + ) + for { + instances, lastIndex, err = s.getInstances(lastIndex, s.quitc) + switch { + case err == io.EOF: + return // stopped via quitc + case err != nil: + s.logger.Log("err", err) + s.Update(sd.Event{Err: err}) + default: + s.Update(sd.Event{Instances: instances}) + } + } +} + +func (s *Instancer) getInstances(lastIndex uint64, interruptc chan struct{}) ([]string, uint64, error) { + tag := "" + if len(s.tags) > 0 { + tag = s.tags[0] + } + + // Consul doesn't support more than one tag in its service query method. + // https://github.com/hashicorp/consul/issues/294 + // Hashi suggest prepared queries, but they don't support blocking. + // https://www.consul.io/docs/agent/http/query.html#execute + // If we want blocking for efficiency, we must filter tags manually. + + type response struct { + instances []string + index uint64 + } + + var ( + errc = make(chan error, 1) + resc = make(chan response, 1) + ) + + go func() { + entries, meta, err := s.client.Service(s.service, tag, s.passingOnly, &consul.QueryOptions{ + WaitIndex: lastIndex, + }) + if err != nil { + errc <- err + return + } + if len(s.tags) > 1 { + entries = filterEntries(entries, s.tags[1:]...) 
+ } + resc <- response{ + instances: makeInstances(entries), + index: meta.LastIndex, + } + }() + + select { + case err := <-errc: + return nil, 0, err + case res := <-resc: + return res.instances, res.index, nil + case <-interruptc: + return nil, 0, io.EOF + } +} + +func filterEntries(entries []*consul.ServiceEntry, tags ...string) []*consul.ServiceEntry { + var es []*consul.ServiceEntry + +ENTRIES: + for _, entry := range entries { + ts := make(map[string]struct{}, len(entry.Service.Tags)) + for _, tag := range entry.Service.Tags { + ts[tag] = struct{}{} + } + + for _, tag := range tags { + if _, ok := ts[tag]; !ok { + continue ENTRIES + } + } + es = append(es, entry) + } + + return es +} + +func makeInstances(entries []*consul.ServiceEntry) []string { + instances := make([]string, len(entries)) + for i, entry := range entries { + addr := entry.Node.Address + if entry.Service.Address != "" { + addr = entry.Service.Address + } + instances[i] = fmt.Sprintf("%s:%d", addr, entry.Service.Port) + } + return instances +} diff --git a/sd/consul/instancer_test.go b/sd/consul/instancer_test.go new file mode 100644 index 0000000..2ca52f0 --- /dev/null +++ b/sd/consul/instancer_test.go @@ -0,0 +1,133 @@ +package consul + +import ( + "context" + "testing" + + consul "github.com/hashicorp/consul/api" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" +) + +var _ sd.Instancer = &Instancer{} // API check + +var consulState = []*consul.ServiceEntry{ + { + Node: &consul.Node{ + Address: "10.0.0.0", + Node: "app00.local", + }, + Service: &consul.AgentService{ + ID: "search-api-0", + Port: 8000, + Service: "search", + Tags: []string{ + "api", + "v1", + }, + }, + }, + { + Node: &consul.Node{ + Address: "10.0.0.1", + Node: "app01.local", + }, + Service: &consul.AgentService{ + ID: "search-api-1", + Port: 8001, + Service: "search", + Tags: []string{ + "api", + "v2", + }, + }, + }, + { + Node: &consul.Node{ + Address: "10.0.0.1", + Node: "app01.local", + }, + Service: &consul.AgentService{ + Address: "10.0.0.10", + ID: "search-db-0", + Port: 9000, + Service: "search", + Tags: []string{ + "db", + }, + }, + }, +} + +func TestInstancer(t *testing.T) { + var ( + logger = log.NewNopLogger() + client = newTestClient(consulState) + ) + + s := NewInstancer(client, logger, "search", []string{"api"}, true) + defer s.Stop() + + state := s.State() + if want, have := 2, len(state.Instances); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestInstancerNoService(t *testing.T) { + var ( + logger = log.NewNopLogger() + client = newTestClient(consulState) + ) + + s := NewInstancer(client, logger, "feed", []string{}, true) + defer s.Stop() + + state := s.State() + if want, have := 0, len(state.Instances); want != have { + t.Fatalf("want %d, have %d", want, have) + } +} + +func TestInstancerWithTags(t *testing.T) { + var ( + logger = log.NewNopLogger() + client = newTestClient(consulState) + ) + + s := NewInstancer(client, logger, "search", []string{"api", "v2"}, true) + defer s.Stop() + + state := s.State() + if want, have := 1, len(state.Instances); want != have { + t.Fatalf("want %d, have %d", want, have) + } +} + +func TestInstancerAddressOverride(t *testing.T) { + s := NewInstancer(newTestClient(consulState), log.NewNopLogger(), "search", []string{"db"}, true) + defer s.Stop() + + state := s.State() + if want, have := 1, len(state.Instances); want != have { + t.Fatalf("want %d, have %d", want, have) + } + + endpoint, closer, err := testFactory(state.Instances[0]) + if err != nil { + 
t.Fatal(err) + } + if closer != nil { + defer closer.Close() + } + + response, err := endpoint(context.Background(), struct{}{}) + if err != nil { + t.Fatal(err) + } + + if want, have := "10.0.0.10:9000", response.(string); want != have { + t.Errorf("want %q, have %q", want, have) + } +} diff --git a/sd/consul/integration_test.go b/sd/consul/integration_test.go index 3b5e79f..95185d8 100644 --- a/sd/consul/integration_test.go +++ b/sd/consul/integration_test.go @@ -10,6 +10,7 @@ "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" stdconsul "github.com/hashicorp/consul/api" ) @@ -38,24 +39,28 @@ // skipping check(s) } - // Build a subscriber on r.Name + r.Tags. + // Build an Instancer on r.Name + r.Tags. factory := func(instance string) (endpoint.Endpoint, io.Closer, error) { t.Logf("factory invoked for %q", instance) return endpoint.Nop, nil, nil } - subscriber := NewSubscriber( + instancer := NewInstancer( client, - factory, - log.With(logger, "component", "subscriber"), + log.With(logger, "component", "instancer"), r.Name, r.Tags, true, + ) + endpointer := sd.NewEndpointer( + instancer, + factory, + log.With(logger, "component", "endpointer"), ) time.Sleep(time.Second) // Before we publish, we should have no endpoints. - endpoints, err := subscriber.Endpoints() + endpoints, err := endpointer.Endpoints() if err != nil { t.Error(err) } @@ -71,7 +76,7 @@ time.Sleep(time.Second) // Now we should have one active endpoints. - endpoints, err = subscriber.Endpoints() + endpoints, err = endpointer.Endpoints() if err != nil { t.Error(err) } diff --git a/sd/consul/subscriber.go b/sd/consul/subscriber.go deleted file mode 100644 index 30922e2..0000000 --- a/sd/consul/subscriber.go +++ /dev/null @@ -1,166 +0,0 @@ -package consul - -import ( - "fmt" - "io" - - consul "github.com/hashicorp/consul/api" - - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/sd" - "github.com/go-kit/kit/sd/cache" -) - -const defaultIndex = 0 - -// Subscriber yields endpoints for a service in Consul. Updates to the service -// are watched and will update the Subscriber endpoints. -type Subscriber struct { - cache *cache.Cache - client Client - logger log.Logger - service string - tags []string - passingOnly bool - endpointsc chan []endpoint.Endpoint - quitc chan struct{} -} - -var _ sd.Subscriber = &Subscriber{} - -// NewSubscriber returns a Consul subscriber which returns endpoints for the -// requested service. It only returns instances for which all of the passed tags -// are present. -func NewSubscriber(client Client, factory sd.Factory, logger log.Logger, service string, tags []string, passingOnly bool) *Subscriber { - s := &Subscriber{ - cache: cache.New(factory, logger), - client: client, - logger: log.With(logger, "service", service, "tags", fmt.Sprint(tags)), - service: service, - tags: tags, - passingOnly: passingOnly, - quitc: make(chan struct{}), - } - - instances, index, err := s.getInstances(defaultIndex, nil) - if err == nil { - s.logger.Log("instances", len(instances)) - } else { - s.logger.Log("err", err) - } - - s.cache.Update(instances) - go s.loop(index) - return s -} - -// Endpoints implements the Subscriber interface. -func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { - return s.cache.Endpoints(), nil -} - -// Stop terminates the subscriber. 
-func (s *Subscriber) Stop() { - close(s.quitc) -} - -func (s *Subscriber) loop(lastIndex uint64) { - var ( - instances []string - err error - ) - for { - instances, lastIndex, err = s.getInstances(lastIndex, s.quitc) - switch { - case err == io.EOF: - return // stopped via quitc - case err != nil: - s.logger.Log("err", err) - default: - s.cache.Update(instances) - } - } -} - -func (s *Subscriber) getInstances(lastIndex uint64, interruptc chan struct{}) ([]string, uint64, error) { - tag := "" - if len(s.tags) > 0 { - tag = s.tags[0] - } - - // Consul doesn't support more than one tag in its service query method. - // https://github.com/hashicorp/consul/issues/294 - // Hashi suggest prepared queries, but they don't support blocking. - // https://www.consul.io/docs/agent/http/query.html#execute - // If we want blocking for efficiency, we must filter tags manually. - - type response struct { - instances []string - index uint64 - } - - var ( - errc = make(chan error, 1) - resc = make(chan response, 1) - ) - - go func() { - entries, meta, err := s.client.Service(s.service, tag, s.passingOnly, &consul.QueryOptions{ - WaitIndex: lastIndex, - }) - if err != nil { - errc <- err - return - } - if len(s.tags) > 1 { - entries = filterEntries(entries, s.tags[1:]...) - } - resc <- response{ - instances: makeInstances(entries), - index: meta.LastIndex, - } - }() - - select { - case err := <-errc: - return nil, 0, err - case res := <-resc: - return res.instances, res.index, nil - case <-interruptc: - return nil, 0, io.EOF - } -} - -func filterEntries(entries []*consul.ServiceEntry, tags ...string) []*consul.ServiceEntry { - var es []*consul.ServiceEntry - -ENTRIES: - for _, entry := range entries { - ts := make(map[string]struct{}, len(entry.Service.Tags)) - for _, tag := range entry.Service.Tags { - ts[tag] = struct{}{} - } - - for _, tag := range tags { - if _, ok := ts[tag]; !ok { - continue ENTRIES - } - } - es = append(es, entry) - } - - return es -} - -func makeInstances(entries []*consul.ServiceEntry) []string { - instances := make([]string, len(entries)) - for i, entry := range entries { - addr := entry.Node.Address - if entry.Service.Address != "" { - addr = entry.Service.Address - } - instances[i] = fmt.Sprintf("%s:%d", addr, entry.Service.Port) - } - return instances -} diff --git a/sd/consul/subscriber_test.go b/sd/consul/subscriber_test.go deleted file mode 100644 index da00458..0000000 --- a/sd/consul/subscriber_test.go +++ /dev/null @@ -1,138 +0,0 @@ -package consul - -import ( - "context" - "testing" - - consul "github.com/hashicorp/consul/api" - - "github.com/go-kit/kit/log" -) - -var consulState = []*consul.ServiceEntry{ - { - Node: &consul.Node{ - Address: "10.0.0.0", - Node: "app00.local", - }, - Service: &consul.AgentService{ - ID: "search-api-0", - Port: 8000, - Service: "search", - Tags: []string{ - "api", - "v1", - }, - }, - }, - { - Node: &consul.Node{ - Address: "10.0.0.1", - Node: "app01.local", - }, - Service: &consul.AgentService{ - ID: "search-api-1", - Port: 8001, - Service: "search", - Tags: []string{ - "api", - "v2", - }, - }, - }, - { - Node: &consul.Node{ - Address: "10.0.0.1", - Node: "app01.local", - }, - Service: &consul.AgentService{ - Address: "10.0.0.10", - ID: "search-db-0", - Port: 9000, - Service: "search", - Tags: []string{ - "db", - }, - }, - }, -} - -func TestSubscriber(t *testing.T) { - var ( - logger = log.NewNopLogger() - client = newTestClient(consulState) - ) - - s := NewSubscriber(client, testFactory, logger, "search", []string{"api"}, true) - defer 
s.Stop() - - endpoints, err := s.Endpoints() - if err != nil { - t.Fatal(err) - } - - if want, have := 2, len(endpoints); want != have { - t.Errorf("want %d, have %d", want, have) - } -} - -func TestSubscriberNoService(t *testing.T) { - var ( - logger = log.NewNopLogger() - client = newTestClient(consulState) - ) - - s := NewSubscriber(client, testFactory, logger, "feed", []string{}, true) - defer s.Stop() - - endpoints, err := s.Endpoints() - if err != nil { - t.Fatal(err) - } - - if want, have := 0, len(endpoints); want != have { - t.Fatalf("want %d, have %d", want, have) - } -} - -func TestSubscriberWithTags(t *testing.T) { - var ( - logger = log.NewNopLogger() - client = newTestClient(consulState) - ) - - s := NewSubscriber(client, testFactory, logger, "search", []string{"api", "v2"}, true) - defer s.Stop() - - endpoints, err := s.Endpoints() - if err != nil { - t.Fatal(err) - } - - if want, have := 1, len(endpoints); want != have { - t.Fatalf("want %d, have %d", want, have) - } -} - -func TestSubscriberAddressOverride(t *testing.T) { - s := NewSubscriber(newTestClient(consulState), testFactory, log.NewNopLogger(), "search", []string{"db"}, true) - defer s.Stop() - - endpoints, err := s.Endpoints() - if err != nil { - t.Fatal(err) - } - - if want, have := 1, len(endpoints); want != have { - t.Fatalf("want %d, have %d", want, have) - } - - response, err := endpoints[0](context.Background(), struct{}{}) - if err != nil { - t.Fatal(err) - } - - if want, have := "10.0.0.10:9000", response.(string); want != have { - t.Errorf("want %q, have %q", want, have) - } -} diff --git a/sd/dnssrv/doc.go b/sd/dnssrv/doc.go index 7bc1ad4..0301487 100644 --- a/sd/dnssrv/doc.go +++ b/sd/dnssrv/doc.go @@ -1,2 +1,2 @@ -// Package dnssrv provides a subscriber implementation for DNS SRV records. +// Package dnssrv provides an Instancer implementation for DNS SRV records. package dnssrv diff --git a/sd/dnssrv/instancer.go b/sd/dnssrv/instancer.go new file mode 100644 index 0000000..33efeda --- /dev/null +++ b/sd/dnssrv/instancer.go @@ -0,0 +1,93 @@ +package dnssrv + +import ( + "fmt" + "net" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" + "github.com/go-kit/kit/sd/internal/instance" +) + +// Instancer yields instances from the named DNS SRV record. The name is +// resolved on a fixed schedule. Priorities and weights are ignored. +type Instancer struct { + instance.Cache + name string + logger log.Logger + quit chan struct{} +} + +// NewInstancer returns a DNS SRV instancer. +func NewInstancer( + name string, + ttl time.Duration, + logger log.Logger, +) *Instancer { + return NewInstancerDetailed(name, time.NewTicker(ttl), net.LookupSRV, logger) +} + +// NewInstancerDetailed is the same as NewInstancer, but allows users to +// provide an explicit lookup refresh ticker instead of a TTL, and specify the +// lookup function instead of using net.LookupSRV. +func NewInstancerDetailed( + name string, + refresh *time.Ticker, + lookup Lookup, + logger log.Logger, +) *Instancer { + p := &Instancer{ + Cache: *instance.NewCache(), + name: name, + logger: logger, + quit: make(chan struct{}), + } + + instances, err := p.resolve(lookup) + if err == nil { + logger.Log("name", name, "instances", len(instances)) + } else { + logger.Log("name", name, "err", err) + } + p.Update(sd.Event{Instances: instances, Err: err}) + + go p.loop(refresh, lookup) + return p +} + +// Stop terminates the Instancer. 
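// Illustrative sketch, not part of this change: NewInstancerDetailed with an
// injected lookup function, as the tests further below do, useful when
// net.LookupSRV is not appropriate. The staticLookup helper and its addresses
// are assumptions.
func staticInstancer(logger log.Logger) *Instancer {
	staticLookup := func(service, proto, name string) (string, []*net.SRV, error) {
		return "cname", []*net.SRV{{Target: "10.0.0.1", Port: 8080}}, nil
	}
	return NewInstancerDetailed("some.service.internal", time.NewTicker(10*time.Second), staticLookup, logger)
}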
+func (p *Instancer) Stop() { + close(p.quit) +} + +func (p *Instancer) loop(t *time.Ticker, lookup Lookup) { + defer t.Stop() + for { + select { + case <-t.C: + instances, err := p.resolve(lookup) + if err != nil { + p.logger.Log("name", p.name, "err", err) + p.Update(sd.Event{Err: err}) + continue // don't replace potentially-good with bad + } + p.Update(sd.Event{Instances: instances}) + + case <-p.quit: + return + } + } +} + +func (p *Instancer) resolve(lookup Lookup) ([]string, error) { + _, addrs, err := lookup("", "", p.name) + if err != nil { + return nil, err + } + instances := make([]string, len(addrs)) + for i, addr := range addrs { + instances[i] = net.JoinHostPort(addr.Target, fmt.Sprint(addr.Port)) + } + return instances, nil +} diff --git a/sd/dnssrv/instancer_test.go b/sd/dnssrv/instancer_test.go new file mode 100644 index 0000000..3600e6e --- /dev/null +++ b/sd/dnssrv/instancer_test.go @@ -0,0 +1,73 @@ +package dnssrv + +import ( + "net" + "sync/atomic" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" +) + +var _ sd.Instancer = &Instancer{} // API check + +func TestRefresh(t *testing.T) { + name := "some.service.internal" + + ticker := time.NewTicker(time.Second) + ticker.Stop() + tickc := make(chan time.Time) + ticker.C = tickc + + var lookups uint64 + records := []*net.SRV{} + lookup := func(service, proto, name string) (string, []*net.SRV, error) { + t.Logf("lookup(%q, %q, %q)", service, proto, name) + atomic.AddUint64(&lookups, 1) + return "cname", records, nil + } + + instancer := NewInstancerDetailed(name, ticker, lookup, log.NewNopLogger()) + defer instancer.Stop() + + // First lookup, empty + state := instancer.State() + if state.Err != nil { + t.Error(state.Err) + } + if want, have := 0, len(state.Instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + if want, have := uint64(1), atomic.LoadUint64(&lookups); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Load some records and lookup again + records = []*net.SRV{ + {Target: "1.0.0.1", Port: 1001}, + {Target: "1.0.0.2", Port: 1002}, + {Target: "1.0.0.3", Port: 1003}, + } + tickc <- time.Now() + + // There is a race condition where the instancer.State call below + // invokes the cache before it is updated by the tick above. + // TODO(pb): solve by running the read through the loop goroutine. + time.Sleep(100 * time.Millisecond) + + state = instancer.State() + if state.Err != nil { + t.Error(state.Err) + } + if want, have := 3, len(state.Instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + if want, have := uint64(2), atomic.LoadUint64(&lookups); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +type nopCloser struct{} + +func (nopCloser) Close() error { return nil } diff --git a/sd/dnssrv/subscriber.go b/sd/dnssrv/subscriber.go deleted file mode 100644 index 422fdaa..0000000 --- a/sd/dnssrv/subscriber.go +++ /dev/null @@ -1,100 +0,0 @@ -package dnssrv - -import ( - "fmt" - "net" - "time" - - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/sd" - "github.com/go-kit/kit/sd/cache" -) - -// Subscriber yields endpoints taken from the named DNS SRV record. The name is -// resolved on a fixed schedule. Priorities and weights are ignored. -type Subscriber struct { - name string - cache *cache.Cache - logger log.Logger - quit chan struct{} -} - -// NewSubscriber returns a DNS SRV subscriber. 
-func NewSubscriber( - name string, - ttl time.Duration, - factory sd.Factory, - logger log.Logger, -) *Subscriber { - return NewSubscriberDetailed(name, time.NewTicker(ttl), net.LookupSRV, factory, logger) -} - -// NewSubscriberDetailed is the same as NewSubscriber, but allows users to -// provide an explicit lookup refresh ticker instead of a TTL, and specify the -// lookup function instead of using net.LookupSRV. -func NewSubscriberDetailed( - name string, - refresh *time.Ticker, - lookup Lookup, - factory sd.Factory, - logger log.Logger, -) *Subscriber { - p := &Subscriber{ - name: name, - cache: cache.New(factory, logger), - logger: logger, - quit: make(chan struct{}), - } - - instances, err := p.resolve(lookup) - if err == nil { - logger.Log("name", name, "instances", len(instances)) - } else { - logger.Log("name", name, "err", err) - } - p.cache.Update(instances) - - go p.loop(refresh, lookup) - return p -} - -// Stop terminates the Subscriber. -func (p *Subscriber) Stop() { - close(p.quit) -} - -func (p *Subscriber) loop(t *time.Ticker, lookup Lookup) { - defer t.Stop() - for { - select { - case <-t.C: - instances, err := p.resolve(lookup) - if err != nil { - p.logger.Log("name", p.name, "err", err) - continue // don't replace potentially-good with bad - } - p.cache.Update(instances) - - case <-p.quit: - return - } - } -} - -// Endpoints implements the Subscriber interface. -func (p *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { - return p.cache.Endpoints(), nil -} - -func (p *Subscriber) resolve(lookup Lookup) ([]string, error) { - _, addrs, err := lookup("", "", p.name) - if err != nil { - return []string{}, err - } - instances := make([]string, len(addrs)) - for i, addr := range addrs { - instances[i] = net.JoinHostPort(addr.Target, fmt.Sprint(addr.Port)) - } - return instances, nil -} diff --git a/sd/dnssrv/subscriber_test.go b/sd/dnssrv/subscriber_test.go deleted file mode 100644 index 0ad522f..0000000 --- a/sd/dnssrv/subscriber_test.go +++ /dev/null @@ -1,85 +0,0 @@ -package dnssrv - -import ( - "io" - "net" - "sync/atomic" - "testing" - "time" - - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" -) - -func TestRefresh(t *testing.T) { - name := "some.service.internal" - - ticker := time.NewTicker(time.Second) - ticker.Stop() - tickc := make(chan time.Time) - ticker.C = tickc - - var lookups uint64 - records := []*net.SRV{} - lookup := func(service, proto, name string) (string, []*net.SRV, error) { - t.Logf("lookup(%q, %q, %q)", service, proto, name) - atomic.AddUint64(&lookups, 1) - return "cname", records, nil - } - - var generates uint64 - factory := func(instance string) (endpoint.Endpoint, io.Closer, error) { - t.Logf("factory(%q)", instance) - atomic.AddUint64(&generates, 1) - return endpoint.Nop, nopCloser{}, nil - } - - subscriber := NewSubscriberDetailed(name, ticker, lookup, factory, log.NewNopLogger()) - defer subscriber.Stop() - - // First lookup, empty - endpoints, err := subscriber.Endpoints() - if err != nil { - t.Error(err) - } - if want, have := 0, len(endpoints); want != have { - t.Errorf("want %d, have %d", want, have) - } - if want, have := uint64(1), atomic.LoadUint64(&lookups); want != have { - t.Errorf("want %d, have %d", want, have) - } - if want, have := uint64(0), atomic.LoadUint64(&generates); want != have { - t.Errorf("want %d, have %d", want, have) - } - - // Load some records and lookup again - records = []*net.SRV{ - {Target: "1.0.0.1", Port: 1001}, - {Target: "1.0.0.2", Port: 1002}, - {Target: "1.0.0.3", Port: 1003}, - } - 
tickc <- time.Now() - - // There is a race condition where the subscriber.Endpoints call below - // invokes the cache before it is updated by the tick above. - // TODO(pb): solve by running the read through the loop goroutine. - time.Sleep(100 * time.Millisecond) - - endpoints, err = subscriber.Endpoints() - if err != nil { - t.Error(err) - } - if want, have := 3, len(endpoints); want != have { - t.Errorf("want %d, have %d", want, have) - } - if want, have := uint64(2), atomic.LoadUint64(&lookups); want != have { - t.Errorf("want %d, have %d", want, have) - } - if want, have := uint64(len(records)), atomic.LoadUint64(&generates); want != have { - t.Errorf("want %d, have %d", want, have) - } -} - -type nopCloser struct{} - -func (nopCloser) Close() error { return nil } diff --git a/sd/endpointer.go b/sd/endpointer.go new file mode 100644 index 0000000..e380077 --- /dev/null +++ b/sd/endpointer.go @@ -0,0 +1,79 @@ +package sd + +import ( + "time" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" +) + +// Endpointer listens to a service discovery system and yields a set of +// identical endpoints on demand. An error indicates a problem with connectivity +// to the service discovery system, or within the system itself; an Endpointer +// may yield no endpoints without error. +type Endpointer interface { + Endpoints() ([]endpoint.Endpoint, error) +} + +// FixedEndpointer yields a fixed set of endpoints. +type FixedEndpointer []endpoint.Endpoint + +// Endpoints implements Endpointer. +func (s FixedEndpointer) Endpoints() ([]endpoint.Endpoint, error) { return s, nil } + +// NewEndpointer creates an Endpointer that subscribes to updates from Instancer src +// and uses factory f to create Endpoints. If src notifies of an error, the Endpointer +// keeps returning previously created Endpoints assuming they are still good, unless +// this behavior is disabled with ResetOnError option. +func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) Endpointer { + opts := endpointerOptions{} + for _, opt := range options { + opt(&opts) + } + se := &simpleEndpointer{ + endpointCache: *newEndpointCache(f, logger, opts), + instancer: src, + ch: make(chan Event), + } + go se.receive() + src.Register(se.ch) + return se +} + +// EndpointerOption allows control of endpointCache behavior. +type EndpointerOption func(*endpointerOptions) + +// InvalidateOnError returns EndpointerOption that controls how the Endpointer +// behaves when then Instancer publishes an Event containing an error. +// Without this option the Endpointer continues returning the last known +// endpoints. With this option, the Endpointer continues returning the last +// known endpoints until the timeout elapses, then closes all active endpoints +// and starts returning an error. Once the Instancer sends a new update with +// valid resource instances, the normal operation is resumed. 
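// Illustrative sketch, not part of this change: opting in to error
// invalidation from the consumer side. Once a discovery error has persisted
// past the timeout, Endpoints() reports the error instead of the stale set;
// a subsequent good update restores normal operation. The 30-second timeout
// is an assumption.
func newInvalidatingEndpointer(instancer sd.Instancer, factory sd.Factory, logger log.Logger) sd.Endpointer {
	return sd.NewEndpointer(instancer, factory, logger, sd.InvalidateOnError(30*time.Second))
}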
+func InvalidateOnError(timeout time.Duration) EndpointerOption { + return func(opts *endpointerOptions) { + opts.invalidateOnErrorTimeout = &timeout + } +} + +type endpointerOptions struct { + invalidateOnErrorTimeout *time.Duration +} + +type simpleEndpointer struct { + endpointCache + + instancer Instancer + ch chan Event +} + +func (se *simpleEndpointer) receive() { + for event := range se.ch { + se.Update(event) + } +} + +func (se *simpleEndpointer) Close() { + se.instancer.Deregister(se.ch) + close(se.ch) +} diff --git a/sd/etcd/doc.go b/sd/etcd/doc.go index 11add79..9bd6a5f 100644 --- a/sd/etcd/doc.go +++ b/sd/etcd/doc.go @@ -1,4 +1,4 @@ -// Package etcd provides a Subscriber and Registrar implementation for etcd. If +// Package etcd provides an Instancer and Registrar implementation for etcd. If // you use etcd as your service discovery system, this package will help you // implement the registration and client-side load balancing patterns. package etcd diff --git a/sd/etcd/example_test.go b/sd/etcd/example_test.go index 11b0a56..66db15b 100644 --- a/sd/etcd/example_test.go +++ b/sd/etcd/example_test.go @@ -7,6 +7,7 @@ "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" "github.com/go-kit/kit/sd/lb" ) @@ -44,16 +45,18 @@ defer registrar.Deregister() // It's likely that we'll also want to connect to other services and call - // their methods. We can build a subscriber to listen for changes from etcd - // and build endpoints, wrap it with a load-balancer to pick a single + // their methods. We can build an Instancer to listen for changes from etcd, + // create Endpointer, wrap it with a load-balancer to pick a single // endpoint, and finally wrap it with a retry strategy to get something that // can be used as an endpoint directly. barPrefix := "/services/barsvc" - subscriber, err := NewSubscriber(client, barPrefix, barFactory, log.NewNopLogger()) + logger := log.NewNopLogger() + instancer, err := NewInstancer(client, barPrefix, logger) if err != nil { panic(err) } - balancer := lb.NewRoundRobin(subscriber) + endpointer := sd.NewEndpointer(instancer, barFactory, logger) + balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(3, 3*time.Second, balancer) // And now retry can be used like any other endpoint. diff --git a/sd/etcd/instancer.go b/sd/etcd/instancer.go new file mode 100644 index 0000000..a34f656 --- /dev/null +++ b/sd/etcd/instancer.go @@ -0,0 +1,65 @@ +package etcd + +import ( + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" + "github.com/go-kit/kit/sd/internal/instance" +) + +// Instancer yields instances stored in a certain etcd keyspace. Any kind of +// change in that keyspace is watched and will update the Instancer's Instancers. +type Instancer struct { + instance.Cache + client Client + prefix string + logger log.Logger + quitc chan struct{} +} + +// NewInstancer returns an etcd instancer. It will start watching the given +// prefix for changes, and update the subscribers. 
+func NewInstancer(c Client, prefix string, logger log.Logger) (*Instancer, error) { + s := &Instancer{ + client: c, + prefix: prefix, + Cache: *instance.NewCache(), + logger: logger, + quitc: make(chan struct{}), + } + + instances, err := s.client.GetEntries(s.prefix) + if err == nil { + logger.Log("prefix", s.prefix, "instances", len(instances)) + } else { + logger.Log("prefix", s.prefix, "err", err) + } + s.Update(sd.Event{Instances: instances, Err: err}) + + go s.loop() + return s, nil +} + +func (s *Instancer) loop() { + ch := make(chan struct{}) + go s.client.WatchPrefix(s.prefix, ch) + for { + select { + case <-ch: + instances, err := s.client.GetEntries(s.prefix) + if err != nil { + s.logger.Log("msg", "failed to retrieve entries", "err", err) + s.Update(sd.Event{Err: err}) + continue + } + s.Update(sd.Event{Instances: instances}) + + case <-s.quitc: + return + } + } +} + +// Stop terminates the Instancer. +func (s *Instancer) Stop() { + close(s.quitc) +} diff --git a/sd/etcd/instancer_test.go b/sd/etcd/instancer_test.go new file mode 100644 index 0000000..f669a0d --- /dev/null +++ b/sd/etcd/instancer_test.go @@ -0,0 +1,68 @@ +package etcd + +import ( + "errors" + "testing" + + stdetcd "github.com/coreos/etcd/client" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" +) + +var ( + node = &stdetcd.Node{ + Key: "/foo", + Nodes: []*stdetcd.Node{ + {Key: "/foo/1", Value: "1:1"}, + {Key: "/foo/2", Value: "1:2"}, + }, + } + fakeResponse = &stdetcd.Response{ + Node: node, + } +) + +var _ sd.Instancer = &Instancer{} // API check + +func TestInstancer(t *testing.T) { + client := &fakeClient{ + responses: map[string]*stdetcd.Response{"/foo": fakeResponse}, + } + + s, err := NewInstancer(client, "/foo", log.NewNopLogger()) + if err != nil { + t.Fatal(err) + } + defer s.Stop() + + if state := s.State(); state.Err != nil { + t.Fatal(state.Err) + } +} + +type fakeClient struct { + responses map[string]*stdetcd.Response +} + +func (c *fakeClient) GetEntries(prefix string) ([]string, error) { + response, ok := c.responses[prefix] + if !ok { + return nil, errors.New("key not exist") + } + + entries := make([]string, len(response.Node.Nodes)) + for i, node := range response.Node.Nodes { + entries[i] = node.Value + } + return entries, nil +} + +func (c *fakeClient) WatchPrefix(prefix string, ch chan struct{}) {} + +func (c *fakeClient) Register(Service) error { + return nil +} +func (c *fakeClient) Deregister(Service) error { + return nil +} diff --git a/sd/etcd/integration_test.go b/sd/etcd/integration_test.go index 86e1a6d..5ec3bac 100644 --- a/sd/etcd/integration_test.go +++ b/sd/etcd/integration_test.go @@ -11,6 +11,7 @@ "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" ) // Package sd/etcd provides a wrapper around the etcd key/value store. 
This @@ -67,24 +68,28 @@ t.Fatalf("want %q, have %q", want, have) } - subscriber, err := NewSubscriber( + instancer, err := NewInstancer( client, prefix, - func(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil }, - log.With(log.NewLogfmtLogger(os.Stderr), "component", "subscriber"), + log.With(log.NewLogfmtLogger(os.Stderr), "component", "instancer"), ) if err != nil { - t.Fatalf("NewSubscriber: %v", err) + t.Fatalf("NewInstancer: %v", err) } - t.Logf("Constructed Subscriber OK") + endpointer := sd.NewEndpointer( + instancer, + func(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil }, + log.With(log.NewLogfmtLogger(os.Stderr), "component", "instancer"), + ) + t.Logf("Constructed Endpointer OK") if !within(time.Second, func() bool { - endpoints, err := subscriber.Endpoints() + endpoints, err := endpointer.Endpoints() return err == nil && len(endpoints) == 1 }) { - t.Fatalf("Subscriber didn't see Register in time") + t.Fatalf("Endpointer didn't see Register in time") } - t.Logf("Subscriber saw Register OK") + t.Logf("Endpointer saw Register OK") // Deregister first instance of test data. registrar.Deregister() @@ -92,11 +97,11 @@ // Check it was deregistered. if !within(time.Second, func() bool { - endpoints, err := subscriber.Endpoints() + endpoints, err := endpointer.Endpoints() t.Logf("Checking Deregister: len(endpoints) = %d, err = %v", len(endpoints), err) return err == nil && len(endpoints) == 0 }) { - t.Fatalf("Subscriber didn't see Deregister in time") + t.Fatalf("Endpointer didn't see Deregister in time") } // Verify test data no longer exists in etcd. diff --git a/sd/etcd/subscriber.go b/sd/etcd/subscriber.go deleted file mode 100644 index 1b91872..0000000 --- a/sd/etcd/subscriber.go +++ /dev/null @@ -1,72 +0,0 @@ -package etcd - -import ( - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/sd" - "github.com/go-kit/kit/sd/cache" -) - -// Subscriber yield endpoints stored in a certain etcd keyspace. Any kind of -// change in that keyspace is watched and will update the Subscriber endpoints. -type Subscriber struct { - client Client - prefix string - cache *cache.Cache - logger log.Logger - quitc chan struct{} -} - -var _ sd.Subscriber = &Subscriber{} - -// NewSubscriber returns an etcd subscriber. It will start watching the given -// prefix for changes, and update the endpoints. -func NewSubscriber(c Client, prefix string, factory sd.Factory, logger log.Logger) (*Subscriber, error) { - s := &Subscriber{ - client: c, - prefix: prefix, - cache: cache.New(factory, logger), - logger: logger, - quitc: make(chan struct{}), - } - - instances, err := s.client.GetEntries(s.prefix) - if err == nil { - logger.Log("prefix", s.prefix, "instances", len(instances)) - } else { - logger.Log("prefix", s.prefix, "err", err) - } - s.cache.Update(instances) - - go s.loop() - return s, nil -} - -func (s *Subscriber) loop() { - ch := make(chan struct{}) - go s.client.WatchPrefix(s.prefix, ch) - for { - select { - case <-ch: - instances, err := s.client.GetEntries(s.prefix) - if err != nil { - s.logger.Log("msg", "failed to retrieve entries", "err", err) - continue - } - s.cache.Update(instances) - - case <-s.quitc: - return - } - } -} - -// Endpoints implements the Subscriber interface. -func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { - return s.cache.Endpoints(), nil -} - -// Stop terminates the Subscriber. 
-func (s *Subscriber) Stop() { - close(s.quitc) -} diff --git a/sd/etcd/subscriber_test.go b/sd/etcd/subscriber_test.go deleted file mode 100644 index dad320b..0000000 --- a/sd/etcd/subscriber_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package etcd - -import ( - "errors" - "io" - "testing" - - stdetcd "github.com/coreos/etcd/client" - - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" -) - -var ( - node = &stdetcd.Node{ - Key: "/foo", - Nodes: []*stdetcd.Node{ - {Key: "/foo/1", Value: "1:1"}, - {Key: "/foo/2", Value: "1:2"}, - }, - } - fakeResponse = &stdetcd.Response{ - Node: node, - } -) - -func TestSubscriber(t *testing.T) { - factory := func(string) (endpoint.Endpoint, io.Closer, error) { - return endpoint.Nop, nil, nil - } - - client := &fakeClient{ - responses: map[string]*stdetcd.Response{"/foo": fakeResponse}, - } - - s, err := NewSubscriber(client, "/foo", factory, log.NewNopLogger()) - if err != nil { - t.Fatal(err) - } - defer s.Stop() - - if _, err := s.Endpoints(); err != nil { - t.Fatal(err) - } -} - -func TestBadFactory(t *testing.T) { - factory := func(string) (endpoint.Endpoint, io.Closer, error) { - return nil, nil, errors.New("kaboom") - } - - client := &fakeClient{ - responses: map[string]*stdetcd.Response{"/foo": fakeResponse}, - } - - s, err := NewSubscriber(client, "/foo", factory, log.NewNopLogger()) - if err != nil { - t.Fatal(err) - } - defer s.Stop() - - endpoints, err := s.Endpoints() - if err != nil { - t.Fatal(err) - } - - if want, have := 0, len(endpoints); want != have { - t.Errorf("want %d, have %d", want, have) - } -} - -type fakeClient struct { - responses map[string]*stdetcd.Response -} - -func (c *fakeClient) GetEntries(prefix string) ([]string, error) { - response, ok := c.responses[prefix] - if !ok { - return nil, errors.New("key not exist") - } - - entries := make([]string, len(response.Node.Nodes)) - for i, node := range response.Node.Nodes { - entries[i] = node.Value - } - return entries, nil -} - -func (c *fakeClient) WatchPrefix(prefix string, ch chan struct{}) {} - -func (c *fakeClient) Register(Service) error { - return nil -} -func (c *fakeClient) Deregister(Service) error { - return nil -} diff --git a/sd/eureka/doc.go b/sd/eureka/doc.go index d41c352..b153d33 100644 --- a/sd/eureka/doc.go +++ b/sd/eureka/doc.go @@ -1,2 +1,2 @@ -// Package eureka provides subscriber and registrar implementations for Netflix OSS's Eureka +// Package eureka provides Instancer and Registrar implementations for Netflix OSS's Eureka package eureka diff --git a/sd/eureka/instancer.go b/sd/eureka/instancer.go new file mode 100644 index 0000000..ea601b4 --- /dev/null +++ b/sd/eureka/instancer.go @@ -0,0 +1,97 @@ +package eureka + +import ( + "fmt" + + "github.com/hudl/fargo" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" + "github.com/go-kit/kit/sd/internal/instance" +) + +// Instancer yields instances stored in the Eureka registry for the given app. +// Changes in that app are watched and will update the subscribers. +type Instancer struct { + instance.Cache + conn fargoConnection + app string + logger log.Logger + quitc chan chan struct{} +} + +// NewInstancer returns a Eureka Instancer. It will start watching the given +// app string for changes, and update the subscribers accordingly. 
+func NewInstancer(conn fargoConnection, app string, logger log.Logger) *Instancer { + logger = log.With(logger, "app", app) + + s := &Instancer{ + Cache: *instance.NewCache(), + conn: conn, + app: app, + logger: logger, + quitc: make(chan chan struct{}), + } + + instances, err := s.getInstances() + if err == nil { + s.logger.Log("instances", len(instances)) + } else { + s.logger.Log("during", "getInstances", "err", err) + } + + s.Update(sd.Event{Instances: instances, Err: err}) + go s.loop() + return s +} + +// Stop terminates the Instancer. +func (s *Instancer) Stop() { + q := make(chan struct{}) + s.quitc <- q + <-q + s.quitc = nil +} + +func (s *Instancer) loop() { + var ( + await = false + done = make(chan struct{}) + updatec = s.conn.ScheduleAppUpdates(s.app, await, done) + ) + defer close(done) + + for { + select { + case update := <-updatec: + if update.Err != nil { + s.logger.Log("during", "Update", "err", update.Err) + s.Update(sd.Event{Err: update.Err}) + continue + } + instances := convertFargoAppToInstances(update.App) + s.logger.Log("instances", len(instances)) + s.Update(sd.Event{Instances: instances}) + + case q := <-s.quitc: + close(q) + return + } + } +} + +func (s *Instancer) getInstances() ([]string, error) { + app, err := s.conn.GetApp(s.app) + if err != nil { + return nil, err + } + return convertFargoAppToInstances(app), nil +} + +func convertFargoAppToInstances(app *fargo.Application) []string { + instances := make([]string, len(app.Instances)) + for i, inst := range app.Instances { + instances[i] = fmt.Sprintf("%s:%d", inst.IPAddr, inst.Port) + } + return instances +} diff --git a/sd/eureka/instancer_test.go b/sd/eureka/instancer_test.go new file mode 100644 index 0000000..2fb0059 --- /dev/null +++ b/sd/eureka/instancer_test.go @@ -0,0 +1,152 @@ +package eureka + +import ( + "io" + "testing" + "time" + + "github.com/hudl/fargo" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/sd" +) + +var _ sd.Instancer = &Instancer{} // API check + +func TestInstancer(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return endpoint.Nop, nil, nil + } + + connection := &testConnection{ + instances: []*fargo.Instance{instanceTest1}, + application: appUpdateTest, + errApplication: nil, + } + + instancer := NewInstancer(connection, appNameTest, loggerTest) + defer instancer.Stop() + endpointer := sd.NewEndpointer(instancer, factory, loggerTest) + + endpoints, err := endpointer.Endpoints() + if err != nil { + t.Fatal(err) + } + + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestInstancerScheduleUpdates(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return endpoint.Nop, nil, nil + } + + connection := &testConnection{ + instances: []*fargo.Instance{instanceTest1}, + application: appUpdateTest, + errApplication: nil, + } + + instancer := NewInstancer(connection, appNameTest, loggerTest) + defer instancer.Stop() + endpointer := sd.NewEndpointer(instancer, factory, loggerTest) + + endpoints, _ := endpointer.Endpoints() + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } + + time.Sleep(50 * time.Millisecond) + + endpoints, _ = endpointer.Endpoints() + if want, have := 2, len(endpoints); want != have { + t.Errorf("want %v, have %v", want, have) + } +} + +func TestBadFactory(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return nil, nil, errTest + } + + connection 
:= &testConnection{ + instances: []*fargo.Instance{instanceTest1}, + application: appUpdateTest, + errApplication: nil, + } + + instancer := NewInstancer(connection, appNameTest, loggerTest) + defer instancer.Stop() + endpointer := sd.NewEndpointer(instancer, factory, loggerTest) + + endpoints, err := endpointer.Endpoints() + if err != nil { + t.Fatal(err) + } + + if want, have := 0, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestBadInstancerInstances(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return endpoint.Nop, nil, nil + } + + connection := &testConnection{ + instances: []*fargo.Instance{}, + errInstances: errTest, + application: appUpdateTest, + errApplication: nil, + } + + instancer := NewInstancer(connection, appNameTest, loggerTest) + defer instancer.Stop() + endpointer := sd.NewEndpointer(instancer, factory, loggerTest) + + endpoints, err := endpointer.Endpoints() + if err != nil { + t.Fatal(err) + } + + if want, have := 0, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestBadInstancerScheduleUpdates(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return endpoint.Nop, nil, nil + } + + connection := &testConnection{ + instances: []*fargo.Instance{instanceTest1}, + application: appUpdateTest, + errApplication: errTest, + } + + instancer := NewInstancer(connection, appNameTest, loggerTest) + defer instancer.Stop() + endpointer := sd.NewEndpointer(instancer, factory, loggerTest) + + endpoints, err := endpointer.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } + + time.Sleep(50 * time.Millisecond) + + endpoints, err = endpointer.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %v, have %v", want, have) + } +} diff --git a/sd/eureka/integration_test.go b/sd/eureka/integration_test.go index b519f70..c2ced6a 100644 --- a/sd/eureka/integration_test.go +++ b/sd/eureka/integration_test.go @@ -12,6 +12,7 @@ "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" ) // Package sd/eureka provides a wrapper around the Netflix Eureka service @@ -54,16 +55,16 @@ t.Logf("factory invoked for %q", instance) return endpoint.Nop, nil, nil } - s := NewSubscriber( + instancer := NewInstancer( &fargoConnection, appNameTest, - factory, - log.With(logger, "component", "subscriber"), + log.With(logger, "component", "instancer"), ) - defer s.Stop() + defer instancer.Stop() + endpointer := sd.NewEndpointer(instancer, factory, log.With(logger, "component", "endpointer")) // We should have one endpoint immediately after subscriber instantiation. - endpoints, err := s.Endpoints() + endpoints, err := endpointer.Endpoints() if err != nil { t.Error(err) } @@ -81,7 +82,7 @@ time.Sleep(2 * time.Second) // Now we should have two endpoints. - endpoints, err = s.Endpoints() + endpoints, err = endpointer.Endpoints() if err != nil { t.Error(err) } @@ -96,7 +97,7 @@ time.Sleep(2 * time.Second) // And then there was one. 
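For reference, a sketch of the equivalent Eureka wiring; the app name is invented, and a *fargo.EurekaConnection is assumed to satisfy the package's connection interface, as in the integration test above. Unlike the etcd and zk constructors, NewInstancer here returns no error: it logs the result of the initial fetch, keeps watching, and leaves it to the caller to Stop the watch.

package example // illustrative sketch, not part of this change

import (
	"io"

	"github.com/hudl/fargo"

	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/eureka"
	"github.com/go-kit/kit/sd/lb"
)

// watchApp starts watching a Eureka app and returns a balancer over its
// instances plus a stop function that ends the underlying fargo watch.
func watchApp(conn *fargo.EurekaConnection, logger log.Logger) (lb.Balancer, func()) {
	instancer := eureka.NewInstancer(conn, "EXAMPLE-APP", logger) // begins watching immediately
	factory := func(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil }
	endpointer := sd.NewEndpointer(instancer, factory, logger)
	return lb.NewRoundRobin(endpointer), instancer.Stop // caller must Stop when done
}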
- endpoints, err = s.Endpoints() + endpoints, err = endpointer.Endpoints() if err != nil { t.Error(err) } diff --git a/sd/eureka/subscriber.go b/sd/eureka/subscriber.go deleted file mode 100644 index 0300d0d..0000000 --- a/sd/eureka/subscriber.go +++ /dev/null @@ -1,106 +0,0 @@ -package eureka - -import ( - "fmt" - - "github.com/hudl/fargo" - - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/sd" - "github.com/go-kit/kit/sd/cache" -) - -// Subscriber yields endpoints stored in the Eureka registry for the given app. -// Changes in that app are watched and will update the Subscriber endpoints. -type Subscriber struct { - conn fargoConnection - app string - factory sd.Factory - logger log.Logger - cache *cache.Cache - quitc chan chan struct{} -} - -var _ sd.Subscriber = (*Subscriber)(nil) - -// NewSubscriber returns a Eureka subscriber. It will start watching the given -// app string for changes, and update the endpoints accordingly. -func NewSubscriber(conn fargoConnection, app string, factory sd.Factory, logger log.Logger) *Subscriber { - logger = log.With(logger, "app", app) - - s := &Subscriber{ - conn: conn, - app: app, - factory: factory, - logger: logger, - cache: cache.New(factory, logger), - quitc: make(chan chan struct{}), - } - - instances, err := s.getInstances() - if err == nil { - s.logger.Log("instances", len(instances)) - } else { - s.logger.Log("during", "getInstances", "err", err) - } - - s.cache.Update(instances) - go s.loop() - return s -} - -// Endpoints implements the Subscriber interface. -func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { - return s.cache.Endpoints(), nil -} - -// Stop terminates the subscriber. -func (s *Subscriber) Stop() { - q := make(chan struct{}) - s.quitc <- q - <-q - s.quitc = nil -} - -func (s *Subscriber) loop() { - var ( - await = false - done = make(chan struct{}) - updatec = s.conn.ScheduleAppUpdates(s.app, await, done) - ) - defer close(done) - - for { - select { - case update := <-updatec: - if update.Err != nil { - s.logger.Log("during", "Update", "err", update.Err) - continue - } - instances := convertFargoAppToInstances(update.App) - s.logger.Log("instances", len(instances)) - s.cache.Update(instances) - - case q := <-s.quitc: - close(q) - return - } - } -} - -func (s *Subscriber) getInstances() ([]string, error) { - app, err := s.conn.GetApp(s.app) - if err != nil { - return nil, err - } - return convertFargoAppToInstances(app), nil -} - -func convertFargoAppToInstances(app *fargo.Application) []string { - instances := make([]string, len(app.Instances)) - for i, inst := range app.Instances { - instances[i] = fmt.Sprintf("%s:%d", inst.IPAddr, inst.Port) - } - return instances -} diff --git a/sd/eureka/subscriber_test.go b/sd/eureka/subscriber_test.go deleted file mode 100644 index 4d0d9b3..0000000 --- a/sd/eureka/subscriber_test.go +++ /dev/null @@ -1,144 +0,0 @@ -package eureka - -import ( - "io" - "testing" - "time" - - "github.com/hudl/fargo" - - "github.com/go-kit/kit/endpoint" -) - -func TestSubscriber(t *testing.T) { - factory := func(string) (endpoint.Endpoint, io.Closer, error) { - return endpoint.Nop, nil, nil - } - - connection := &testConnection{ - instances: []*fargo.Instance{instanceTest1}, - application: appUpdateTest, - errApplication: nil, - } - - subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) - defer subscriber.Stop() - - endpoints, err := subscriber.Endpoints() - if err != nil { - t.Fatal(err) - } - - if want, have := 1, len(endpoints); 
want != have { - t.Errorf("want %d, have %d", want, have) - } -} - -func TestSubscriberScheduleUpdates(t *testing.T) { - factory := func(string) (endpoint.Endpoint, io.Closer, error) { - return endpoint.Nop, nil, nil - } - - connection := &testConnection{ - instances: []*fargo.Instance{instanceTest1}, - application: appUpdateTest, - errApplication: nil, - } - - subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) - defer subscriber.Stop() - - endpoints, _ := subscriber.Endpoints() - if want, have := 1, len(endpoints); want != have { - t.Errorf("want %d, have %d", want, have) - } - - time.Sleep(50 * time.Millisecond) - - endpoints, _ = subscriber.Endpoints() - if want, have := 2, len(endpoints); want != have { - t.Errorf("want %v, have %v", want, have) - } -} - -func TestBadFactory(t *testing.T) { - factory := func(string) (endpoint.Endpoint, io.Closer, error) { - return nil, nil, errTest - } - - connection := &testConnection{ - instances: []*fargo.Instance{instanceTest1}, - application: appUpdateTest, - errApplication: nil, - } - - subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) - defer subscriber.Stop() - - endpoints, err := subscriber.Endpoints() - if err != nil { - t.Fatal(err) - } - - if want, have := 0, len(endpoints); want != have { - t.Errorf("want %d, have %d", want, have) - } -} - -func TestBadSubscriberInstances(t *testing.T) { - factory := func(string) (endpoint.Endpoint, io.Closer, error) { - return endpoint.Nop, nil, nil - } - - connection := &testConnection{ - instances: []*fargo.Instance{}, - errInstances: errTest, - application: appUpdateTest, - errApplication: nil, - } - - subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) - defer subscriber.Stop() - - endpoints, err := subscriber.Endpoints() - if err != nil { - t.Fatal(err) - } - - if want, have := 0, len(endpoints); want != have { - t.Errorf("want %d, have %d", want, have) - } -} - -func TestBadSubscriberScheduleUpdates(t *testing.T) { - factory := func(string) (endpoint.Endpoint, io.Closer, error) { - return endpoint.Nop, nil, nil - } - - connection := &testConnection{ - instances: []*fargo.Instance{instanceTest1}, - application: appUpdateTest, - errApplication: errTest, - } - - subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) - defer subscriber.Stop() - - endpoints, err := subscriber.Endpoints() - if err != nil { - t.Error(err) - } - if want, have := 1, len(endpoints); want != have { - t.Errorf("want %d, have %d", want, have) - } - - time.Sleep(50 * time.Millisecond) - - endpoints, err = subscriber.Endpoints() - if err != nil { - t.Error(err) - } - if want, have := 1, len(endpoints); want != have { - t.Errorf("want %v, have %v", want, have) - } -} diff --git a/sd/fixed_subscriber.go b/sd/fixed_subscriber.go deleted file mode 100644 index 98fd503..0000000 --- a/sd/fixed_subscriber.go +++ /dev/null @@ -1,9 +0,0 @@ -package sd - -import "github.com/go-kit/kit/endpoint" - -// FixedSubscriber yields a fixed set of services. -type FixedSubscriber []endpoint.Endpoint - -// Endpoints implements Subscriber. -func (s FixedSubscriber) Endpoints() ([]endpoint.Endpoint, error) { return s, nil } diff --git a/sd/instancer.go b/sd/instancer.go new file mode 100644 index 0000000..09647a9 --- /dev/null +++ b/sd/instancer.go @@ -0,0 +1,34 @@ +package sd + +// Event represents a push notification generated from the underlying service discovery +// implementation. 
It contains either a full set of available resource instances, or
+// an error indicating some issue with obtaining information from the discovery backend.
+// Examples of errors may include losing the connection to the discovery backend, or
+// trying to look up resource instances using an incorrectly formatted key.
+// After receiving an Event with an error, the listener should treat previously discovered
+// resource instances as stale (although it may choose to continue using them).
+// If the Instancer is able to restore the connection to the discovery backend, it must push
+// another Event with the current set of resource instances.
+type Event struct {
+	Instances []string
+	Err       error
+}
+
+// Instancer listens to a service discovery system and notifies registered
+// observers of changes in the resource instances. Every event sent to the channels
+// contains a complete set of instances known to the Instancer. That complete set is
+// sent immediately upon registering the channel, and on any future updates from
+// the discovery system.
+type Instancer interface {
+	Register(chan<- Event)
+	Deregister(chan<- Event)
+}
+
+// FixedInstancer yields a fixed set of instances.
+type FixedInstancer []string
+
+// Register implements Instancer.
+func (d FixedInstancer) Register(ch chan<- Event) { ch <- Event{Instances: d} }
+
+// Deregister implements Instancer.
+func (d FixedInstancer) Deregister(ch chan<- Event) {}
diff --git a/sd/internal/instance/cache.go b/sd/internal/instance/cache.go
new file mode 100644
index 0000000..94b19fd
--- /dev/null
+++ b/sd/internal/instance/cache.go
@@ -0,0 +1,79 @@
+package instance
+
+import (
+	"reflect"
+	"sort"
+	"sync"
+
+	"github.com/go-kit/kit/sd"
+)
+
+// Cache keeps track of resource instances provided to it via the Update method
+// and implements the Instancer interface.
+type Cache struct {
+	mtx   sync.RWMutex
+	state sd.Event
+	reg   registry
+}
+
+// NewCache creates a new Cache.
+func NewCache() *Cache {
+	return &Cache{
+		reg: registry{},
+	}
+}
+
+// Update receives new instances from service discovery, stores them internally,
+// and notifies all registered listeners.
+func (c *Cache) Update(event sd.Event) {
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+
+	sort.Strings(event.Instances)
+	if reflect.DeepEqual(c.state, event) {
+		return // no need to broadcast the same instances
+	}
+
+	c.state = event
+	c.reg.broadcast(event)
+}
+
+// State returns the current state of discovery (instances or error) as an sd.Event.
+func (c *Cache) State() sd.Event {
+	c.mtx.RLock()
+	defer c.mtx.RUnlock()
+	return c.state
+}
+
+// Register implements Instancer.
+func (c *Cache) Register(ch chan<- sd.Event) {
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+	c.reg.register(ch)
+	// always push the current state to new channels
+	ch <- c.state
+}
+
+// Deregister implements Instancer.
+func (c *Cache) Deregister(ch chan<- sd.Event) {
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+	c.reg.deregister(ch)
+}
+
+// registry is not goroutine-safe.
+type registry map[chan<- sd.Event]struct{} + +func (r registry) broadcast(event sd.Event) { + for c := range r { + c <- event + } +} + +func (r registry) register(c chan<- sd.Event) { + r[c] = struct{}{} +} + +func (r registry) deregister(c chan<- sd.Event) { + delete(r, c) +} diff --git a/sd/internal/instance/cache_test.go b/sd/internal/instance/cache_test.go new file mode 100644 index 0000000..5d9847a --- /dev/null +++ b/sd/internal/instance/cache_test.go @@ -0,0 +1,78 @@ +package instance + +import ( + "sync" + "testing" + + "github.com/go-kit/kit/sd" +) + +var _ sd.Instancer = &Cache{} // API check + +func TestCache(t *testing.T) { + // TODO this test is not finished yet + + c := NewCache() + + { + state := c.State() + if want, have := 0, len(state.Instances); want != have { + t.Fatalf("want %v instances, have %v", want, have) + } + } + + notification1 := sd.Event{Instances: []string{"x", "y"}} + notification2 := sd.Event{Instances: []string{"a", "b", "c"}} + + c.Update(notification1) + + // times 2 because we have two observers + expectedInstances := 2 * (len(notification1.Instances) + len(notification2.Instances)) + + wg := sync.WaitGroup{} + wg.Add(expectedInstances) + + receiver := func(ch chan sd.Event) { + for state := range ch { + // count total number of instances received + for range state.Instances { + wg.Done() + } + } + } + + f1 := make(chan sd.Event) + f2 := make(chan sd.Event) + go receiver(f1) + go receiver(f2) + + c.Register(f1) + c.Register(f2) + + c.Update(notification1) + c.Update(notification2) + + // if state := c.State(); instances == nil { + // if want, have := len(notification2), len(instances); want != have { + // t.Errorf("want length %v, have %v", want, have) + // } else { + // for i := range notification2 { + // if want, have := notification2[i], instances[i]; want != have { + // t.Errorf("want instance %v, have %v", want, have) + // } + // } + // } + // } + + close(f1) + close(f2) + + wg.Wait() + + // d.Deregister(f1) + + // d.Unregister(f2) + // if want, have := 0, len(d.observers); want != have { + // t.Fatalf("want %v observers, have %v", want, have) + // } +} diff --git a/sd/lb/random.go b/sd/lb/random.go index 78b0956..b1b06fc 100644 --- a/sd/lb/random.go +++ b/sd/lb/random.go @@ -8,7 +8,7 @@ ) // NewRandom returns a load balancer that selects services randomly. 
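Stepping back from the cache internals above: a minimal consumer-side sketch of the push contract that Instancer and Cache document. Register delivers the current state immediately, so a one-shot read works with a buffered channel. The instance strings in the usage note are invented.

package example // illustrative sketch, not part of this change

import "github.com/go-kit/kit/sd"

// currentInstances reads the state an Instancer pushes upon registration.
func currentInstances(instancer sd.Instancer) ([]string, error) {
	ch := make(chan sd.Event, 1) // buffered: Register sends synchronously
	instancer.Register(ch)
	defer instancer.Deregister(ch)

	event := <-ch // the current state, pushed by Register
	return event.Instances, event.Err
}

For example, currentInstances(sd.FixedInstancer{"10.0.0.1:8080", "10.0.0.2:8080"}) would return those two fixed instances and a nil error.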
-func NewRandom(s sd.Subscriber, seed int64) Balancer { +func NewRandom(s sd.Endpointer, seed int64) Balancer { return &random{ s: s, r: rand.New(rand.NewSource(seed)), @@ -16,7 +16,7 @@ } type random struct { - s sd.Subscriber + s sd.Endpointer r *rand.Rand } diff --git a/sd/lb/random_test.go b/sd/lb/random_test.go index 8cee33e..ff56326 100644 --- a/sd/lb/random_test.go +++ b/sd/lb/random_test.go @@ -25,8 +25,8 @@ endpoints[i] = func(context.Context, interface{}) (interface{}, error) { counts[i0]++; return struct{}{}, nil } } - subscriber := sd.FixedSubscriber(endpoints) - balancer := NewRandom(subscriber, seed) + endpointer := sd.FixedEndpointer(endpoints) + balancer := NewRandom(endpointer, seed) for i := 0; i < iterations; i++ { endpoint, _ := balancer.Endpoint() @@ -42,8 +42,8 @@ } func TestRandomNoEndpoints(t *testing.T) { - subscriber := sd.FixedSubscriber{} - balancer := NewRandom(subscriber, 1415926) + endpointer := sd.FixedEndpointer{} + balancer := NewRandom(endpointer, 1415926) _, err := balancer.Endpoint() if want, have := ErrNoEndpoints, err; want != have { t.Errorf("want %v, have %v", want, have) diff --git a/sd/lb/retry_test.go b/sd/lb/retry_test.go index 7cdd892..7563980 100644 --- a/sd/lb/retry_test.go +++ b/sd/lb/retry_test.go @@ -13,7 +13,7 @@ func TestRetryMaxTotalFail(t *testing.T) { var ( - endpoints = sd.FixedSubscriber{} // no endpoints + endpoints = sd.FixedEndpointer{} // no endpoints rr = lb.NewRoundRobin(endpoints) retry = lb.Retry(999, time.Second, rr) // lots of retries ctx = context.Background() @@ -30,13 +30,13 @@ func(context.Context, interface{}) (interface{}, error) { return nil, errors.New("error two") }, func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil /* OK */ }, } - subscriber = sd.FixedSubscriber{ + endpointer = sd.FixedEndpointer{ 0: endpoints[0], 1: endpoints[1], 2: endpoints[2], } retries = len(endpoints) - 1 // not quite enough retries - rr = lb.NewRoundRobin(subscriber) + rr = lb.NewRoundRobin(endpointer) ctx = context.Background() ) if _, err := lb.Retry(retries, time.Second, rr)(ctx, struct{}{}); err == nil { @@ -51,13 +51,13 @@ func(context.Context, interface{}) (interface{}, error) { return nil, errors.New("error two") }, func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil /* OK */ }, } - subscriber = sd.FixedSubscriber{ + endpointer = sd.FixedEndpointer{ 0: endpoints[0], 1: endpoints[1], 2: endpoints[2], } retries = len(endpoints) // exactly enough retries - rr = lb.NewRoundRobin(subscriber) + rr = lb.NewRoundRobin(endpointer) ctx = context.Background() ) if _, err := lb.Retry(retries, time.Second, rr)(ctx, struct{}{}); err != nil { @@ -70,7 +70,7 @@ step = make(chan struct{}) e = func(context.Context, interface{}) (interface{}, error) { <-step; return struct{}{}, nil } timeout = time.Millisecond - retry = lb.Retry(999, timeout, lb.NewRoundRobin(sd.FixedSubscriber{0: e})) + retry = lb.Retry(999, timeout, lb.NewRoundRobin(sd.FixedEndpointer{0: e})) errs = make(chan error, 1) invoke = func() { _, err := retry(context.Background(), struct{}{}); errs <- err } ) @@ -92,7 +92,7 @@ var ( myErr = errors.New("aborting early") cb = func(int, error) (bool, error) { return false, myErr } - endpoints = sd.FixedSubscriber{} // no endpoints + endpoints = sd.FixedEndpointer{} // no endpoints rr = lb.NewRoundRobin(endpoints) retry = lb.RetryWithCallback(time.Second, rr, cb) // lots of retries ctx = context.Background() @@ -115,7 +115,7 @@ endpoint = func(ctx context.Context, request interface{}) 
(interface{}, error) { return nil, myErr } - endpoints = sd.FixedSubscriber{endpoint} // no endpoints + endpoints = sd.FixedEndpointer{endpoint} // no endpoints rr = lb.NewRoundRobin(endpoints) retry = lb.RetryWithCallback(time.Second, rr, cb) // lots of retries ctx = context.Background() @@ -128,10 +128,10 @@ func TestHandleNilCallback(t *testing.T) { var ( - subscriber = sd.FixedSubscriber{ + endpointer = sd.FixedEndpointer{ func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil /* OK */ }, } - rr = lb.NewRoundRobin(subscriber) + rr = lb.NewRoundRobin(endpointer) ctx = context.Background() ) retry := lb.RetryWithCallback(time.Second, rr, nil) diff --git a/sd/lb/round_robin.go b/sd/lb/round_robin.go index 74b86ca..1d93a3b 100644 --- a/sd/lb/round_robin.go +++ b/sd/lb/round_robin.go @@ -8,7 +8,7 @@ ) // NewRoundRobin returns a load balancer that returns services in sequence. -func NewRoundRobin(s sd.Subscriber) Balancer { +func NewRoundRobin(s sd.Endpointer) Balancer { return &roundRobin{ s: s, c: 0, @@ -16,7 +16,7 @@ } type roundRobin struct { - s sd.Subscriber + s sd.Endpointer c uint64 } diff --git a/sd/lb/round_robin_test.go b/sd/lb/round_robin_test.go index b10ddbf..64cd075 100644 --- a/sd/lb/round_robin_test.go +++ b/sd/lb/round_robin_test.go @@ -22,8 +22,8 @@ } ) - subscriber := sd.FixedSubscriber(endpoints) - balancer := NewRoundRobin(subscriber) + endpointer := sd.FixedEndpointer(endpoints) + balancer := NewRoundRobin(endpointer) for i, want := range [][]int{ {1, 0, 0}, @@ -46,8 +46,8 @@ } func TestRoundRobinNoEndpoints(t *testing.T) { - subscriber := sd.FixedSubscriber{} - balancer := NewRoundRobin(subscriber) + endpointer := sd.FixedEndpointer{} + balancer := NewRoundRobin(endpointer) _, err := balancer.Endpoint() if want, have := ErrNoEndpoints, err; want != have { t.Errorf("want %v, have %v", want, have) @@ -55,7 +55,7 @@ } func TestRoundRobinNoRace(t *testing.T) { - balancer := NewRoundRobin(sd.FixedSubscriber([]endpoint.Endpoint{ + balancer := NewRoundRobin(sd.FixedEndpointer([]endpoint.Endpoint{ endpoint.Nop, endpoint.Nop, endpoint.Nop, diff --git a/sd/subscriber.go b/sd/subscriber.go deleted file mode 100644 index 8267b51..0000000 --- a/sd/subscriber.go +++ /dev/null @@ -1,11 +0,0 @@ -package sd - -import "github.com/go-kit/kit/endpoint" - -// Subscriber listens to a service discovery system and yields a set of -// identical endpoints on demand. An error indicates a problem with connectivity -// to the service discovery system, or within the system itself; a subscriber -// may yield no endpoints without error. 
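The load-balancer tests above now build on sd.FixedEndpointer (introduced elsewhere in this change) in place of FixedSubscriber. A small sketch of the renamed wiring, with invented endpoints and an invented retry budget:

package example // illustrative sketch, not part of this change

import (
	"context"
	"time"

	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/lb"
)

// callFirstHealthy walks the endpoints in round-robin order, trying each at
// most once before giving up.
func callFirstHealthy(ctx context.Context, endpoints []endpoint.Endpoint, req interface{}) (interface{}, error) {
	endpointer := sd.FixedEndpointer(endpoints) // static set, no discovery
	balancer := lb.NewRoundRobin(endpointer)
	retry := lb.Retry(len(endpoints), time.Second, balancer) // budget: one attempt per endpoint
	return retry(ctx, req)
}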
-type Subscriber interface { - Endpoints() ([]endpoint.Endpoint, error) -} diff --git a/sd/zk/client_test.go b/sd/zk/client_test.go index fbb2a5a..e201536 100644 --- a/sd/zk/client_test.go +++ b/sd/zk/client_test.go @@ -107,15 +107,15 @@ t.Fatal("expected new Client, got nil") } - s, err := NewSubscriber(c, "/validpath", newFactory(""), log.NewNopLogger()) + s, err := NewInstancer(c, "/validpath", log.NewNopLogger()) if err != stdzk.ErrNoServer { t.Errorf("unexpected error: %v", err) } if s != nil { - t.Error("expected failed new Subscriber") + t.Error("expected failed new Instancer") } - s, err = NewSubscriber(c, "invalidpath", newFactory(""), log.NewNopLogger()) + s, err = NewInstancer(c, "invalidpath", log.NewNopLogger()) if err != stdzk.ErrInvalidPath { t.Errorf("unexpected error: %v", err) } @@ -131,12 +131,12 @@ t.Errorf("unexpected error: %v", err) } - s, err = NewSubscriber(c, "/validpath", newFactory(""), log.NewNopLogger()) + s, err = NewInstancer(c, "/validpath", log.NewNopLogger()) if err != ErrClientClosed { t.Errorf("unexpected error: %v", err) } if s != nil { - t.Error("expected failed new Subscriber") + t.Error("expected failed new Instancer") } c, err = NewClient([]string{"localhost:65500"}, log.NewNopLogger(), Payload(payload)) @@ -147,11 +147,11 @@ t.Fatal("expected new Client, got nil") } - s, err = NewSubscriber(c, "/validpath", newFactory(""), log.NewNopLogger()) + s, err = NewInstancer(c, "/validpath", log.NewNopLogger()) if err != stdzk.ErrNoServer { t.Errorf("unexpected error: %v", err) } if s != nil { - t.Error("expected failed new Subscriber") + t.Error("expected failed new Instancer") } } diff --git a/sd/zk/doc.go b/sd/zk/doc.go index 02c2208..be5a253 100644 --- a/sd/zk/doc.go +++ b/sd/zk/doc.go @@ -1,2 +1,2 @@ -// Package zk provides subscriber and registrar implementations for ZooKeeper. +// Package zk provides Instancer and Registrar implementations for ZooKeeper. package zk diff --git a/sd/zk/instancer.go b/sd/zk/instancer.go new file mode 100644 index 0000000..1983112 --- /dev/null +++ b/sd/zk/instancer.go @@ -0,0 +1,80 @@ +package zk + +import ( + "github.com/samuel/go-zookeeper/zk" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" + "github.com/go-kit/kit/sd/internal/instance" +) + +// Instancer yield instances stored in a certain ZooKeeper path. Any kind of +// change in that path is watched and will update the subscribers. +type Instancer struct { + instance.Cache + client Client + path string + logger log.Logger + quitc chan struct{} +} + +// NewInstancer returns a ZooKeeper Instancer. ZooKeeper will start watching +// the given path for changes and update the Instancer endpoints. +func NewInstancer(c Client, path string, logger log.Logger) (*Instancer, error) { + s := &Instancer{ + Cache: *instance.NewCache(), + client: c, + path: path, + logger: logger, + quitc: make(chan struct{}), + } + + err := s.client.CreateParentNodes(s.path) + if err != nil { + return nil, err + } + + instances, eventc, err := s.client.GetEntries(s.path) + if err != nil { + logger.Log("path", s.path, "msg", "failed to retrieve entries", "err", err) + // TODO why zk constructor exits when other implementations continue? + return nil, err + } + logger.Log("path", s.path, "instances", len(instances)) + s.Update(sd.Event{Instances: instances}) + + go s.loop(eventc) + + return s, nil +} + +func (s *Instancer) loop(eventc <-chan zk.Event) { + var ( + instances []string + err error + ) + for { + select { + case <-eventc: + // We received a path update notification. 
Call GetEntries to + // retrieve child node data, and set a new watch, as ZK watches are + // one-time triggers. + instances, eventc, err = s.client.GetEntries(s.path) + if err != nil { + s.logger.Log("path", s.path, "msg", "failed to retrieve entries", "err", err) + s.Update(sd.Event{Err: err}) + continue + } + s.logger.Log("path", s.path, "instances", len(instances)) + s.Update(sd.Event{Instances: instances}) + + case <-s.quitc: + return + } + } +} + +// Stop terminates the Instancer. +func (s *Instancer) Stop() { + close(s.quitc) +} diff --git a/sd/zk/instancer_test.go b/sd/zk/instancer_test.go new file mode 100644 index 0000000..de9666b --- /dev/null +++ b/sd/zk/instancer_test.go @@ -0,0 +1,125 @@ +package zk + +import ( + "testing" + "time" + + "github.com/go-kit/kit/sd" +) + +var _ sd.Instancer = &Instancer{} + +func TestInstancer(t *testing.T) { + client := newFakeClient() + + instancer, err := NewInstancer(client, path, logger) + if err != nil { + t.Fatalf("failed to create new Instancer: %v", err) + } + defer instancer.Stop() + endpointer := sd.NewEndpointer(instancer, newFactory(""), logger) + + if _, err := endpointer.Endpoints(); err != nil { + t.Fatal(err) + } +} + +func TestBadFactory(t *testing.T) { + client := newFakeClient() + + instancer, err := NewInstancer(client, path, logger) + if err != nil { + t.Fatalf("failed to create new Instancer: %v", err) + } + defer instancer.Stop() + endpointer := sd.NewEndpointer(instancer, newFactory("kaboom"), logger) + + // instance1 came online + client.AddService(path+"/instance1", "kaboom") + + // instance2 came online + client.AddService(path+"/instance2", "zookeeper_node_data") + + if err = asyncTest(100*time.Millisecond, 1, endpointer); err != nil { + t.Error(err) + } +} + +func TestServiceUpdate(t *testing.T) { + client := newFakeClient() + + instancer, err := NewInstancer(client, path, logger) + if err != nil { + t.Fatalf("failed to create new Instancer: %v", err) + } + defer instancer.Stop() + endpointer := sd.NewEndpointer(instancer, newFactory(""), logger) + + endpoints, err := endpointer.Endpoints() + if err != nil { + t.Fatal(err) + } + if want, have := 0, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // instance1 came online + client.AddService(path+"/instance1", "zookeeper_node_data1") + + // instance2 came online + client.AddService(path+"/instance2", "zookeeper_node_data2") + + // we should have 2 instances + if err = asyncTest(100*time.Millisecond, 2, endpointer); err != nil { + t.Error(err) + } + + // TODO(pb): this bit is flaky + // + //// watch triggers an error... 
+ //client.SendErrorOnWatch() + // + //// test if error was consumed + //if err = client.ErrorIsConsumedWithin(100 * time.Millisecond); err != nil { + // t.Error(err) + //} + + // instance3 came online + client.AddService(path+"/instance3", "zookeeper_node_data3") + + // we should have 3 instances + if err = asyncTest(100*time.Millisecond, 3, endpointer); err != nil { + t.Error(err) + } + + // instance1 goes offline + client.RemoveService(path + "/instance1") + + // instance2 goes offline + client.RemoveService(path + "/instance2") + + // we should have 1 instance + if err = asyncTest(100*time.Millisecond, 1, endpointer); err != nil { + t.Error(err) + } +} + +func TestBadInstancerCreate(t *testing.T) { + client := newFakeClient() + client.SendErrorOnWatch() + + instancer, err := NewInstancer(client, path, logger) + if err == nil { + t.Error("expected error on new Instancer") + } + if instancer != nil { + t.Error("expected Instancer not to be created") + } + instancer, err = NewInstancer(client, "BadPath", logger) + if err == nil { + t.Error("expected error on new Instancer") + } + if instancer != nil { + t.Error("expected Instancer not to be created") + } +} diff --git a/sd/zk/subscriber.go b/sd/zk/subscriber.go deleted file mode 100644 index b9c67db..0000000 --- a/sd/zk/subscriber.go +++ /dev/null @@ -1,86 +0,0 @@ -package zk - -import ( - "github.com/samuel/go-zookeeper/zk" - - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/sd" - "github.com/go-kit/kit/sd/cache" -) - -// Subscriber yield endpoints stored in a certain ZooKeeper path. Any kind of -// change in that path is watched and will update the Subscriber endpoints. -type Subscriber struct { - client Client - path string - cache *cache.Cache - logger log.Logger - quitc chan struct{} -} - -var _ sd.Subscriber = &Subscriber{} - -// NewSubscriber returns a ZooKeeper subscriber. ZooKeeper will start watching -// the given path for changes and update the Subscriber endpoints. -func NewSubscriber(c Client, path string, factory sd.Factory, logger log.Logger) (*Subscriber, error) { - s := &Subscriber{ - client: c, - path: path, - cache: cache.New(factory, logger), - logger: logger, - quitc: make(chan struct{}), - } - - err := s.client.CreateParentNodes(s.path) - if err != nil { - return nil, err - } - - instances, eventc, err := s.client.GetEntries(s.path) - if err != nil { - logger.Log("path", s.path, "msg", "failed to retrieve entries", "err", err) - return nil, err - } - logger.Log("path", s.path, "instances", len(instances)) - s.cache.Update(instances) - - go s.loop(eventc) - - return s, nil -} - -func (s *Subscriber) loop(eventc <-chan zk.Event) { - var ( - instances []string - err error - ) - for { - select { - case <-eventc: - // We received a path update notification. Call GetEntries to - // retrieve child node data, and set a new watch, as ZK watches are - // one-time triggers. - instances, eventc, err = s.client.GetEntries(s.path) - if err != nil { - s.logger.Log("path", s.path, "msg", "failed to retrieve entries", "err", err) - continue - } - s.logger.Log("path", s.path, "instances", len(instances)) - s.cache.Update(instances) - - case <-s.quitc: - return - } - } -} - -// Endpoints implements the Subscriber interface. -func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { - return s.cache.Endpoints(), nil -} - -// Stop terminates the Subscriber. 
-func (s *Subscriber) Stop() { - close(s.quitc) -} diff --git a/sd/zk/subscriber_test.go b/sd/zk/subscriber_test.go deleted file mode 100644 index 79bdb84..0000000 --- a/sd/zk/subscriber_test.go +++ /dev/null @@ -1,117 +0,0 @@ -package zk - -import ( - "testing" - "time" -) - -func TestSubscriber(t *testing.T) { - client := newFakeClient() - - s, err := NewSubscriber(client, path, newFactory(""), logger) - if err != nil { - t.Fatalf("failed to create new Subscriber: %v", err) - } - defer s.Stop() - - if _, err := s.Endpoints(); err != nil { - t.Fatal(err) - } -} - -func TestBadFactory(t *testing.T) { - client := newFakeClient() - - s, err := NewSubscriber(client, path, newFactory("kaboom"), logger) - if err != nil { - t.Fatalf("failed to create new Subscriber: %v", err) - } - defer s.Stop() - - // instance1 came online - client.AddService(path+"/instance1", "kaboom") - - // instance2 came online - client.AddService(path+"/instance2", "zookeeper_node_data") - - if err = asyncTest(100*time.Millisecond, 1, s); err != nil { - t.Error(err) - } -} - -func TestServiceUpdate(t *testing.T) { - client := newFakeClient() - - s, err := NewSubscriber(client, path, newFactory(""), logger) - if err != nil { - t.Fatalf("failed to create new Subscriber: %v", err) - } - defer s.Stop() - - endpoints, err := s.Endpoints() - if err != nil { - t.Fatal(err) - } - if want, have := 0, len(endpoints); want != have { - t.Errorf("want %d, have %d", want, have) - } - - // instance1 came online - client.AddService(path+"/instance1", "zookeeper_node_data1") - - // instance2 came online - client.AddService(path+"/instance2", "zookeeper_node_data2") - - // we should have 2 instances - if err = asyncTest(100*time.Millisecond, 2, s); err != nil { - t.Error(err) - } - - // TODO(pb): this bit is flaky - // - //// watch triggers an error... - //client.SendErrorOnWatch() - // - //// test if error was consumed - //if err = client.ErrorIsConsumedWithin(100 * time.Millisecond); err != nil { - // t.Error(err) - //} - - // instance3 came online - client.AddService(path+"/instance3", "zookeeper_node_data3") - - // we should have 3 instances - if err = asyncTest(100*time.Millisecond, 3, s); err != nil { - t.Error(err) - } - - // instance1 goes offline - client.RemoveService(path + "/instance1") - - // instance2 goes offline - client.RemoveService(path + "/instance2") - - // we should have 1 instance - if err = asyncTest(100*time.Millisecond, 1, s); err != nil { - t.Error(err) - } -} - -func TestBadSubscriberCreate(t *testing.T) { - client := newFakeClient() - client.SendErrorOnWatch() - s, err := NewSubscriber(client, path, newFactory(""), logger) - if err == nil { - t.Error("expected error on new Subscriber") - } - if s != nil { - t.Error("expected Subscriber not to be created") - } - s, err = NewSubscriber(client, "BadPath", newFactory(""), logger) - if err == nil { - t.Error("expected error on new Subscriber") - } - if s != nil { - t.Error("expected Subscriber not to be created") - } -} diff --git a/sd/zk/util_test.go b/sd/zk/util_test.go index 91ba966..4f975a5 100644 --- a/sd/zk/util_test.go +++ b/sd/zk/util_test.go @@ -114,7 +114,7 @@ } } -func asyncTest(timeout time.Duration, want int, s *Subscriber) (err error) { +func asyncTest(timeout time.Duration, want int, s sd.Endpointer) (err error) { var endpoints []endpoint.Endpoint have := -1 // want can never be <0 t := time.After(timeout)