golang-github-go-kit-kit / c80303e
Support push model for service discovery (Yuri Shkuro, 6 years ago)
53 changed file(s) with 1707 addition(s) and 1439 deletion(s).
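The change replaces the pull-style Subscriber, which coupled watching a discovery system with building endpoints through a Factory, with two cooperating pieces: an Instancer, which watches the discovery system and pushes Events (instance strings or an error) to registered observers, and an Endpointer, which consumes those Events, runs the Factory, and serves the resulting endpoints. A minimal wiring sketch, adapted from the addsvc changes below; client, factory, logger, tags, passingOnly, and the retry settings are assumed to be set up as in that example, and the names are illustrative only:

    // Before: discovery and endpoint construction were fused in one object.
    // subscriber := consulsd.NewSubscriber(client, factory, logger, "addsvc", tags, passingOnly)
    // balancer  := lb.NewRoundRobin(subscriber)
    //
    // After: the Instancer publishes instances; the Endpointer turns them into endpoints.
    instancer := consulsd.NewInstancer(client, logger, "addsvc", tags, passingOnly)
    endpointer := sd.NewEndpointer(instancer, factory, logger)
    balancer := lb.NewRoundRobin(endpointer)
    retry := lb.Retry(retryMax, retryTimeout, balancer)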
4242 # auto-generated tag files
4343 tags
4444
45 # dependency management files
46 glide.lock
47 glide.yaml
48 vendor/
49
66 set -e
77
88 function go_files { find . -name '*_test.go' ; }
9 function filter { grep -v '/_' ; }
9 function filter { grep -v -e '/_' -e vendor ; }
1010 function remove_relative_prefix { sed -e 's/^\.\///g' ; }
1111
1212 function directories {
8383 tags = []string{}
8484 passingOnly = true
8585 endpoints = addsvc.Endpoints{}
86 instancer = consulsd.NewInstancer(client, logger, "addsvc", tags, passingOnly)
8687 )
8788 {
8889 factory := addsvcFactory(addsvc.MakeSumEndpoint, tracer, logger)
89 subscriber := consulsd.NewSubscriber(client, factory, logger, "addsvc", tags, passingOnly)
90 balancer := lb.NewRoundRobin(subscriber)
90 endpointer := sd.NewEndpointer(instancer, factory, logger)
91 balancer := lb.NewRoundRobin(endpointer)
9192 retry := lb.Retry(*retryMax, *retryTimeout, balancer)
9293 endpoints.SumEndpoint = retry
9394 }
9495 {
9596 factory := addsvcFactory(addsvc.MakeConcatEndpoint, tracer, logger)
96 subscriber := consulsd.NewSubscriber(client, factory, logger, "addsvc", tags, passingOnly)
97 balancer := lb.NewRoundRobin(subscriber)
97 endpointer := sd.NewEndpointer(instancer, factory, logger)
98 balancer := lb.NewRoundRobin(endpointer)
9899 retry := lb.Retry(*retryMax, *retryTimeout, balancer)
99100 endpoints.ConcatEndpoint = retry
100101 }
119120 passingOnly = true
120121 uppercase endpoint.Endpoint
121122 count endpoint.Endpoint
123 instancer = consulsd.NewInstancer(client, logger, "stringsvc", tags, passingOnly)
122124 )
123125 {
124126 factory := stringsvcFactory(ctx, "GET", "/uppercase")
125 subscriber := consulsd.NewSubscriber(client, factory, logger, "stringsvc", tags, passingOnly)
126 balancer := lb.NewRoundRobin(subscriber)
127 endpointer := sd.NewEndpointer(instancer, factory, logger)
128 balancer := lb.NewRoundRobin(endpointer)
127129 retry := lb.Retry(*retryMax, *retryTimeout, balancer)
128130 uppercase = retry
129131 }
130132 {
131133 factory := stringsvcFactory(ctx, "GET", "/count")
132 subscriber := consulsd.NewSubscriber(client, factory, logger, "stringsvc", tags, passingOnly)
133 balancer := lb.NewRoundRobin(subscriber)
134 endpointer := sd.NewEndpointer(instancer, factory, logger)
135 balancer := lb.NewRoundRobin(endpointer)
134136 retry := lb.Retry(*retryMax, *retryTimeout, balancer)
135137 count = retry
136138 }
3939
4040 var (
4141 sdclient = consul.NewClient(apiclient)
42 instancer = consul.NewInstancer(sdclient, logger, consulService, consulTags, passingOnly)
4243 endpoints profilesvc.Endpoints
4344 )
45 // TODO: thought experiment
46 mapping := []struct {
47 factory func(s profilesvc.Service) endpoint.Endpoint
48 endpoint *endpoint.Endpoint
49 }{
50 {
51 factory: profilesvc.MakePostProfileEndpoint,
52 endpoint: &endpoints.PostProfileEndpoint,
53 },
54 {
55 factory: profilesvc.MakeGetProfileEndpoint,
56 endpoint: &endpoints.GetProfileEndpoint,
57 },
58 }
59 for _, m := range mapping {
60 factory := factoryFor(m.factory)
61 endpointer := sd.NewEndpointer(instancer, factory, logger)
62 balancer := lb.NewRoundRobin(endpointer)
63 retry := lb.Retry(retryMax, retryTimeout, balancer)
64 *m.endpoint = retry
65 }
66 // TODO: why not 2 lines per endpoint registration above instead of 7 lines per endpoint below?
4467 {
4568 factory := factoryFor(profilesvc.MakePostProfileEndpoint)
46 subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
47 balancer := lb.NewRoundRobin(subscriber)
69 endpointer := sd.NewEndpointer(instancer, factory, logger)
70 balancer := lb.NewRoundRobin(endpointer)
4871 retry := lb.Retry(retryMax, retryTimeout, balancer)
4972 endpoints.PostProfileEndpoint = retry
5073 }
5174 {
5275 factory := factoryFor(profilesvc.MakeGetProfileEndpoint)
53 subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
54 balancer := lb.NewRoundRobin(subscriber)
76 endpointer := sd.NewEndpointer(instancer, factory, logger)
77 balancer := lb.NewRoundRobin(endpointer)
5578 retry := lb.Retry(retryMax, retryTimeout, balancer)
5679 endpoints.GetProfileEndpoint = retry
5780 }
5881 {
5982 factory := factoryFor(profilesvc.MakePutProfileEndpoint)
60 subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
61 balancer := lb.NewRoundRobin(subscriber)
83 endpointer := sd.NewEndpointer(instancer, factory, logger)
84 balancer := lb.NewRoundRobin(endpointer)
6285 retry := lb.Retry(retryMax, retryTimeout, balancer)
6386 endpoints.PutProfileEndpoint = retry
6487 }
6588 {
6689 factory := factoryFor(profilesvc.MakePatchProfileEndpoint)
67 subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
68 balancer := lb.NewRoundRobin(subscriber)
90 endpointer := sd.NewEndpointer(instancer, factory, logger)
91 balancer := lb.NewRoundRobin(endpointer)
6992 retry := lb.Retry(retryMax, retryTimeout, balancer)
7093 endpoints.PatchProfileEndpoint = retry
7194 }
7295 {
7396 factory := factoryFor(profilesvc.MakeDeleteProfileEndpoint)
74 subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
75 balancer := lb.NewRoundRobin(subscriber)
97 endpointer := sd.NewEndpointer(instancer, factory, logger)
98 balancer := lb.NewRoundRobin(endpointer)
7699 retry := lb.Retry(retryMax, retryTimeout, balancer)
77100 endpoints.DeleteProfileEndpoint = retry
78101 }
79102 {
80103 factory := factoryFor(profilesvc.MakeGetAddressesEndpoint)
81 subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
82 balancer := lb.NewRoundRobin(subscriber)
104 endpointer := sd.NewEndpointer(instancer, factory, logger)
105 balancer := lb.NewRoundRobin(endpointer)
83106 retry := lb.Retry(retryMax, retryTimeout, balancer)
84107 endpoints.GetAddressesEndpoint = retry
85108 }
86109 {
87110 factory := factoryFor(profilesvc.MakeGetAddressEndpoint)
88 subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
89 balancer := lb.NewRoundRobin(subscriber)
111 endpointer := sd.NewEndpointer(instancer, factory, logger)
112 balancer := lb.NewRoundRobin(endpointer)
90113 retry := lb.Retry(retryMax, retryTimeout, balancer)
91114 endpoints.GetAddressEndpoint = retry
92115 }
93116 {
94117 factory := factoryFor(profilesvc.MakePostAddressEndpoint)
95 subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
96 balancer := lb.NewRoundRobin(subscriber)
118 endpointer := sd.NewEndpointer(instancer, factory, logger)
119 balancer := lb.NewRoundRobin(endpointer)
97120 retry := lb.Retry(retryMax, retryTimeout, balancer)
98121 endpoints.PostAddressEndpoint = retry
99122 }
100123 {
101124 factory := factoryFor(profilesvc.MakeDeleteAddressEndpoint)
102 subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
103 balancer := lb.NewRoundRobin(subscriber)
125 endpointer := sd.NewEndpointer(instancer, factory, logger)
126 balancer := lb.NewRoundRobin(endpointer)
104127 retry := lb.Retry(retryMax, retryTimeout, balancer)
105128 endpoints.DeleteAddressEndpoint = retry
106129 }
3939 // discovery system.
4040 var (
4141 instanceList = split(instances)
42 subscriber sd.FixedSubscriber
42 endpointer sd.FixedEndpointer
4343 )
4444 logger.Log("proxy_to", fmt.Sprint(instanceList))
4545 for _, instance := range instanceList {
4747 e = makeUppercaseProxy(ctx, instance)
4848 e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
4949 e = ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e)
50 subscriber = append(subscriber, e)
50 endpointer = append(endpointer, e)
5151 }
5252
5353 // Now, build a single, retrying, load-balancing endpoint out of all of
5454 // those individual endpoints.
55 balancer := lb.NewRoundRobin(subscriber)
55 balancer := lb.NewRoundRobin(endpointer)
5656 retry := lb.Retry(maxAttempts, maxTime, balancer)
5757
5858 // And finally, return the ServiceMiddleware, implemented by proxymw.
0 package sd
1
2 import (
3 "io"
4 "testing"
5
6 "github.com/go-kit/kit/endpoint"
7 "github.com/go-kit/kit/log"
8 )
9
10 func BenchmarkEndpoints(b *testing.B) {
11 var (
12 ca = make(closer)
13 cb = make(closer)
14 cmap = map[string]io.Closer{"a": ca, "b": cb}
15 factory = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, cmap[instance], nil }
16 c = newEndpointCache(factory, log.NewNopLogger(), endpointerOptions{})
17 )
18
19 b.ReportAllocs()
20
21 c.Update(Event{Instances: []string{"a", "b"}})
22
23 b.RunParallel(func(pb *testing.PB) {
24 for pb.Next() {
25 c.Endpoints()
26 }
27 })
28 }
sd/cache/benchmark_test.go +0 -29 (file deleted)
0 package cache
1
2 import (
3 "io"
4 "testing"
5
6 "github.com/go-kit/kit/endpoint"
7 "github.com/go-kit/kit/log"
8 )
9
10 func BenchmarkEndpoints(b *testing.B) {
11 var (
12 ca = make(closer)
13 cb = make(closer)
14 cmap = map[string]io.Closer{"a": ca, "b": cb}
15 factory = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, cmap[instance], nil }
16 c = New(factory, log.NewNopLogger())
17 )
18
19 b.ReportAllocs()
20
21 c.Update([]string{"a", "b"})
22
23 b.RunParallel(func(pb *testing.PB) {
24 for pb.Next() {
25 c.Endpoints()
26 }
27 })
28 }
sd/cache/cache.go +0 -96 (file deleted)
0 package cache
1
2 import (
3 "io"
4 "sort"
5 "sync"
6 "sync/atomic"
7
8 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/log"
10 "github.com/go-kit/kit/sd"
11 )
12
13 // Cache collects the most recent set of endpoints from a service discovery
14 // system via a subscriber, and makes them available to consumers. Cache is
15 // meant to be embedded inside of a concrete subscriber, and can serve Service
16 // invocations directly.
17 type Cache struct {
18 mtx sync.RWMutex
19 factory sd.Factory
20 cache map[string]endpointCloser
21 slice atomic.Value // []endpoint.Endpoint
22 logger log.Logger
23 }
24
25 type endpointCloser struct {
26 endpoint.Endpoint
27 io.Closer
28 }
29
30 // New returns a new, empty endpoint cache.
31 func New(factory sd.Factory, logger log.Logger) *Cache {
32 return &Cache{
33 factory: factory,
34 cache: map[string]endpointCloser{},
35 logger: logger,
36 }
37 }
38
39 // Update should be invoked by clients with a complete set of current instance
40 // strings whenever that set changes. The cache manufactures new endpoints via
41 // the factory, closes old endpoints when they disappear, and persists existing
42 // endpoints if they survive through an update.
43 func (c *Cache) Update(instances []string) {
44 c.mtx.Lock()
45 defer c.mtx.Unlock()
46
47 // Deterministic order (for later).
48 sort.Strings(instances)
49
50 // Produce the current set of services.
51 cache := make(map[string]endpointCloser, len(instances))
52 for _, instance := range instances {
53 // If it already exists, just copy it over.
54 if sc, ok := c.cache[instance]; ok {
55 cache[instance] = sc
56 delete(c.cache, instance)
57 continue
58 }
59
60 // If it doesn't exist, create it.
61 service, closer, err := c.factory(instance)
62 if err != nil {
63 c.logger.Log("instance", instance, "err", err)
64 continue
65 }
66 cache[instance] = endpointCloser{service, closer}
67 }
68
69 // Close any leftover endpoints.
70 for _, sc := range c.cache {
71 if sc.Closer != nil {
72 sc.Closer.Close()
73 }
74 }
75
76 // Populate the slice of endpoints.
77 slice := make([]endpoint.Endpoint, 0, len(cache))
78 for _, instance := range instances {
79 // A bad factory may mean an instance is not present.
80 if _, ok := cache[instance]; !ok {
81 continue
82 }
83 slice = append(slice, cache[instance].Endpoint)
84 }
85
86 // Swap and trigger GC for old copies.
87 c.slice.Store(slice)
88 c.cache = cache
89 }
90
91 // Endpoints yields the current set of (presumably identical) endpoints, ordered
92 // lexicographically by the corresponding instance string.
93 func (c *Cache) Endpoints() []endpoint.Endpoint {
94 return c.slice.Load().([]endpoint.Endpoint)
95 }
sd/cache/cache_test.go +0 -91 (file deleted)
0 package cache
1
2 import (
3 "errors"
4 "io"
5 "testing"
6 "time"
7
8 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/log"
10 )
11
12 func TestCache(t *testing.T) {
13 var (
14 ca = make(closer)
15 cb = make(closer)
16 c = map[string]io.Closer{"a": ca, "b": cb}
17 f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
18 cache = New(f, log.NewNopLogger())
19 )
20
21 // Populate
22 cache.Update([]string{"a", "b"})
23 select {
24 case <-ca:
25 t.Errorf("endpoint a closed, not good")
26 case <-cb:
27 t.Errorf("endpoint b closed, not good")
28 case <-time.After(time.Millisecond):
29 t.Logf("no closures yet, good")
30 }
31 if want, have := 2, len(cache.Endpoints()); want != have {
32 t.Errorf("want %d, have %d", want, have)
33 }
34
35 // Duplicate, should be no-op
36 cache.Update([]string{"a", "b"})
37 select {
38 case <-ca:
39 t.Errorf("endpoint a closed, not good")
40 case <-cb:
41 t.Errorf("endpoint b closed, not good")
42 case <-time.After(time.Millisecond):
43 t.Logf("no closures yet, good")
44 }
45 if want, have := 2, len(cache.Endpoints()); want != have {
46 t.Errorf("want %d, have %d", want, have)
47 }
48
49 // Delete b
50 go cache.Update([]string{"a"})
51 select {
52 case <-ca:
53 t.Errorf("endpoint a closed, not good")
54 case <-cb:
55 t.Logf("endpoint b closed, good")
56 case <-time.After(time.Second):
57 t.Errorf("didn't close the deleted instance in time")
58 }
59 if want, have := 1, len(cache.Endpoints()); want != have {
60 t.Errorf("want %d, have %d", want, have)
61 }
62
63 // Delete a
64 go cache.Update([]string{})
65 select {
66 // case <-cb: will succeed, as it's closed
67 case <-ca:
68 t.Logf("endpoint a closed, good")
69 case <-time.After(time.Second):
70 t.Errorf("didn't close the deleted instance in time")
71 }
72 if want, have := 0, len(cache.Endpoints()); want != have {
73 t.Errorf("want %d, have %d", want, have)
74 }
75 }
76
77 func TestBadFactory(t *testing.T) {
78 cache := New(func(string) (endpoint.Endpoint, io.Closer, error) {
79 return nil, nil, errors.New("bad factory")
80 }, log.NewNopLogger())
81
82 cache.Update([]string{"foo:1234", "bar:5678"})
83 if want, have := 0, len(cache.Endpoints()); want != have {
84 t.Errorf("want %d, have %d", want, have)
85 }
86 }
87
88 type closer chan struct{}
89
90 func (c closer) Close() error { close(c); return nil }
0 package sd
1
2 import (
3 "io"
4 "sort"
5 "sync"
6 "time"
7
8 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/log"
10 )
11
12 // endpointCache collects the most recent set of instances from a service discovery
13 // system, creates endpoints for them using a factory function, and makes
14 // them available to consumers.
15 type endpointCache struct {
16 options endpointerOptions
17 mtx sync.RWMutex
18 factory Factory
19 cache map[string]endpointCloser
20 err error
21 endpoints []endpoint.Endpoint
22 logger log.Logger
23 invalidateDeadline time.Time
24 }
25
26 type endpointCloser struct {
27 endpoint.Endpoint
28 io.Closer
29 }
30
31 // newEndpointCache returns a new, empty endpointCache.
32 func newEndpointCache(factory Factory, logger log.Logger, options endpointerOptions) *endpointCache {
33 return &endpointCache{
34 options: options,
35 factory: factory,
36 cache: map[string]endpointCloser{},
37 logger: logger,
38 }
39 }
40
41 // Update should be invoked by clients with an Event carrying either a complete
42 // set of current instance strings or a discovery error, whenever that set changes.
43 // The cache manufactures new endpoints via the factory, closes old endpoints when
44 // they disappear, and persists existing endpoints if they survive through an update.
45 func (c *endpointCache) Update(event Event) {
46 c.mtx.Lock()
47 defer c.mtx.Unlock()
48
49 if event.Err == nil {
50 c.updateCache(event.Instances)
51 c.invalidateDeadline = time.Time{}
52 c.err = nil
53 }
54
55 c.logger.Log("err", event.Err)
56
57 if c.options.invalidateOnErrorTimeout == nil {
58 // keep returning the last known endpoints on error
59 return
60 }
61
62 c.err = event.Err
63
64 if !c.invalidateDeadline.IsZero() {
65 // already in the error state, do nothing
66 return
67 }
68 // set a new deadline to invalidate Endpoints unless a non-error Event is received
69 c.invalidateDeadline = time.Now().Add(*c.options.invalidateOnErrorTimeout)
70 return
71 }
72
73 func (c *endpointCache) updateCache(instances []string) {
74 // Deterministic order (for later).
75 sort.Strings(instances)
76
77 // Produce the current set of services.
78 cache := make(map[string]endpointCloser, len(instances))
79 for _, instance := range instances {
80 // If it already exists, just copy it over.
81 if sc, ok := c.cache[instance]; ok {
82 cache[instance] = sc
83 delete(c.cache, instance)
84 continue
85 }
86
87 // If it doesn't exist, create it.
88 service, closer, err := c.factory(instance)
89 if err != nil {
90 c.logger.Log("instance", instance, "err", err)
91 continue
92 }
93 cache[instance] = endpointCloser{service, closer}
94 }
95
96 // Close any leftover endpoints.
97 for _, sc := range c.cache {
98 if sc.Closer != nil {
99 sc.Closer.Close()
100 }
101 }
102
103 // Populate the slice of endpoints.
104 endpoints := make([]endpoint.Endpoint, 0, len(cache))
105 for _, instance := range instances {
106 // A bad factory may mean an instance is not present.
107 if _, ok := cache[instance]; !ok {
108 continue
109 }
110 endpoints = append(endpoints, cache[instance].Endpoint)
111 }
112
113 // Swap and trigger GC for old copies.
114 c.endpoints = endpoints
115 c.cache = cache
116 }
117
118 // Endpoints yields the current set of (presumably identical) endpoints, ordered
119 // lexicographically by the corresponding instance string.
120 func (c *endpointCache) Endpoints() ([]endpoint.Endpoint, error) {
121 c.mtx.RLock()
122
123 if c.err == nil || time.Now().Before(c.invalidateDeadline) {
124 defer c.mtx.RUnlock()
125 return c.endpoints, nil
126 }
127
128 c.mtx.RUnlock()
129 c.mtx.Lock()
130 defer c.mtx.Unlock()
131
132 c.updateCache(nil) // close any remaining active endpoints
133
134 return nil, c.err
135 }
0 package sd
1
2 import (
3 "errors"
4 "io"
5 "testing"
6 "time"
7
8 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/log"
10 )
11
12 func TestCache(t *testing.T) {
13 var (
14 ca = make(closer)
15 cb = make(closer)
16 c = map[string]io.Closer{"a": ca, "b": cb}
17 f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
18 cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{})
19 )
20
21 // Populate
22 cache.Update(Event{Instances: []string{"a", "b"}})
23 select {
24 case <-ca:
25 t.Errorf("endpoint a closed, not good")
26 case <-cb:
27 t.Errorf("endpoint b closed, not good")
28 case <-time.After(time.Millisecond):
29 t.Logf("no closures yet, good")
30 }
31 assertEndpointsLen(t, cache, 2)
32
33 // Duplicate, should be no-op
34 cache.Update(Event{Instances: []string{"a", "b"}})
35 select {
36 case <-ca:
37 t.Errorf("endpoint a closed, not good")
38 case <-cb:
39 t.Errorf("endpoint b closed, not good")
40 case <-time.After(time.Millisecond):
41 t.Logf("no closures yet, good")
42 }
43 assertEndpointsLen(t, cache, 2)
44
45 // Delete b
46 go cache.Update(Event{Instances: []string{"a"}})
47 select {
48 case <-ca:
49 t.Errorf("endpoint a closed, not good")
50 case <-cb:
51 t.Logf("endpoint b closed, good")
52 case <-time.After(time.Second):
53 t.Errorf("didn't close the deleted instance in time")
54 }
55 assertEndpointsLen(t, cache, 1)
56
57 // Delete a
58 go cache.Update(Event{Instances: []string{}})
59 select {
60 // case <-cb: will succeed, as it's closed
61 case <-ca:
62 t.Logf("endpoint a closed, good")
63 case <-time.After(time.Second):
64 t.Errorf("didn't close the deleted instance in time")
65 }
66 assertEndpointsLen(t, cache, 0)
67 }
68
69 func TestBadFactory(t *testing.T) {
70 cache := newEndpointCache(func(string) (endpoint.Endpoint, io.Closer, error) {
71 return nil, nil, errors.New("bad factory")
72 }, log.NewNopLogger(), endpointerOptions{})
73
74 cache.Update(Event{Instances: []string{"foo:1234", "bar:5678"}})
75 assertEndpointsLen(t, cache, 0)
76 }
77
78 func assertEndpointsLen(t *testing.T, cache *endpointCache, l int) {
79 endpoints, err := cache.Endpoints()
80 if err != nil {
81 t.Errorf("unexpected error %v", err)
82 return
83 }
84 if want, have := l, len(endpoints); want != have {
85 t.Errorf("want %d, have %d", want, have)
86 }
87 }
88
89 type closer chan struct{}
90
91 func (c closer) Close() error { close(c); return nil }
0 // Package consul provides subscriber and registrar implementations for Consul.
0 // Package consul provides Instancer and Registrar implementations for Consul.
11 package consul
0 package consul
1
2 import (
3 "fmt"
4 "io"
5
6 consul "github.com/hashicorp/consul/api"
7
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/sd"
10 "github.com/go-kit/kit/sd/internal/instance"
11 )
12
13 const defaultIndex = 0
14
15 // Instancer yields instances for a service in Consul.
16 type Instancer struct {
17 instance.Cache
18 client Client
19 logger log.Logger
20 service string
21 tags []string
22 passingOnly bool
23 quitc chan struct{}
24 }
25
26 // NewInstancer returns a Consul instancer that publishes instances for the
27 // requested service. It only returns instances for which all of the passed tags
28 // are present.
29 func NewInstancer(client Client, logger log.Logger, service string, tags []string, passingOnly bool) *Instancer {
30 s := &Instancer{
31 Cache: *instance.NewCache(),
32 client: client,
33 logger: log.With(logger, "service", service, "tags", fmt.Sprint(tags)),
34 service: service,
35 tags: tags,
36 passingOnly: passingOnly,
37 quitc: make(chan struct{}),
38 }
39
40 instances, index, err := s.getInstances(defaultIndex, nil)
41 if err == nil {
42 s.logger.Log("instances", len(instances))
43 } else {
44 s.logger.Log("err", err)
45 }
46
47 s.Update(sd.Event{Instances: instances, Err: err})
48 go s.loop(index)
49 return s
50 }
51
52 // Stop terminates the instancer.
53 func (s *Instancer) Stop() {
54 close(s.quitc)
55 }
56
57 func (s *Instancer) loop(lastIndex uint64) {
58 var (
59 instances []string
60 err error
61 )
62 for {
63 instances, lastIndex, err = s.getInstances(lastIndex, s.quitc)
64 switch {
65 case err == io.EOF:
66 return // stopped via quitc
67 case err != nil:
68 s.logger.Log("err", err)
69 s.Update(sd.Event{Err: err})
70 default:
71 s.Update(sd.Event{Instances: instances})
72 }
73 }
74 }
75
76 func (s *Instancer) getInstances(lastIndex uint64, interruptc chan struct{}) ([]string, uint64, error) {
77 tag := ""
78 if len(s.tags) > 0 {
79 tag = s.tags[0]
80 }
81
82 // Consul doesn't support more than one tag in its service query method.
83 // https://github.com/hashicorp/consul/issues/294
84 // Hashi suggest prepared queries, but they don't support blocking.
85 // https://www.consul.io/docs/agent/http/query.html#execute
86 // If we want blocking for efficiency, we must filter tags manually.
87
88 type response struct {
89 instances []string
90 index uint64
91 }
92
93 var (
94 errc = make(chan error, 1)
95 resc = make(chan response, 1)
96 )
97
98 go func() {
99 entries, meta, err := s.client.Service(s.service, tag, s.passingOnly, &consul.QueryOptions{
100 WaitIndex: lastIndex,
101 })
102 if err != nil {
103 errc <- err
104 return
105 }
106 if len(s.tags) > 1 {
107 entries = filterEntries(entries, s.tags[1:]...)
108 }
109 resc <- response{
110 instances: makeInstances(entries),
111 index: meta.LastIndex,
112 }
113 }()
114
115 select {
116 case err := <-errc:
117 return nil, 0, err
118 case res := <-resc:
119 return res.instances, res.index, nil
120 case <-interruptc:
121 return nil, 0, io.EOF
122 }
123 }
124
125 func filterEntries(entries []*consul.ServiceEntry, tags ...string) []*consul.ServiceEntry {
126 var es []*consul.ServiceEntry
127
128 ENTRIES:
129 for _, entry := range entries {
130 ts := make(map[string]struct{}, len(entry.Service.Tags))
131 for _, tag := range entry.Service.Tags {
132 ts[tag] = struct{}{}
133 }
134
135 for _, tag := range tags {
136 if _, ok := ts[tag]; !ok {
137 continue ENTRIES
138 }
139 }
140 es = append(es, entry)
141 }
142
143 return es
144 }
145
146 func makeInstances(entries []*consul.ServiceEntry) []string {
147 instances := make([]string, len(entries))
148 for i, entry := range entries {
149 addr := entry.Node.Address
150 if entry.Service.Address != "" {
151 addr = entry.Service.Address
152 }
153 instances[i] = fmt.Sprintf("%s:%d", addr, entry.Service.Port)
154 }
155 return instances
156 }
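A hypothetical usage sketch for the Consul Instancer above; client and logger are assumed to already exist, and the service name and tags are placeholders. As the comments in getInstances note, Consul's service query accepts only a single tag, so the first tag goes into the query and any remaining tags are filtered client-side by filterEntries:

    instancer := consul.NewInstancer(client, logger, "search", []string{"api", "v2"}, true)
    defer instancer.Stop()
    state := instancer.State() // current state (Instances / Err), via the embedded instance.Cache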
0 package consul
1
2 import (
3 "context"
4 "testing"
5
6 consul "github.com/hashicorp/consul/api"
7
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/sd"
10 )
11
12 var _ sd.Instancer = &Instancer{} // API check
13
14 var consulState = []*consul.ServiceEntry{
15 {
16 Node: &consul.Node{
17 Address: "10.0.0.0",
18 Node: "app00.local",
19 },
20 Service: &consul.AgentService{
21 ID: "search-api-0",
22 Port: 8000,
23 Service: "search",
24 Tags: []string{
25 "api",
26 "v1",
27 },
28 },
29 },
30 {
31 Node: &consul.Node{
32 Address: "10.0.0.1",
33 Node: "app01.local",
34 },
35 Service: &consul.AgentService{
36 ID: "search-api-1",
37 Port: 8001,
38 Service: "search",
39 Tags: []string{
40 "api",
41 "v2",
42 },
43 },
44 },
45 {
46 Node: &consul.Node{
47 Address: "10.0.0.1",
48 Node: "app01.local",
49 },
50 Service: &consul.AgentService{
51 Address: "10.0.0.10",
52 ID: "search-db-0",
53 Port: 9000,
54 Service: "search",
55 Tags: []string{
56 "db",
57 },
58 },
59 },
60 }
61
62 func TestInstancer(t *testing.T) {
63 var (
64 logger = log.NewNopLogger()
65 client = newTestClient(consulState)
66 )
67
68 s := NewInstancer(client, logger, "search", []string{"api"}, true)
69 defer s.Stop()
70
71 state := s.State()
72 if want, have := 2, len(state.Instances); want != have {
73 t.Errorf("want %d, have %d", want, have)
74 }
75 }
76
77 func TestInstancerNoService(t *testing.T) {
78 var (
79 logger = log.NewNopLogger()
80 client = newTestClient(consulState)
81 )
82
83 s := NewInstancer(client, logger, "feed", []string{}, true)
84 defer s.Stop()
85
86 state := s.State()
87 if want, have := 0, len(state.Instances); want != have {
88 t.Fatalf("want %d, have %d", want, have)
89 }
90 }
91
92 func TestInstancerWithTags(t *testing.T) {
93 var (
94 logger = log.NewNopLogger()
95 client = newTestClient(consulState)
96 )
97
98 s := NewInstancer(client, logger, "search", []string{"api", "v2"}, true)
99 defer s.Stop()
100
101 state := s.State()
102 if want, have := 1, len(state.Instances); want != have {
103 t.Fatalf("want %d, have %d", want, have)
104 }
105 }
106
107 func TestInstancerAddressOverride(t *testing.T) {
108 s := NewInstancer(newTestClient(consulState), log.NewNopLogger(), "search", []string{"db"}, true)
109 defer s.Stop()
110
111 state := s.State()
112 if want, have := 1, len(state.Instances); want != have {
113 t.Fatalf("want %d, have %d", want, have)
114 }
115
116 endpoint, closer, err := testFactory(state.Instances[0])
117 if err != nil {
118 t.Fatal(err)
119 }
120 if closer != nil {
121 defer closer.Close()
122 }
123
124 response, err := endpoint(context.Background(), struct{}{})
125 if err != nil {
126 t.Fatal(err)
127 }
128
129 if want, have := "10.0.0.10:9000", response.(string); want != have {
130 t.Errorf("want %q, have %q", want, have)
131 }
132 }
99
1010 "github.com/go-kit/kit/endpoint"
1111 "github.com/go-kit/kit/log"
12 "github.com/go-kit/kit/sd"
1213 stdconsul "github.com/hashicorp/consul/api"
1314 )
1415
3738 // skipping check(s)
3839 }
3940
40 // Build a subscriber on r.Name + r.Tags.
41 // Build an Instancer on r.Name + r.Tags.
4142 factory := func(instance string) (endpoint.Endpoint, io.Closer, error) {
4243 t.Logf("factory invoked for %q", instance)
4344 return endpoint.Nop, nil, nil
4445 }
45 subscriber := NewSubscriber(
46 instancer := NewInstancer(
4647 client,
47 factory,
48 log.With(logger, "component", "subscriber"),
48 log.With(logger, "component", "instancer"),
4949 r.Name,
5050 r.Tags,
5151 true,
52 )
53 endpointer := sd.NewEndpointer(
54 instancer,
55 factory,
56 log.With(logger, "component", "endpointer"),
5257 )
5358
5459 time.Sleep(time.Second)
5560
5661 // Before we publish, we should have no endpoints.
57 endpoints, err := subscriber.Endpoints()
62 endpoints, err := endpointer.Endpoints()
5863 if err != nil {
5964 t.Error(err)
6065 }
7075 time.Sleep(time.Second)
7176
7277 // Now we should have one active endpoint.
73 endpoints, err = subscriber.Endpoints()
78 endpoints, err = endpointer.Endpoints()
7479 if err != nil {
7580 t.Error(err)
7681 }
sd/consul/subscriber.go +0 -166 (file deleted)
0 package consul
1
2 import (
3 "fmt"
4 "io"
5
6 consul "github.com/hashicorp/consul/api"
7
8 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/log"
10 "github.com/go-kit/kit/sd"
11 "github.com/go-kit/kit/sd/cache"
12 )
13
14 const defaultIndex = 0
15
16 // Subscriber yields endpoints for a service in Consul. Updates to the service
17 // are watched and will update the Subscriber endpoints.
18 type Subscriber struct {
19 cache *cache.Cache
20 client Client
21 logger log.Logger
22 service string
23 tags []string
24 passingOnly bool
25 endpointsc chan []endpoint.Endpoint
26 quitc chan struct{}
27 }
28
29 var _ sd.Subscriber = &Subscriber{}
30
31 // NewSubscriber returns a Consul subscriber which returns endpoints for the
32 // requested service. It only returns instances for which all of the passed tags
33 // are present.
34 func NewSubscriber(client Client, factory sd.Factory, logger log.Logger, service string, tags []string, passingOnly bool) *Subscriber {
35 s := &Subscriber{
36 cache: cache.New(factory, logger),
37 client: client,
38 logger: log.With(logger, "service", service, "tags", fmt.Sprint(tags)),
39 service: service,
40 tags: tags,
41 passingOnly: passingOnly,
42 quitc: make(chan struct{}),
43 }
44
45 instances, index, err := s.getInstances(defaultIndex, nil)
46 if err == nil {
47 s.logger.Log("instances", len(instances))
48 } else {
49 s.logger.Log("err", err)
50 }
51
52 s.cache.Update(instances)
53 go s.loop(index)
54 return s
55 }
56
57 // Endpoints implements the Subscriber interface.
58 func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) {
59 return s.cache.Endpoints(), nil
60 }
61
62 // Stop terminates the subscriber.
63 func (s *Subscriber) Stop() {
64 close(s.quitc)
65 }
66
67 func (s *Subscriber) loop(lastIndex uint64) {
68 var (
69 instances []string
70 err error
71 )
72 for {
73 instances, lastIndex, err = s.getInstances(lastIndex, s.quitc)
74 switch {
75 case err == io.EOF:
76 return // stopped via quitc
77 case err != nil:
78 s.logger.Log("err", err)
79 default:
80 s.cache.Update(instances)
81 }
82 }
83 }
84
85 func (s *Subscriber) getInstances(lastIndex uint64, interruptc chan struct{}) ([]string, uint64, error) {
86 tag := ""
87 if len(s.tags) > 0 {
88 tag = s.tags[0]
89 }
90
91 // Consul doesn't support more than one tag in its service query method.
92 // https://github.com/hashicorp/consul/issues/294
93 // Hashi suggest prepared queries, but they don't support blocking.
94 // https://www.consul.io/docs/agent/http/query.html#execute
95 // If we want blocking for efficiency, we must filter tags manually.
96
97 type response struct {
98 instances []string
99 index uint64
100 }
101
102 var (
103 errc = make(chan error, 1)
104 resc = make(chan response, 1)
105 )
106
107 go func() {
108 entries, meta, err := s.client.Service(s.service, tag, s.passingOnly, &consul.QueryOptions{
109 WaitIndex: lastIndex,
110 })
111 if err != nil {
112 errc <- err
113 return
114 }
115 if len(s.tags) > 1 {
116 entries = filterEntries(entries, s.tags[1:]...)
117 }
118 resc <- response{
119 instances: makeInstances(entries),
120 index: meta.LastIndex,
121 }
122 }()
123
124 select {
125 case err := <-errc:
126 return nil, 0, err
127 case res := <-resc:
128 return res.instances, res.index, nil
129 case <-interruptc:
130 return nil, 0, io.EOF
131 }
132 }
133
134 func filterEntries(entries []*consul.ServiceEntry, tags ...string) []*consul.ServiceEntry {
135 var es []*consul.ServiceEntry
136
137 ENTRIES:
138 for _, entry := range entries {
139 ts := make(map[string]struct{}, len(entry.Service.Tags))
140 for _, tag := range entry.Service.Tags {
141 ts[tag] = struct{}{}
142 }
143
144 for _, tag := range tags {
145 if _, ok := ts[tag]; !ok {
146 continue ENTRIES
147 }
148 }
149 es = append(es, entry)
150 }
151
152 return es
153 }
154
155 func makeInstances(entries []*consul.ServiceEntry) []string {
156 instances := make([]string, len(entries))
157 for i, entry := range entries {
158 addr := entry.Node.Address
159 if entry.Service.Address != "" {
160 addr = entry.Service.Address
161 }
162 instances[i] = fmt.Sprintf("%s:%d", addr, entry.Service.Port)
163 }
164 return instances
165 }
sd/consul/subscriber_test.go +0 -138 (file deleted)
0 package consul
1
2 import (
3 "context"
4 "testing"
5
6 consul "github.com/hashicorp/consul/api"
7
8 "github.com/go-kit/kit/log"
9 )
10
11 var consulState = []*consul.ServiceEntry{
12 {
13 Node: &consul.Node{
14 Address: "10.0.0.0",
15 Node: "app00.local",
16 },
17 Service: &consul.AgentService{
18 ID: "search-api-0",
19 Port: 8000,
20 Service: "search",
21 Tags: []string{
22 "api",
23 "v1",
24 },
25 },
26 },
27 {
28 Node: &consul.Node{
29 Address: "10.0.0.1",
30 Node: "app01.local",
31 },
32 Service: &consul.AgentService{
33 ID: "search-api-1",
34 Port: 8001,
35 Service: "search",
36 Tags: []string{
37 "api",
38 "v2",
39 },
40 },
41 },
42 {
43 Node: &consul.Node{
44 Address: "10.0.0.1",
45 Node: "app01.local",
46 },
47 Service: &consul.AgentService{
48 Address: "10.0.0.10",
49 ID: "search-db-0",
50 Port: 9000,
51 Service: "search",
52 Tags: []string{
53 "db",
54 },
55 },
56 },
57 }
58
59 func TestSubscriber(t *testing.T) {
60 var (
61 logger = log.NewNopLogger()
62 client = newTestClient(consulState)
63 )
64
65 s := NewSubscriber(client, testFactory, logger, "search", []string{"api"}, true)
66 defer s.Stop()
67
68 endpoints, err := s.Endpoints()
69 if err != nil {
70 t.Fatal(err)
71 }
72
73 if want, have := 2, len(endpoints); want != have {
74 t.Errorf("want %d, have %d", want, have)
75 }
76 }
77
78 func TestSubscriberNoService(t *testing.T) {
79 var (
80 logger = log.NewNopLogger()
81 client = newTestClient(consulState)
82 )
83
84 s := NewSubscriber(client, testFactory, logger, "feed", []string{}, true)
85 defer s.Stop()
86
87 endpoints, err := s.Endpoints()
88 if err != nil {
89 t.Fatal(err)
90 }
91
92 if want, have := 0, len(endpoints); want != have {
93 t.Fatalf("want %d, have %d", want, have)
94 }
95 }
96
97 func TestSubscriberWithTags(t *testing.T) {
98 var (
99 logger = log.NewNopLogger()
100 client = newTestClient(consulState)
101 )
102
103 s := NewSubscriber(client, testFactory, logger, "search", []string{"api", "v2"}, true)
104 defer s.Stop()
105
106 endpoints, err := s.Endpoints()
107 if err != nil {
108 t.Fatal(err)
109 }
110
111 if want, have := 1, len(endpoints); want != have {
112 t.Fatalf("want %d, have %d", want, have)
113 }
114 }
115
116 func TestSubscriberAddressOverride(t *testing.T) {
117 s := NewSubscriber(newTestClient(consulState), testFactory, log.NewNopLogger(), "search", []string{"db"}, true)
118 defer s.Stop()
119
120 endpoints, err := s.Endpoints()
121 if err != nil {
122 t.Fatal(err)
123 }
124
125 if want, have := 1, len(endpoints); want != have {
126 t.Fatalf("want %d, have %d", want, have)
127 }
128
129 response, err := endpoints[0](context.Background(), struct{}{})
130 if err != nil {
131 t.Fatal(err)
132 }
133
134 if want, have := "10.0.0.10:9000", response.(string); want != have {
135 t.Errorf("want %q, have %q", want, have)
136 }
137 }
0 // Package dnssrv provides a subscriber implementation for DNS SRV records.
0 // Package dnssrv provides an Instancer implementation for DNS SRV records.
11 package dnssrv
0 package dnssrv
1
2 import (
3 "fmt"
4 "net"
5 "time"
6
7 "github.com/go-kit/kit/log"
8 "github.com/go-kit/kit/sd"
9 "github.com/go-kit/kit/sd/internal/instance"
10 )
11
12 // Instancer yields instances from the named DNS SRV record. The name is
13 // resolved on a fixed schedule. Priorities and weights are ignored.
14 type Instancer struct {
15 instance.Cache
16 name string
17 logger log.Logger
18 quit chan struct{}
19 }
20
21 // NewInstancer returns a DNS SRV instancer.
22 func NewInstancer(
23 name string,
24 ttl time.Duration,
25 logger log.Logger,
26 ) *Instancer {
27 return NewInstancerDetailed(name, time.NewTicker(ttl), net.LookupSRV, logger)
28 }
29
30 // NewInstancerDetailed is the same as NewInstancer, but allows users to
31 // provide an explicit lookup refresh ticker instead of a TTL, and specify the
32 // lookup function instead of using net.LookupSRV.
33 func NewInstancerDetailed(
34 name string,
35 refresh *time.Ticker,
36 lookup Lookup,
37 logger log.Logger,
38 ) *Instancer {
39 p := &Instancer{
40 Cache: *instance.NewCache(),
41 name: name,
42 logger: logger,
43 quit: make(chan struct{}),
44 }
45
46 instances, err := p.resolve(lookup)
47 if err == nil {
48 logger.Log("name", name, "instances", len(instances))
49 } else {
50 logger.Log("name", name, "err", err)
51 }
52 p.Update(sd.Event{Instances: instances, Err: err})
53
54 go p.loop(refresh, lookup)
55 return p
56 }
57
58 // Stop terminates the Instancer.
59 func (p *Instancer) Stop() {
60 close(p.quit)
61 }
62
63 func (p *Instancer) loop(t *time.Ticker, lookup Lookup) {
64 defer t.Stop()
65 for {
66 select {
67 case <-t.C:
68 instances, err := p.resolve(lookup)
69 if err != nil {
70 p.logger.Log("name", p.name, "err", err)
71 p.Update(sd.Event{Err: err})
72 continue // don't replace potentially-good with bad
73 }
74 p.Update(sd.Event{Instances: instances})
75
76 case <-p.quit:
77 return
78 }
79 }
80 }
81
82 func (p *Instancer) resolve(lookup Lookup) ([]string, error) {
83 _, addrs, err := lookup("", "", p.name)
84 if err != nil {
85 return nil, err
86 }
87 instances := make([]string, len(addrs))
88 for i, addr := range addrs {
89 instances[i] = net.JoinHostPort(addr.Target, fmt.Sprint(addr.Port))
90 }
91 return instances, nil
92 }
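A hedged sketch wiring the DNS SRV Instancer into an Endpointer; the record name, TTL, factory, and logger are assumptions rather than values from this change:

    instancer := dnssrv.NewInstancer("_grpc._tcp.myservice.internal", 30*time.Second, logger)
    defer instancer.Stop()
    endpointer := sd.NewEndpointer(instancer, factory, logger)
    endpoints, err := endpointer.Endpoints() // endpoints built by the factory from the resolved SRV targets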
0 package dnssrv
1
2 import (
3 "net"
4 "sync/atomic"
5 "testing"
6 "time"
7
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/sd"
10 )
11
12 var _ sd.Instancer = &Instancer{} // API check
13
14 func TestRefresh(t *testing.T) {
15 name := "some.service.internal"
16
17 ticker := time.NewTicker(time.Second)
18 ticker.Stop()
19 tickc := make(chan time.Time)
20 ticker.C = tickc
21
22 var lookups uint64
23 records := []*net.SRV{}
24 lookup := func(service, proto, name string) (string, []*net.SRV, error) {
25 t.Logf("lookup(%q, %q, %q)", service, proto, name)
26 atomic.AddUint64(&lookups, 1)
27 return "cname", records, nil
28 }
29
30 instancer := NewInstancerDetailed(name, ticker, lookup, log.NewNopLogger())
31 defer instancer.Stop()
32
33 // First lookup, empty
34 state := instancer.State()
35 if state.Err != nil {
36 t.Error(state.Err)
37 }
38 if want, have := 0, len(state.Instances); want != have {
39 t.Errorf("want %d, have %d", want, have)
40 }
41 if want, have := uint64(1), atomic.LoadUint64(&lookups); want != have {
42 t.Errorf("want %d, have %d", want, have)
43 }
44
45 // Load some records and lookup again
46 records = []*net.SRV{
47 {Target: "1.0.0.1", Port: 1001},
48 {Target: "1.0.0.2", Port: 1002},
49 {Target: "1.0.0.3", Port: 1003},
50 }
51 tickc <- time.Now()
52
53 // There is a race condition where the instancer.State call below
54 // invokes the cache before it is updated by the tick above.
55 // TODO(pb): solve by running the read through the loop goroutine.
56 time.Sleep(100 * time.Millisecond)
57
58 state = instancer.State()
59 if state.Err != nil {
60 t.Error(state.Err)
61 }
62 if want, have := 3, len(state.Instances); want != have {
63 t.Errorf("want %d, have %d", want, have)
64 }
65 if want, have := uint64(2), atomic.LoadUint64(&lookups); want != have {
66 t.Errorf("want %d, have %d", want, have)
67 }
68 }
69
70 type nopCloser struct{}
71
72 func (nopCloser) Close() error { return nil }
sd/dnssrv/subscriber.go +0 -100 (file deleted)
0 package dnssrv
1
2 import (
3 "fmt"
4 "net"
5 "time"
6
7 "github.com/go-kit/kit/endpoint"
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/sd"
10 "github.com/go-kit/kit/sd/cache"
11 )
12
13 // Subscriber yields endpoints taken from the named DNS SRV record. The name is
14 // resolved on a fixed schedule. Priorities and weights are ignored.
15 type Subscriber struct {
16 name string
17 cache *cache.Cache
18 logger log.Logger
19 quit chan struct{}
20 }
21
22 // NewSubscriber returns a DNS SRV subscriber.
23 func NewSubscriber(
24 name string,
25 ttl time.Duration,
26 factory sd.Factory,
27 logger log.Logger,
28 ) *Subscriber {
29 return NewSubscriberDetailed(name, time.NewTicker(ttl), net.LookupSRV, factory, logger)
30 }
31
32 // NewSubscriberDetailed is the same as NewSubscriber, but allows users to
33 // provide an explicit lookup refresh ticker instead of a TTL, and specify the
34 // lookup function instead of using net.LookupSRV.
35 func NewSubscriberDetailed(
36 name string,
37 refresh *time.Ticker,
38 lookup Lookup,
39 factory sd.Factory,
40 logger log.Logger,
41 ) *Subscriber {
42 p := &Subscriber{
43 name: name,
44 cache: cache.New(factory, logger),
45 logger: logger,
46 quit: make(chan struct{}),
47 }
48
49 instances, err := p.resolve(lookup)
50 if err == nil {
51 logger.Log("name", name, "instances", len(instances))
52 } else {
53 logger.Log("name", name, "err", err)
54 }
55 p.cache.Update(instances)
56
57 go p.loop(refresh, lookup)
58 return p
59 }
60
61 // Stop terminates the Subscriber.
62 func (p *Subscriber) Stop() {
63 close(p.quit)
64 }
65
66 func (p *Subscriber) loop(t *time.Ticker, lookup Lookup) {
67 defer t.Stop()
68 for {
69 select {
70 case <-t.C:
71 instances, err := p.resolve(lookup)
72 if err != nil {
73 p.logger.Log("name", p.name, "err", err)
74 continue // don't replace potentially-good with bad
75 }
76 p.cache.Update(instances)
77
78 case <-p.quit:
79 return
80 }
81 }
82 }
83
84 // Endpoints implements the Subscriber interface.
85 func (p *Subscriber) Endpoints() ([]endpoint.Endpoint, error) {
86 return p.cache.Endpoints(), nil
87 }
88
89 func (p *Subscriber) resolve(lookup Lookup) ([]string, error) {
90 _, addrs, err := lookup("", "", p.name)
91 if err != nil {
92 return []string{}, err
93 }
94 instances := make([]string, len(addrs))
95 for i, addr := range addrs {
96 instances[i] = net.JoinHostPort(addr.Target, fmt.Sprint(addr.Port))
97 }
98 return instances, nil
99 }
sd/dnssrv/subscriber_test.go +0 -85 (file deleted)
0 package dnssrv
1
2 import (
3 "io"
4 "net"
5 "sync/atomic"
6 "testing"
7 "time"
8
9 "github.com/go-kit/kit/endpoint"
10 "github.com/go-kit/kit/log"
11 )
12
13 func TestRefresh(t *testing.T) {
14 name := "some.service.internal"
15
16 ticker := time.NewTicker(time.Second)
17 ticker.Stop()
18 tickc := make(chan time.Time)
19 ticker.C = tickc
20
21 var lookups uint64
22 records := []*net.SRV{}
23 lookup := func(service, proto, name string) (string, []*net.SRV, error) {
24 t.Logf("lookup(%q, %q, %q)", service, proto, name)
25 atomic.AddUint64(&lookups, 1)
26 return "cname", records, nil
27 }
28
29 var generates uint64
30 factory := func(instance string) (endpoint.Endpoint, io.Closer, error) {
31 t.Logf("factory(%q)", instance)
32 atomic.AddUint64(&generates, 1)
33 return endpoint.Nop, nopCloser{}, nil
34 }
35
36 subscriber := NewSubscriberDetailed(name, ticker, lookup, factory, log.NewNopLogger())
37 defer subscriber.Stop()
38
39 // First lookup, empty
40 endpoints, err := subscriber.Endpoints()
41 if err != nil {
42 t.Error(err)
43 }
44 if want, have := 0, len(endpoints); want != have {
45 t.Errorf("want %d, have %d", want, have)
46 }
47 if want, have := uint64(1), atomic.LoadUint64(&lookups); want != have {
48 t.Errorf("want %d, have %d", want, have)
49 }
50 if want, have := uint64(0), atomic.LoadUint64(&generates); want != have {
51 t.Errorf("want %d, have %d", want, have)
52 }
53
54 // Load some records and lookup again
55 records = []*net.SRV{
56 {Target: "1.0.0.1", Port: 1001},
57 {Target: "1.0.0.2", Port: 1002},
58 {Target: "1.0.0.3", Port: 1003},
59 }
60 tickc <- time.Now()
61
62 // There is a race condition where the subscriber.Endpoints call below
63 // invokes the cache before it is updated by the tick above.
64 // TODO(pb): solve by running the read through the loop goroutine.
65 time.Sleep(100 * time.Millisecond)
66
67 endpoints, err = subscriber.Endpoints()
68 if err != nil {
69 t.Error(err)
70 }
71 if want, have := 3, len(endpoints); want != have {
72 t.Errorf("want %d, have %d", want, have)
73 }
74 if want, have := uint64(2), atomic.LoadUint64(&lookups); want != have {
75 t.Errorf("want %d, have %d", want, have)
76 }
77 if want, have := uint64(len(records)), atomic.LoadUint64(&generates); want != have {
78 t.Errorf("want %d, have %d", want, have)
79 }
80 }
81
82 type nopCloser struct{}
83
84 func (nopCloser) Close() error { return nil }
0 package sd
1
2 import (
3 "time"
4
5 "github.com/go-kit/kit/endpoint"
6 "github.com/go-kit/kit/log"
7 )
8
9 // Endpointer listens to a service discovery system and yields a set of
10 // identical endpoints on demand. An error indicates a problem with connectivity
11 // to the service discovery system, or within the system itself; an Endpointer
12 // may yield no endpoints without error.
13 type Endpointer interface {
14 Endpoints() ([]endpoint.Endpoint, error)
15 }
16
17 // FixedEndpointer yields a fixed set of endpoints.
18 type FixedEndpointer []endpoint.Endpoint
19
20 // Endpoints implements Endpointer.
21 func (s FixedEndpointer) Endpoints() ([]endpoint.Endpoint, error) { return s, nil }
22
23 // NewEndpointer creates an Endpointer that subscribes to updates from Instancer src
24 // and uses factory f to create Endpoints. If src notifies of an error, the Endpointer
25 // keeps returning previously created Endpoints assuming they are still good, unless
26 // this behavior is disabled via the InvalidateOnError option.
27 func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) Endpointer {
28 opts := endpointerOptions{}
29 for _, opt := range options {
30 opt(&opts)
31 }
32 se := &simpleEndpointer{
33 endpointCache: *newEndpointCache(f, logger, opts),
34 instancer: src,
35 ch: make(chan Event),
36 }
37 go se.receive()
38 src.Register(se.ch)
39 return se
40 }
41
42 // EndpointerOption allows control of endpointCache behavior.
43 type EndpointerOption func(*endpointerOptions)
44
45 // InvalidateOnError returns an EndpointerOption that controls how the Endpointer
46 // behaves when the Instancer publishes an Event containing an error.
47 // Without this option the Endpointer continues returning the last known
48 // endpoints. With this option, the Endpointer continues returning the last
49 // known endpoints until the timeout elapses, then closes all active endpoints
50 // and starts returning an error. Once the Instancer sends a new update with
51 // valid resource instances, the normal operation is resumed.
52 func InvalidateOnError(timeout time.Duration) EndpointerOption {
53 return func(opts *endpointerOptions) {
54 opts.invalidateOnErrorTimeout = &timeout
55 }
56 }
57
58 type endpointerOptions struct {
59 invalidateOnErrorTimeout *time.Duration
60 }
61
62 type simpleEndpointer struct {
63 endpointCache
64
65 instancer Instancer
66 ch chan Event
67 }
68
69 func (se *simpleEndpointer) receive() {
70 for event := range se.ch {
71 se.Update(event)
72 }
73 }
74
75 func (se *simpleEndpointer) Close() {
76 se.instancer.Deregister(se.ch)
77 close(se.ch)
78 }
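A hedged usage sketch for NewEndpointer from a consuming package; instancer, factory, and logger are assumed to exist, and the 5-second timeout is arbitrary. Without options, Endpoints() keeps returning the last known set when the Instancer reports errors; with InvalidateOnError, that grace period ends after the timeout, the cached endpoints are closed, and the error is returned until a healthy Event arrives:

    endpointer := sd.NewEndpointer(instancer, factory, logger,
        sd.InvalidateOnError(5*time.Second))
    endpoints, err := endpointer.Endpoints() // last known endpoints, or the error once invalidated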
0 // Package etcd provides a Subscriber and Registrar implementation for etcd. If
0 // Package etcd provides an Instancer and Registrar implementation for etcd. If
11 // you use etcd as your service discovery system, this package will help you
22 // implement the registration and client-side load balancing patterns.
33 package etcd
66
77 "github.com/go-kit/kit/endpoint"
88 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/sd"
910 "github.com/go-kit/kit/sd/lb"
1011 )
1112
4344 defer registrar.Deregister()
4445
4546 // It's likely that we'll also want to connect to other services and call
46 // their methods. We can build a subscriber to listen for changes from etcd
47 // and build endpoints, wrap it with a load-balancer to pick a single
47 // their methods. We can build an Instancer to listen for changes from etcd,
48 // create an Endpointer, wrap it with a load-balancer to pick a single
4849 // endpoint, and finally wrap it with a retry strategy to get something that
4950 // can be used as an endpoint directly.
5051 barPrefix := "/services/barsvc"
51 subscriber, err := NewSubscriber(client, barPrefix, barFactory, log.NewNopLogger())
52 logger := log.NewNopLogger()
53 instancer, err := NewInstancer(client, barPrefix, logger)
5254 if err != nil {
5355 panic(err)
5456 }
55 balancer := lb.NewRoundRobin(subscriber)
57 endpointer := sd.NewEndpointer(instancer, barFactory, logger)
58 balancer := lb.NewRoundRobin(endpointer)
5659 retry := lb.Retry(3, 3*time.Second, balancer)
5760
5861 // And now retry can be used like any other endpoint.
0 package etcd
1
2 import (
3 "github.com/go-kit/kit/log"
4 "github.com/go-kit/kit/sd"
5 "github.com/go-kit/kit/sd/internal/instance"
6 )
7
8 // Instancer yields instances stored in a certain etcd keyspace. Any kind of
9 // change in that keyspace is watched and will update the Instancer's instances.
10 type Instancer struct {
11 instance.Cache
12 client Client
13 prefix string
14 logger log.Logger
15 quitc chan struct{}
16 }
17
18 // NewInstancer returns an etcd instancer. It will start watching the given
19 // prefix for changes, and update the subscribers.
20 func NewInstancer(c Client, prefix string, logger log.Logger) (*Instancer, error) {
21 s := &Instancer{
22 client: c,
23 prefix: prefix,
24 Cache: *instance.NewCache(),
25 logger: logger,
26 quitc: make(chan struct{}),
27 }
28
29 instances, err := s.client.GetEntries(s.prefix)
30 if err == nil {
31 logger.Log("prefix", s.prefix, "instances", len(instances))
32 } else {
33 logger.Log("prefix", s.prefix, "err", err)
34 }
35 s.Update(sd.Event{Instances: instances, Err: err})
36
37 go s.loop()
38 return s, nil
39 }
40
41 func (s *Instancer) loop() {
42 ch := make(chan struct{})
43 go s.client.WatchPrefix(s.prefix, ch)
44 for {
45 select {
46 case <-ch:
47 instances, err := s.client.GetEntries(s.prefix)
48 if err != nil {
49 s.logger.Log("msg", "failed to retrieve entries", "err", err)
50 s.Update(sd.Event{Err: err})
51 continue
52 }
53 s.Update(sd.Event{Instances: instances})
54
55 case <-s.quitc:
56 return
57 }
58 }
59 }
60
61 // Stop terminates the Instancer.
62 func (s *Instancer) Stop() {
63 close(s.quitc)
64 }
0 package etcd
1
2 import (
3 "errors"
4 "testing"
5
6 stdetcd "github.com/coreos/etcd/client"
7
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/sd"
10 )
11
12 var (
13 node = &stdetcd.Node{
14 Key: "/foo",
15 Nodes: []*stdetcd.Node{
16 {Key: "/foo/1", Value: "1:1"},
17 {Key: "/foo/2", Value: "1:2"},
18 },
19 }
20 fakeResponse = &stdetcd.Response{
21 Node: node,
22 }
23 )
24
25 var _ sd.Instancer = &Instancer{} // API check
26
27 func TestInstancer(t *testing.T) {
28 client := &fakeClient{
29 responses: map[string]*stdetcd.Response{"/foo": fakeResponse},
30 }
31
32 s, err := NewInstancer(client, "/foo", log.NewNopLogger())
33 if err != nil {
34 t.Fatal(err)
35 }
36 defer s.Stop()
37
38 if state := s.State(); state.Err != nil {
39 t.Fatal(state.Err)
40 }
41 }
42
43 type fakeClient struct {
44 responses map[string]*stdetcd.Response
45 }
46
47 func (c *fakeClient) GetEntries(prefix string) ([]string, error) {
48 response, ok := c.responses[prefix]
49 if !ok {
50 return nil, errors.New("key not exist")
51 }
52
53 entries := make([]string, len(response.Node.Nodes))
54 for i, node := range response.Node.Nodes {
55 entries[i] = node.Value
56 }
57 return entries, nil
58 }
59
60 func (c *fakeClient) WatchPrefix(prefix string, ch chan struct{}) {}
61
62 func (c *fakeClient) Register(Service) error {
63 return nil
64 }
65 func (c *fakeClient) Deregister(Service) error {
66 return nil
67 }
1010
1111 "github.com/go-kit/kit/endpoint"
1212 "github.com/go-kit/kit/log"
13 "github.com/go-kit/kit/sd"
1314 )
1415
1516 // Package sd/etcd provides a wrapper around the etcd key/value store. This
6667 t.Fatalf("want %q, have %q", want, have)
6768 }
6869
69 subscriber, err := NewSubscriber(
70 instancer, err := NewInstancer(
7071 client,
7172 prefix,
72 func(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil },
73 log.With(log.NewLogfmtLogger(os.Stderr), "component", "subscriber"),
73 log.With(log.NewLogfmtLogger(os.Stderr), "component", "instancer"),
7474 )
7575 if err != nil {
76 t.Fatalf("NewSubscriber: %v", err)
76 t.Fatalf("NewInstancer: %v", err)
7777 }
78 t.Logf("Constructed Subscriber OK")
78 endpointer := sd.NewEndpointer(
79 instancer,
80 func(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil },
81 log.With(log.NewLogfmtLogger(os.Stderr), "component", "endpointer"),
82 )
83 t.Logf("Constructed Endpointer OK")
7984
8085 if !within(time.Second, func() bool {
81 endpoints, err := subscriber.Endpoints()
86 endpoints, err := endpointer.Endpoints()
8287 return err == nil && len(endpoints) == 1
8388 }) {
84 t.Fatalf("Subscriber didn't see Register in time")
89 t.Fatalf("Endpointer didn't see Register in time")
8590 }
86 t.Logf("Subscriber saw Register OK")
91 t.Logf("Endpointer saw Register OK")
8792
8893 // Deregister first instance of test data.
8994 registrar.Deregister()
9196
9297 // Check it was deregistered.
9398 if !within(time.Second, func() bool {
94 endpoints, err := subscriber.Endpoints()
99 endpoints, err := endpointer.Endpoints()
95100 t.Logf("Checking Deregister: len(endpoints) = %d, err = %v", len(endpoints), err)
96101 return err == nil && len(endpoints) == 0
97102 }) {
98 t.Fatalf("Subscriber didn't see Deregister in time")
103 t.Fatalf("Endpointer didn't see Deregister in time")
99104 }
100105
101106 // Verify test data no longer exists in etcd.
sd/etcd/subscriber.go +0 -72 (file deleted)
0 package etcd
1
2 import (
3 "github.com/go-kit/kit/endpoint"
4 "github.com/go-kit/kit/log"
5 "github.com/go-kit/kit/sd"
6 "github.com/go-kit/kit/sd/cache"
7 )
8
9 // Subscriber yield endpoints stored in a certain etcd keyspace. Any kind of
10 // change in that keyspace is watched and will update the Subscriber endpoints.
11 type Subscriber struct {
12 client Client
13 prefix string
14 cache *cache.Cache
15 logger log.Logger
16 quitc chan struct{}
17 }
18
19 var _ sd.Subscriber = &Subscriber{}
20
21 // NewSubscriber returns an etcd subscriber. It will start watching the given
22 // prefix for changes, and update the endpoints.
23 func NewSubscriber(c Client, prefix string, factory sd.Factory, logger log.Logger) (*Subscriber, error) {
24 s := &Subscriber{
25 client: c,
26 prefix: prefix,
27 cache: cache.New(factory, logger),
28 logger: logger,
29 quitc: make(chan struct{}),
30 }
31
32 instances, err := s.client.GetEntries(s.prefix)
33 if err == nil {
34 logger.Log("prefix", s.prefix, "instances", len(instances))
35 } else {
36 logger.Log("prefix", s.prefix, "err", err)
37 }
38 s.cache.Update(instances)
39
40 go s.loop()
41 return s, nil
42 }
43
44 func (s *Subscriber) loop() {
45 ch := make(chan struct{})
46 go s.client.WatchPrefix(s.prefix, ch)
47 for {
48 select {
49 case <-ch:
50 instances, err := s.client.GetEntries(s.prefix)
51 if err != nil {
52 s.logger.Log("msg", "failed to retrieve entries", "err", err)
53 continue
54 }
55 s.cache.Update(instances)
56
57 case <-s.quitc:
58 return
59 }
60 }
61 }
62
63 // Endpoints implements the Subscriber interface.
64 func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) {
65 return s.cache.Endpoints(), nil
66 }
67
68 // Stop terminates the Subscriber.
69 func (s *Subscriber) Stop() {
70 close(s.quitc)
71 }
sd/etcd/subscriber_test.go +0 -96 (file deleted)
0 package etcd
1
2 import (
3 "errors"
4 "io"
5 "testing"
6
7 stdetcd "github.com/coreos/etcd/client"
8
9 "github.com/go-kit/kit/endpoint"
10 "github.com/go-kit/kit/log"
11 )
12
13 var (
14 node = &stdetcd.Node{
15 Key: "/foo",
16 Nodes: []*stdetcd.Node{
17 {Key: "/foo/1", Value: "1:1"},
18 {Key: "/foo/2", Value: "1:2"},
19 },
20 }
21 fakeResponse = &stdetcd.Response{
22 Node: node,
23 }
24 )
25
26 func TestSubscriber(t *testing.T) {
27 factory := func(string) (endpoint.Endpoint, io.Closer, error) {
28 return endpoint.Nop, nil, nil
29 }
30
31 client := &fakeClient{
32 responses: map[string]*stdetcd.Response{"/foo": fakeResponse},
33 }
34
35 s, err := NewSubscriber(client, "/foo", factory, log.NewNopLogger())
36 if err != nil {
37 t.Fatal(err)
38 }
39 defer s.Stop()
40
41 if _, err := s.Endpoints(); err != nil {
42 t.Fatal(err)
43 }
44 }
45
46 func TestBadFactory(t *testing.T) {
47 factory := func(string) (endpoint.Endpoint, io.Closer, error) {
48 return nil, nil, errors.New("kaboom")
49 }
50
51 client := &fakeClient{
52 responses: map[string]*stdetcd.Response{"/foo": fakeResponse},
53 }
54
55 s, err := NewSubscriber(client, "/foo", factory, log.NewNopLogger())
56 if err != nil {
57 t.Fatal(err)
58 }
59 defer s.Stop()
60
61 endpoints, err := s.Endpoints()
62 if err != nil {
63 t.Fatal(err)
64 }
65
66 if want, have := 0, len(endpoints); want != have {
67 t.Errorf("want %d, have %d", want, have)
68 }
69 }
70
71 type fakeClient struct {
72 responses map[string]*stdetcd.Response
73 }
74
75 func (c *fakeClient) GetEntries(prefix string) ([]string, error) {
76 response, ok := c.responses[prefix]
77 if !ok {
78 return nil, errors.New("key not exist")
79 }
80
81 entries := make([]string, len(response.Node.Nodes))
82 for i, node := range response.Node.Nodes {
83 entries[i] = node.Value
84 }
85 return entries, nil
86 }
87
88 func (c *fakeClient) WatchPrefix(prefix string, ch chan struct{}) {}
89
90 func (c *fakeClient) Register(Service) error {
91 return nil
92 }
93 func (c *fakeClient) Deregister(Service) error {
94 return nil
95 }
0 // Package eureka provides subscriber and registrar implementations for Netflix OSS's Eureka
0 // Package eureka provides Instancer and Registrar implementations for Netflix OSS's Eureka
11 package eureka
0 package eureka
1
2 import (
3 "fmt"
4
5 "github.com/hudl/fargo"
6
7 "github.com/go-kit/kit/log"
8 "github.com/go-kit/kit/sd"
9 "github.com/go-kit/kit/sd/internal/instance"
10 )
11
12 // Instancer yields instances stored in the Eureka registry for the given app.
13 // Changes in that app are watched and will update the subscribers.
14 type Instancer struct {
15 instance.Cache
16 conn fargoConnection
17 app string
18 logger log.Logger
19 quitc chan chan struct{}
20 }
21
22 // NewInstancer returns a Eureka Instancer. It will start watching the given
23 // app string for changes, and update the subscribers accordingly.
24 func NewInstancer(conn fargoConnection, app string, logger log.Logger) *Instancer {
25 logger = log.With(logger, "app", app)
26
27 s := &Instancer{
28 Cache: *instance.NewCache(),
29 conn: conn,
30 app: app,
31 logger: logger,
32 quitc: make(chan chan struct{}),
33 }
34
35 instances, err := s.getInstances()
36 if err == nil {
37 s.logger.Log("instances", len(instances))
38 } else {
39 s.logger.Log("during", "getInstances", "err", err)
40 }
41
42 s.Update(sd.Event{Instances: instances, Err: err})
43 go s.loop()
44 return s
45 }
46
47 // Stop terminates the Instancer.
48 func (s *Instancer) Stop() {
49 q := make(chan struct{})
50 s.quitc <- q
51 <-q
52 s.quitc = nil
53 }
54
55 func (s *Instancer) loop() {
56 var (
57 await = false
58 done = make(chan struct{})
59 updatec = s.conn.ScheduleAppUpdates(s.app, await, done)
60 )
61 defer close(done)
62
63 for {
64 select {
65 case update := <-updatec:
66 if update.Err != nil {
67 s.logger.Log("during", "Update", "err", update.Err)
68 s.Update(sd.Event{Err: update.Err})
69 continue
70 }
71 instances := convertFargoAppToInstances(update.App)
72 s.logger.Log("instances", len(instances))
73 s.Update(sd.Event{Instances: instances})
74
75 case q := <-s.quitc:
76 close(q)
77 return
78 }
79 }
80 }
81
82 func (s *Instancer) getInstances() ([]string, error) {
83 app, err := s.conn.GetApp(s.app)
84 if err != nil {
85 return nil, err
86 }
87 return convertFargoAppToInstances(app), nil
88 }
89
90 func convertFargoAppToInstances(app *fargo.Application) []string {
91 instances := make([]string, len(app.Instances))
92 for i, inst := range app.Instances {
93 instances[i] = fmt.Sprintf("%s:%d", inst.IPAddr, inst.Port)
94 }
95 return instances
96 }
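Together with the new generic sd.Endpointer, this Instancer replaces the Eureka Subscriber removed further below: the Instancer pushes instance Events, and the Endpointer applies the Factory and feeds a balancer. A rough wiring sketch along the lines of the updated tests in this change; the fargo connection setup, Eureka URL, app name, retry parameters, and no-op factory are illustrative placeholders, not part of the change:

package main

import (
	"io"
	"time"

	"github.com/hudl/fargo"

	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/eureka"
	"github.com/go-kit/kit/sd/lb"
)

func main() {
	logger := log.NewNopLogger()

	// Placeholder Eureka server address.
	conn := fargo.NewConn("http://localhost:8080/eureka/v2")

	// A real factory would build a client endpoint for the discovered "host:port" string.
	factory := func(instance string) (endpoint.Endpoint, io.Closer, error) {
		return endpoint.Nop, nil, nil
	}

	instancer := eureka.NewInstancer(&conn, "MYSERVICE", logger) // app name is a placeholder
	defer instancer.Stop()

	endpointer := sd.NewEndpointer(instancer, factory, logger)
	balancer := lb.NewRoundRobin(endpointer)
	retry := lb.Retry(3, 5*time.Second, balancer)

	// retry is an endpoint.Endpoint that load-balances across live Eureka instances.
	_ = retry
}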
0 package eureka
1
2 import (
3 "io"
4 "testing"
5 "time"
6
7 "github.com/hudl/fargo"
8
9 "github.com/go-kit/kit/endpoint"
10 "github.com/go-kit/kit/sd"
11 )
12
13 var _ sd.Instancer = &Instancer{} // API check
14
15 func TestInstancer(t *testing.T) {
16 factory := func(string) (endpoint.Endpoint, io.Closer, error) {
17 return endpoint.Nop, nil, nil
18 }
19
20 connection := &testConnection{
21 instances: []*fargo.Instance{instanceTest1},
22 application: appUpdateTest,
23 errApplication: nil,
24 }
25
26 instancer := NewInstancer(connection, appNameTest, loggerTest)
27 defer instancer.Stop()
28 endpointer := sd.NewEndpointer(instancer, factory, loggerTest)
29
30 endpoints, err := endpointer.Endpoints()
31 if err != nil {
32 t.Fatal(err)
33 }
34
35 if want, have := 1, len(endpoints); want != have {
36 t.Errorf("want %d, have %d", want, have)
37 }
38 }
39
40 func TestInstancerScheduleUpdates(t *testing.T) {
41 factory := func(string) (endpoint.Endpoint, io.Closer, error) {
42 return endpoint.Nop, nil, nil
43 }
44
45 connection := &testConnection{
46 instances: []*fargo.Instance{instanceTest1},
47 application: appUpdateTest,
48 errApplication: nil,
49 }
50
51 instancer := NewInstancer(connection, appNameTest, loggerTest)
52 defer instancer.Stop()
53 endpointer := sd.NewEndpointer(instancer, factory, loggerTest)
54
55 endpoints, _ := endpointer.Endpoints()
56 if want, have := 1, len(endpoints); want != have {
57 t.Errorf("want %d, have %d", want, have)
58 }
59
60 time.Sleep(50 * time.Millisecond)
61
62 endpoints, _ = endpointer.Endpoints()
63 if want, have := 2, len(endpoints); want != have {
64 t.Errorf("want %v, have %v", want, have)
65 }
66 }
67
68 func TestBadFactory(t *testing.T) {
69 factory := func(string) (endpoint.Endpoint, io.Closer, error) {
70 return nil, nil, errTest
71 }
72
73 connection := &testConnection{
74 instances: []*fargo.Instance{instanceTest1},
75 application: appUpdateTest,
76 errApplication: nil,
77 }
78
79 instancer := NewInstancer(connection, appNameTest, loggerTest)
80 defer instancer.Stop()
81 endpointer := sd.NewEndpointer(instancer, factory, loggerTest)
82
83 endpoints, err := endpointer.Endpoints()
84 if err != nil {
85 t.Fatal(err)
86 }
87
88 if want, have := 0, len(endpoints); want != have {
89 t.Errorf("want %d, have %d", want, have)
90 }
91 }
92
93 func TestBadInstancerInstances(t *testing.T) {
94 factory := func(string) (endpoint.Endpoint, io.Closer, error) {
95 return endpoint.Nop, nil, nil
96 }
97
98 connection := &testConnection{
99 instances: []*fargo.Instance{},
100 errInstances: errTest,
101 application: appUpdateTest,
102 errApplication: nil,
103 }
104
105 instancer := NewInstancer(connection, appNameTest, loggerTest)
106 defer instancer.Stop()
107 endpointer := sd.NewEndpointer(instancer, factory, loggerTest)
108
109 endpoints, err := endpointer.Endpoints()
110 if err != nil {
111 t.Fatal(err)
112 }
113
114 if want, have := 0, len(endpoints); want != have {
115 t.Errorf("want %d, have %d", want, have)
116 }
117 }
118
119 func TestBadInstancerScheduleUpdates(t *testing.T) {
120 factory := func(string) (endpoint.Endpoint, io.Closer, error) {
121 return endpoint.Nop, nil, nil
122 }
123
124 connection := &testConnection{
125 instances: []*fargo.Instance{instanceTest1},
126 application: appUpdateTest,
127 errApplication: errTest,
128 }
129
130 instancer := NewInstancer(connection, appNameTest, loggerTest)
131 defer instancer.Stop()
132 endpointer := sd.NewEndpointer(instancer, factory, loggerTest)
133
134 endpoints, err := endpointer.Endpoints()
135 if err != nil {
136 t.Error(err)
137 }
138 if want, have := 1, len(endpoints); want != have {
139 t.Errorf("want %d, have %d", want, have)
140 }
141
142 time.Sleep(50 * time.Millisecond)
143
144 endpoints, err = endpointer.Endpoints()
145 if err != nil {
146 t.Error(err)
147 }
148 if want, have := 1, len(endpoints); want != have {
149 t.Errorf("want %v, have %v", want, have)
150 }
151 }
1111
1212 "github.com/go-kit/kit/endpoint"
1313 "github.com/go-kit/kit/log"
14 "github.com/go-kit/kit/sd"
1415 )
1516
1617 // Package sd/eureka provides a wrapper around the Netflix Eureka service
5354 t.Logf("factory invoked for %q", instance)
5455 return endpoint.Nop, nil, nil
5556 }
56 s := NewSubscriber(
57 instancer := NewInstancer(
5758 &fargoConnection,
5859 appNameTest,
59 factory,
60 log.With(logger, "component", "subscriber"),
60 log.With(logger, "component", "instancer"),
6161 )
62 defer s.Stop()
62 defer instancer.Stop()
63 endpointer := sd.NewEndpointer(instancer, factory, log.With(logger, "component", "endpointer"))
6364
6465 // We should have one endpoint immediately after endpointer construction.
65 endpoints, err := s.Endpoints()
66 endpoints, err := endpointer.Endpoints()
6667 if err != nil {
6768 t.Error(err)
6869 }
8081 time.Sleep(2 * time.Second)
8182
8283 // Now we should have two endpoints.
83 endpoints, err = s.Endpoints()
84 endpoints, err = endpointer.Endpoints()
8485 if err != nil {
8586 t.Error(err)
8687 }
9596 time.Sleep(2 * time.Second)
9697
9798 // And then there was one.
98 endpoints, err = s.Endpoints()
99 endpoints, err = endpointer.Endpoints()
99100 if err != nil {
100101 t.Error(err)
101102 }
+0
-106
sd/eureka/subscriber.go
0 package eureka
1
2 import (
3 "fmt"
4
5 "github.com/hudl/fargo"
6
7 "github.com/go-kit/kit/endpoint"
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/sd"
10 "github.com/go-kit/kit/sd/cache"
11 )
12
13 // Subscriber yields endpoints stored in the Eureka registry for the given app.
14 // Changes in that app are watched and will update the Subscriber endpoints.
15 type Subscriber struct {
16 conn fargoConnection
17 app string
18 factory sd.Factory
19 logger log.Logger
20 cache *cache.Cache
21 quitc chan chan struct{}
22 }
23
24 var _ sd.Subscriber = (*Subscriber)(nil)
25
26 // NewSubscriber returns a Eureka subscriber. It will start watching the given
27 // app string for changes, and update the endpoints accordingly.
28 func NewSubscriber(conn fargoConnection, app string, factory sd.Factory, logger log.Logger) *Subscriber {
29 logger = log.With(logger, "app", app)
30
31 s := &Subscriber{
32 conn: conn,
33 app: app,
34 factory: factory,
35 logger: logger,
36 cache: cache.New(factory, logger),
37 quitc: make(chan chan struct{}),
38 }
39
40 instances, err := s.getInstances()
41 if err == nil {
42 s.logger.Log("instances", len(instances))
43 } else {
44 s.logger.Log("during", "getInstances", "err", err)
45 }
46
47 s.cache.Update(instances)
48 go s.loop()
49 return s
50 }
51
52 // Endpoints implements the Subscriber interface.
53 func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) {
54 return s.cache.Endpoints(), nil
55 }
56
57 // Stop terminates the subscriber.
58 func (s *Subscriber) Stop() {
59 q := make(chan struct{})
60 s.quitc <- q
61 <-q
62 s.quitc = nil
63 }
64
65 func (s *Subscriber) loop() {
66 var (
67 await = false
68 done = make(chan struct{})
69 updatec = s.conn.ScheduleAppUpdates(s.app, await, done)
70 )
71 defer close(done)
72
73 for {
74 select {
75 case update := <-updatec:
76 if update.Err != nil {
77 s.logger.Log("during", "Update", "err", update.Err)
78 continue
79 }
80 instances := convertFargoAppToInstances(update.App)
81 s.logger.Log("instances", len(instances))
82 s.cache.Update(instances)
83
84 case q := <-s.quitc:
85 close(q)
86 return
87 }
88 }
89 }
90
91 func (s *Subscriber) getInstances() ([]string, error) {
92 app, err := s.conn.GetApp(s.app)
93 if err != nil {
94 return nil, err
95 }
96 return convertFargoAppToInstances(app), nil
97 }
98
99 func convertFargoAppToInstances(app *fargo.Application) []string {
100 instances := make([]string, len(app.Instances))
101 for i, inst := range app.Instances {
102 instances[i] = fmt.Sprintf("%s:%d", inst.IPAddr, inst.Port)
103 }
104 return instances
105 }
+0
-144
sd/eureka/subscriber_test.go
0 package eureka
1
2 import (
3 "io"
4 "testing"
5 "time"
6
7 "github.com/hudl/fargo"
8
9 "github.com/go-kit/kit/endpoint"
10 )
11
12 func TestSubscriber(t *testing.T) {
13 factory := func(string) (endpoint.Endpoint, io.Closer, error) {
14 return endpoint.Nop, nil, nil
15 }
16
17 connection := &testConnection{
18 instances: []*fargo.Instance{instanceTest1},
19 application: appUpdateTest,
20 errApplication: nil,
21 }
22
23 subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest)
24 defer subscriber.Stop()
25
26 endpoints, err := subscriber.Endpoints()
27 if err != nil {
28 t.Fatal(err)
29 }
30
31 if want, have := 1, len(endpoints); want != have {
32 t.Errorf("want %d, have %d", want, have)
33 }
34 }
35
36 func TestSubscriberScheduleUpdates(t *testing.T) {
37 factory := func(string) (endpoint.Endpoint, io.Closer, error) {
38 return endpoint.Nop, nil, nil
39 }
40
41 connection := &testConnection{
42 instances: []*fargo.Instance{instanceTest1},
43 application: appUpdateTest,
44 errApplication: nil,
45 }
46
47 subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest)
48 defer subscriber.Stop()
49
50 endpoints, _ := subscriber.Endpoints()
51 if want, have := 1, len(endpoints); want != have {
52 t.Errorf("want %d, have %d", want, have)
53 }
54
55 time.Sleep(50 * time.Millisecond)
56
57 endpoints, _ = subscriber.Endpoints()
58 if want, have := 2, len(endpoints); want != have {
59 t.Errorf("want %v, have %v", want, have)
60 }
61 }
62
63 func TestBadFactory(t *testing.T) {
64 factory := func(string) (endpoint.Endpoint, io.Closer, error) {
65 return nil, nil, errTest
66 }
67
68 connection := &testConnection{
69 instances: []*fargo.Instance{instanceTest1},
70 application: appUpdateTest,
71 errApplication: nil,
72 }
73
74 subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest)
75 defer subscriber.Stop()
76
77 endpoints, err := subscriber.Endpoints()
78 if err != nil {
79 t.Fatal(err)
80 }
81
82 if want, have := 0, len(endpoints); want != have {
83 t.Errorf("want %d, have %d", want, have)
84 }
85 }
86
87 func TestBadSubscriberInstances(t *testing.T) {
88 factory := func(string) (endpoint.Endpoint, io.Closer, error) {
89 return endpoint.Nop, nil, nil
90 }
91
92 connection := &testConnection{
93 instances: []*fargo.Instance{},
94 errInstances: errTest,
95 application: appUpdateTest,
96 errApplication: nil,
97 }
98
99 subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest)
100 defer subscriber.Stop()
101
102 endpoints, err := subscriber.Endpoints()
103 if err != nil {
104 t.Fatal(err)
105 }
106
107 if want, have := 0, len(endpoints); want != have {
108 t.Errorf("want %d, have %d", want, have)
109 }
110 }
111
112 func TestBadSubscriberScheduleUpdates(t *testing.T) {
113 factory := func(string) (endpoint.Endpoint, io.Closer, error) {
114 return endpoint.Nop, nil, nil
115 }
116
117 connection := &testConnection{
118 instances: []*fargo.Instance{instanceTest1},
119 application: appUpdateTest,
120 errApplication: errTest,
121 }
122
123 subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest)
124 defer subscriber.Stop()
125
126 endpoints, err := subscriber.Endpoints()
127 if err != nil {
128 t.Error(err)
129 }
130 if want, have := 1, len(endpoints); want != have {
131 t.Errorf("want %d, have %d", want, have)
132 }
133
134 time.Sleep(50 * time.Millisecond)
135
136 endpoints, err = subscriber.Endpoints()
137 if err != nil {
138 t.Error(err)
139 }
140 if want, have := 1, len(endpoints); want != have {
141 t.Errorf("want %v, have %v", want, have)
142 }
143 }
+0
-9
sd/fixed_subscriber.go
0 package sd
1
2 import "github.com/go-kit/kit/endpoint"
3
4 // FixedSubscriber yields a fixed set of services.
5 type FixedSubscriber []endpoint.Endpoint
6
7 // Endpoints implements Subscriber.
8 func (s FixedSubscriber) Endpoints() ([]endpoint.Endpoint, error) { return s, nil }
0 package sd
1
2 // Event represents a push notification generated from the underlying service discovery
3 // implementation. It contains either a full set of available resource instances, or
4 // an error indicating some issue with obtaining information from the discovery backend.
5 // Examples of errors may include losing the connection to the discovery backend, or
6 // trying to look up resource instances using an incorrectly formatted key.
7 // After receiving an Event with an error, the listener should treat previously discovered
8 // resource instances as stale (although it may choose to continue using them).
9 // If the Instancer is able to restore connection to the discovery backend it must push
10 // another Event with the current set of resource instances.
11 type Event struct {
12 Instances []string
13 Err error
14 }
15
16 // Instancer listens to a service discovery system and notifies registered
17 // observers of changes in the resource instances. Every event sent to the channels
18 // contains a complete set of instances known to the Instancer. That complete set is
19 // sent immediately upon registering the channel, and on any future updates from the
20 // discovery system.
21 type Instancer interface {
22 Register(chan<- Event)
23 Deregister(chan<- Event)
24 }
25
26 // FixedInstancer yields a fixed set of instances.
27 type FixedInstancer []string
28
29 // Register implements Instancer.
30 func (d FixedInstancer) Register(ch chan<- Event) { ch <- Event{Instances: d} }
31
32 // Deregister implements Instancer.
33 func (d FixedInstancer) Deregister(ch chan<- Event) {}
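The Event and Instancer contract above is the heart of the push model: an observer registers a channel, immediately receives the current instance set, and then receives a fresh Event on every change or error. A minimal consumer sketch (not part of the change itself) using the FixedInstancer defined above; the instance addresses are placeholders:

package main

import (
	"fmt"

	"github.com/go-kit/kit/sd"
)

func main() {
	// FixedInstancer pushes its static instance set to every registered channel.
	instancer := sd.FixedInstancer{"10.0.0.1:8080", "10.0.0.2:8080"} // placeholder addresses

	events := make(chan sd.Event, 1)
	instancer.Register(events)
	defer instancer.Deregister(events)

	event := <-events
	if event.Err != nil {
		// Per the Event contract, previously seen instances should be treated as stale.
		fmt.Println("discovery error:", event.Err)
		return
	}
	fmt.Println("current instances:", event.Instances)
}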
0 package instance
1
2 import (
3 "reflect"
4 "sort"
5 "sync"
6
7 "github.com/go-kit/kit/sd"
8 )
9
10 // Cache keeps track of resource instances provided to it via the Update method
11 // and implements the Instancer interface.
12 type Cache struct {
13 mtx sync.RWMutex
14 state sd.Event
15 reg registry
16 }
17
18 // NewCache creates a new Cache.
19 func NewCache() *Cache {
20 return &Cache{
21 reg: registry{},
22 }
23 }
24
25 // Update receives new instances from service discovery, stores them internally,
26 // and notifies all registered listeners.
27 func (c *Cache) Update(event sd.Event) {
28 c.mtx.Lock()
29 defer c.mtx.Unlock()
30
31 sort.Strings(event.Instances)
32 if reflect.DeepEqual(c.state, event) {
33 return // no need to broadcast the same instances
34 }
35
36 c.state = event
37 c.reg.broadcast(event)
38 }
39
40 // State returns the current state of discovery (instances or error) as sd.Event
41 func (c *Cache) State() sd.Event {
42 c.mtx.RLock()
43 defer c.mtx.RUnlock()
44 return c.state
45 }
46
47 // Register implements Instancer.
48 func (c *Cache) Register(ch chan<- sd.Event) {
49 c.mtx.Lock()
50 defer c.mtx.Unlock()
51 c.reg.register(ch)
52 // always push the current state to new channels
53 ch <- c.state
54 }
55
56 // Deregister implements Instancer.
57 func (c *Cache) Deregister(ch chan<- sd.Event) {
58 c.mtx.Lock()
59 defer c.mtx.Unlock()
60 c.reg.deregister(ch)
61 }
62
63 // registry is not goroutine-safe.
64 type registry map[chan<- sd.Event]struct{}
65
66 func (r registry) broadcast(event sd.Event) {
67 for c := range r {
68 c <- event
69 }
70 }
71
72 func (r registry) register(c chan<- sd.Event) {
73 r[c] = struct{}{}
74 }
75
76 func (r registry) deregister(c chan<- sd.Event) {
77 delete(r, c)
78 }
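The Cache above gives each backend-specific Instancer the same push semantics: Register delivers the current state immediately, Update sorts and stores the incoming event, and a broadcast happens only when the instance set actually changes. An illustrative, test-style sketch of those semantics (not part of this change; it would have to live inside go-kit's sd tree because the package is internal):

package instance

import (
	"fmt"

	"github.com/go-kit/kit/sd"
)

// ExampleCache is a hypothetical example function demonstrating the push
// semantics implemented above.
func ExampleCache() {
	c := NewCache()

	events := make(chan sd.Event, 2)
	c.Register(events) // the current (empty) state is pushed immediately
	fmt.Println("initial:", (<-events).Instances)

	c.Update(sd.Event{Instances: []string{"b", "a"}}) // sorted, stored, and broadcast
	fmt.Println("updated:", (<-events).Instances)

	c.Update(sd.Event{Instances: []string{"a", "b"}}) // identical set: no broadcast
	fmt.Println("state:", c.State().Instances)

	// Output:
	// initial: []
	// updated: [a b]
	// state: [a b]
}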
0 package instance
1
2 import (
3 "sync"
4 "testing"
5
6 "github.com/go-kit/kit/sd"
7 )
8
9 var _ sd.Instancer = &Cache{} // API check
10
11 func TestCache(t *testing.T) {
12 // TODO this test is not finished yet
13
14 c := NewCache()
15
16 {
17 state := c.State()
18 if want, have := 0, len(state.Instances); want != have {
19 t.Fatalf("want %v instances, have %v", want, have)
20 }
21 }
22
23 notification1 := sd.Event{Instances: []string{"x", "y"}}
24 notification2 := sd.Event{Instances: []string{"a", "b", "c"}}
25
26 c.Update(notification1)
27
28 // times 2 because we have two observers
29 expectedInstances := 2 * (len(notification1.Instances) + len(notification2.Instances))
30
31 wg := sync.WaitGroup{}
32 wg.Add(expectedInstances)
33
34 receiver := func(ch chan sd.Event) {
35 for state := range ch {
36 // count total number of instances received
37 for range state.Instances {
38 wg.Done()
39 }
40 }
41 }
42
43 f1 := make(chan sd.Event)
44 f2 := make(chan sd.Event)
45 go receiver(f1)
46 go receiver(f2)
47
48 c.Register(f1)
49 c.Register(f2)
50
51 c.Update(notification1)
52 c.Update(notification2)
53
54 // if state := c.State(); instances == nil {
55 // if want, have := len(notification2), len(instances); want != have {
56 // t.Errorf("want length %v, have %v", want, have)
57 // } else {
58 // for i := range notification2 {
59 // if want, have := notification2[i], instances[i]; want != have {
60 // t.Errorf("want instance %v, have %v", want, have)
61 // }
62 // }
63 // }
64 // }
65
66 close(f1)
67 close(f2)
68
69 wg.Wait()
70
71 // d.Deregister(f1)
72
73 // d.Unregister(f2)
74 // if want, have := 0, len(d.observers); want != have {
75 // t.Fatalf("want %v observers, have %v", want, have)
76 // }
77 }
77 )
88
99 // NewRandom returns a load balancer that selects services randomly.
10 func NewRandom(s sd.Subscriber, seed int64) Balancer {
10 func NewRandom(s sd.Endpointer, seed int64) Balancer {
1111 return &random{
1212 s: s,
1313 r: rand.New(rand.NewSource(seed)),
1515 }
1616
1717 type random struct {
18 s sd.Subscriber
18 s sd.Endpointer
1919 r *rand.Rand
2020 }
2121
2424 endpoints[i] = func(context.Context, interface{}) (interface{}, error) { counts[i0]++; return struct{}{}, nil }
2525 }
2626
27 subscriber := sd.FixedSubscriber(endpoints)
28 balancer := NewRandom(subscriber, seed)
27 endpointer := sd.FixedEndpointer(endpoints)
28 balancer := NewRandom(endpointer, seed)
2929
3030 for i := 0; i < iterations; i++ {
3131 endpoint, _ := balancer.Endpoint()
4141 }
4242
4343 func TestRandomNoEndpoints(t *testing.T) {
44 subscriber := sd.FixedSubscriber{}
45 balancer := NewRandom(subscriber, 1415926)
44 endpointer := sd.FixedEndpointer{}
45 balancer := NewRandom(endpointer, 1415926)
4646 _, err := balancer.Endpoint()
4747 if want, have := ErrNoEndpoints, err; want != have {
4848 t.Errorf("want %v, have %v", want, have)
1212
1313 func TestRetryMaxTotalFail(t *testing.T) {
1414 var (
15 endpoints = sd.FixedSubscriber{} // no endpoints
15 endpoints = sd.FixedEndpointer{} // no endpoints
1616 rr = lb.NewRoundRobin(endpoints)
1717 retry = lb.Retry(999, time.Second, rr) // lots of retries
1818 ctx = context.Background()
2929 func(context.Context, interface{}) (interface{}, error) { return nil, errors.New("error two") },
3030 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil /* OK */ },
3131 }
32 subscriber = sd.FixedSubscriber{
32 endpointer = sd.FixedEndpointer{
3333 0: endpoints[0],
3434 1: endpoints[1],
3535 2: endpoints[2],
3636 }
3737 retries = len(endpoints) - 1 // not quite enough retries
38 rr = lb.NewRoundRobin(subscriber)
38 rr = lb.NewRoundRobin(endpointer)
3939 ctx = context.Background()
4040 )
4141 if _, err := lb.Retry(retries, time.Second, rr)(ctx, struct{}{}); err == nil {
5050 func(context.Context, interface{}) (interface{}, error) { return nil, errors.New("error two") },
5151 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil /* OK */ },
5252 }
53 subscriber = sd.FixedSubscriber{
53 endpointer = sd.FixedEndpointer{
5454 0: endpoints[0],
5555 1: endpoints[1],
5656 2: endpoints[2],
5757 }
5858 retries = len(endpoints) // exactly enough retries
59 rr = lb.NewRoundRobin(subscriber)
59 rr = lb.NewRoundRobin(endpointer)
6060 ctx = context.Background()
6161 )
6262 if _, err := lb.Retry(retries, time.Second, rr)(ctx, struct{}{}); err != nil {
6969 step = make(chan struct{})
7070 e = func(context.Context, interface{}) (interface{}, error) { <-step; return struct{}{}, nil }
7171 timeout = time.Millisecond
72 retry = lb.Retry(999, timeout, lb.NewRoundRobin(sd.FixedSubscriber{0: e}))
72 retry = lb.Retry(999, timeout, lb.NewRoundRobin(sd.FixedEndpointer{0: e}))
7373 errs = make(chan error, 1)
7474 invoke = func() { _, err := retry(context.Background(), struct{}{}); errs <- err }
7575 )
9191 var (
9292 myErr = errors.New("aborting early")
9393 cb = func(int, error) (bool, error) { return false, myErr }
94 endpoints = sd.FixedSubscriber{} // no endpoints
94 endpoints = sd.FixedEndpointer{} // no endpoints
9595 rr = lb.NewRoundRobin(endpoints)
9696 retry = lb.RetryWithCallback(time.Second, rr, cb) // lots of retries
9797 ctx = context.Background()
114114 endpoint = func(ctx context.Context, request interface{}) (interface{}, error) {
115115 return nil, myErr
116116 }
117 endpoints = sd.FixedSubscriber{endpoint} // no endpoints
117 endpoints = sd.FixedEndpointer{endpoint} // one endpoint, which always fails
118118 rr = lb.NewRoundRobin(endpoints)
119119 retry = lb.RetryWithCallback(time.Second, rr, cb) // lots of retries
120120 ctx = context.Background()
127127
128128 func TestHandleNilCallback(t *testing.T) {
129129 var (
130 subscriber = sd.FixedSubscriber{
130 endpointer = sd.FixedEndpointer{
131131 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil /* OK */ },
132132 }
133 rr = lb.NewRoundRobin(subscriber)
133 rr = lb.NewRoundRobin(endpointer)
134134 ctx = context.Background()
135135 )
136136 retry := lb.RetryWithCallback(time.Second, rr, nil)
77 )
88
99 // NewRoundRobin returns a load balancer that returns services in sequence.
10 func NewRoundRobin(s sd.Subscriber) Balancer {
10 func NewRoundRobin(s sd.Endpointer) Balancer {
1111 return &roundRobin{
1212 s: s,
1313 c: 0,
1515 }
1616
1717 type roundRobin struct {
18 s sd.Subscriber
18 s sd.Endpointer
1919 c uint64
2020 }
2121
2121 }
2222 )
2323
24 subscriber := sd.FixedSubscriber(endpoints)
25 balancer := NewRoundRobin(subscriber)
24 endpointer := sd.FixedEndpointer(endpoints)
25 balancer := NewRoundRobin(endpointer)
2626
2727 for i, want := range [][]int{
2828 {1, 0, 0},
4545 }
4646
4747 func TestRoundRobinNoEndpoints(t *testing.T) {
48 subscriber := sd.FixedSubscriber{}
49 balancer := NewRoundRobin(subscriber)
48 endpointer := sd.FixedEndpointer{}
49 balancer := NewRoundRobin(endpointer)
5050 _, err := balancer.Endpoint()
5151 if want, have := ErrNoEndpoints, err; want != have {
5252 t.Errorf("want %v, have %v", want, have)
5454 }
5555
5656 func TestRoundRobinNoRace(t *testing.T) {
57 balancer := NewRoundRobin(sd.FixedSubscriber([]endpoint.Endpoint{
57 balancer := NewRoundRobin(sd.FixedEndpointer([]endpoint.Endpoint{
5858 endpoint.Nop,
5959 endpoint.Nop,
6060 endpoint.Nop,
+0
-11
sd/subscriber.go
0 package sd
1
2 import "github.com/go-kit/kit/endpoint"
3
4 // Subscriber listens to a service discovery system and yields a set of
5 // identical endpoints on demand. An error indicates a problem with connectivity
6 // to the service discovery system, or within the system itself; a subscriber
7 // may yield no endpoints without error.
8 type Subscriber interface {
9 Endpoints() ([]endpoint.Endpoint, error)
10 }
106106 t.Fatal("expected new Client, got nil")
107107 }
108108
109 s, err := NewSubscriber(c, "/validpath", newFactory(""), log.NewNopLogger())
109 s, err := NewInstancer(c, "/validpath", log.NewNopLogger())
110110 if err != stdzk.ErrNoServer {
111111 t.Errorf("unexpected error: %v", err)
112112 }
113113 if s != nil {
114 t.Error("expected failed new Subscriber")
114 t.Error("expected failed new Instancer")
115115 }
116116
117 s, err = NewSubscriber(c, "invalidpath", newFactory(""), log.NewNopLogger())
117 s, err = NewInstancer(c, "invalidpath", log.NewNopLogger())
118118 if err != stdzk.ErrInvalidPath {
119119 t.Errorf("unexpected error: %v", err)
120120 }
130130 t.Errorf("unexpected error: %v", err)
131131 }
132132
133 s, err = NewSubscriber(c, "/validpath", newFactory(""), log.NewNopLogger())
133 s, err = NewInstancer(c, "/validpath", log.NewNopLogger())
134134 if err != ErrClientClosed {
135135 t.Errorf("unexpected error: %v", err)
136136 }
137137 if s != nil {
138 t.Error("expected failed new Subscriber")
138 t.Error("expected failed new Instancer")
139139 }
140140
141141 c, err = NewClient([]string{"localhost:65500"}, log.NewNopLogger(), Payload(payload))
146146 t.Fatal("expected new Client, got nil")
147147 }
148148
149 s, err = NewSubscriber(c, "/validpath", newFactory(""), log.NewNopLogger())
149 s, err = NewInstancer(c, "/validpath", log.NewNopLogger())
150150 if err != stdzk.ErrNoServer {
151151 t.Errorf("unexpected error: %v", err)
152152 }
153153 if s != nil {
154 t.Error("expected failed new Subscriber")
154 t.Error("expected failed new Instancer")
155155 }
156156 }
0 // Package zk provides subscriber and registrar implementations for ZooKeeper.
0 // Package zk provides Instancer and Registrar implementations for ZooKeeper.
11 package zk
0 package zk
1
2 import (
3 "github.com/samuel/go-zookeeper/zk"
4
5 "github.com/go-kit/kit/log"
6 "github.com/go-kit/kit/sd"
7 "github.com/go-kit/kit/sd/internal/instance"
8 )
9
10 // Instancer yields instances stored in a certain ZooKeeper path. Any kind of
11 // change in that path is watched and will update the subscribers.
12 type Instancer struct {
13 instance.Cache
14 client Client
15 path string
16 logger log.Logger
17 quitc chan struct{}
18 }
19
20 // NewInstancer returns a ZooKeeper Instancer. It will start watching the given
21 // path for changes and update the Instancer's instance set accordingly.
22 func NewInstancer(c Client, path string, logger log.Logger) (*Instancer, error) {
23 s := &Instancer{
24 Cache: *instance.NewCache(),
25 client: c,
26 path: path,
27 logger: logger,
28 quitc: make(chan struct{}),
29 }
30
31 err := s.client.CreateParentNodes(s.path)
32 if err != nil {
33 return nil, err
34 }
35
36 instances, eventc, err := s.client.GetEntries(s.path)
37 if err != nil {
38 logger.Log("path", s.path, "msg", "failed to retrieve entries", "err", err)
39 // TODO: why does the zk constructor exit when other implementations continue?
40 return nil, err
41 }
42 logger.Log("path", s.path, "instances", len(instances))
43 s.Update(sd.Event{Instances: instances})
44
45 go s.loop(eventc)
46
47 return s, nil
48 }
49
50 func (s *Instancer) loop(eventc <-chan zk.Event) {
51 var (
52 instances []string
53 err error
54 )
55 for {
56 select {
57 case <-eventc:
58 // We received a path update notification. Call GetEntries to
59 // retrieve child node data, and set a new watch, as ZK watches are
60 // one-time triggers.
61 instances, eventc, err = s.client.GetEntries(s.path)
62 if err != nil {
63 s.logger.Log("path", s.path, "msg", "failed to retrieve entries", "err", err)
64 s.Update(sd.Event{Err: err})
65 continue
66 }
67 s.logger.Log("path", s.path, "instances", len(instances))
68 s.Update(sd.Event{Instances: instances})
69
70 case <-s.quitc:
71 return
72 }
73 }
74 }
75
76 // Stop terminates the Instancer.
77 func (s *Instancer) Stop() {
78 close(s.quitc)
79 }
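Because the ZooKeeper Instancer embeds the internal instance.Cache, it can also be observed directly through the Register/Deregister channel API instead of going through an Endpointer. A rough sketch under assumed placeholders (the ZooKeeper address and service path are hypothetical); error handling is deliberately minimal:

package main

import (
	"os"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/zk"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	// Placeholder ZooKeeper ensemble and service path.
	client, err := zk.NewClient([]string{"localhost:2181"}, logger)
	if err != nil {
		panic(err)
	}

	instancer, err := zk.NewInstancer(client, "/services/stringsvc", logger)
	if err != nil {
		panic(err)
	}
	defer instancer.Stop()

	// The current instance set arrives immediately; further Events arrive
	// whenever children of the watched path change, or when a watch errors.
	events := make(chan sd.Event, 1)
	instancer.Register(events)
	defer instancer.Deregister(events)

	for event := range events {
		if event.Err != nil {
			logger.Log("err", event.Err)
			continue
		}
		logger.Log("instances", len(event.Instances))
	}
}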
0 package zk
1
2 import (
3 "testing"
4 "time"
5
6 "github.com/go-kit/kit/sd"
7 )
8
9 var _ sd.Instancer = &Instancer{}
10
11 func TestInstancer(t *testing.T) {
12 client := newFakeClient()
13
14 instancer, err := NewInstancer(client, path, logger)
15 if err != nil {
16 t.Fatalf("failed to create new Instancer: %v", err)
17 }
18 defer instancer.Stop()
19 endpointer := sd.NewEndpointer(instancer, newFactory(""), logger)
20
21 if _, err := endpointer.Endpoints(); err != nil {
22 t.Fatal(err)
23 }
24 }
25
26 func TestBadFactory(t *testing.T) {
27 client := newFakeClient()
28
29 instancer, err := NewInstancer(client, path, logger)
30 if err != nil {
31 t.Fatalf("failed to create new Instancer: %v", err)
32 }
33 defer instancer.Stop()
34 endpointer := sd.NewEndpointer(instancer, newFactory("kaboom"), logger)
35
36 // instance1 came online
37 client.AddService(path+"/instance1", "kaboom")
38
39 // instance2 came online
40 client.AddService(path+"/instance2", "zookeeper_node_data")
41
42 if err = asyncTest(100*time.Millisecond, 1, endpointer); err != nil {
43 t.Error(err)
44 }
45 }
46
47 func TestServiceUpdate(t *testing.T) {
48 client := newFakeClient()
49
50 instancer, err := NewInstancer(client, path, logger)
51 if err != nil {
52 t.Fatalf("failed to create new Instancer: %v", err)
53 }
54 defer instancer.Stop()
55 endpointer := sd.NewEndpointer(instancer, newFactory(""), logger)
56
57 endpoints, err := endpointer.Endpoints()
58 if err != nil {
59 t.Fatal(err)
60 }
61 if want, have := 0, len(endpoints); want != have {
62 t.Errorf("want %d, have %d", want, have)
63 }
64
65 // instance1 came online
66 client.AddService(path+"/instance1", "zookeeper_node_data1")
67
68 // instance2 came online
69 client.AddService(path+"/instance2", "zookeeper_node_data2")
70
71 // we should have 2 instances
72 if err = asyncTest(100*time.Millisecond, 2, endpointer); err != nil {
73 t.Error(err)
74 }
75
76 // TODO(pb): this bit is flaky
77 //
78 //// watch triggers an error...
79 //client.SendErrorOnWatch()
80 //
81 //// test if error was consumed
82 //if err = client.ErrorIsConsumedWithin(100 * time.Millisecond); err != nil {
83 // t.Error(err)
84 //}
85
86 // instance3 came online
87 client.AddService(path+"/instance3", "zookeeper_node_data3")
88
89 // we should have 3 instances
90 if err = asyncTest(100*time.Millisecond, 3, endpointer); err != nil {
91 t.Error(err)
92 }
93
94 // instance1 goes offline
95 client.RemoveService(path + "/instance1")
96
97 // instance2 goes offline
98 client.RemoveService(path + "/instance2")
99
100 // we should have 1 instance
101 if err = asyncTest(100*time.Millisecond, 1, endpointer); err != nil {
102 t.Error(err)
103 }
104 }
105
106 func TestBadInstancerCreate(t *testing.T) {
107 client := newFakeClient()
108 client.SendErrorOnWatch()
109
110 instancer, err := NewInstancer(client, path, logger)
111 if err == nil {
112 t.Error("expected error on new Instancer")
113 }
114 if instancer != nil {
115 t.Error("expected Instancer not to be created")
116 }
117 instancer, err = NewInstancer(client, "BadPath", logger)
118 if err == nil {
119 t.Error("expected error on new Instancer")
120 }
121 if instancer != nil {
122 t.Error("expected Instancer not to be created")
123 }
124 }
+0
-86
sd/zk/subscriber.go
0 package zk
1
2 import (
3 "github.com/samuel/go-zookeeper/zk"
4
5 "github.com/go-kit/kit/endpoint"
6 "github.com/go-kit/kit/log"
7 "github.com/go-kit/kit/sd"
8 "github.com/go-kit/kit/sd/cache"
9 )
10
11 // Subscriber yield endpoints stored in a certain ZooKeeper path. Any kind of
12 // change in that path is watched and will update the Subscriber endpoints.
13 type Subscriber struct {
14 client Client
15 path string
16 cache *cache.Cache
17 logger log.Logger
18 quitc chan struct{}
19 }
20
21 var _ sd.Subscriber = &Subscriber{}
22
23 // NewSubscriber returns a ZooKeeper subscriber. ZooKeeper will start watching
24 // the given path for changes and update the Subscriber endpoints.
25 func NewSubscriber(c Client, path string, factory sd.Factory, logger log.Logger) (*Subscriber, error) {
26 s := &Subscriber{
27 client: c,
28 path: path,
29 cache: cache.New(factory, logger),
30 logger: logger,
31 quitc: make(chan struct{}),
32 }
33
34 err := s.client.CreateParentNodes(s.path)
35 if err != nil {
36 return nil, err
37 }
38
39 instances, eventc, err := s.client.GetEntries(s.path)
40 if err != nil {
41 logger.Log("path", s.path, "msg", "failed to retrieve entries", "err", err)
42 return nil, err
43 }
44 logger.Log("path", s.path, "instances", len(instances))
45 s.cache.Update(instances)
46
47 go s.loop(eventc)
48
49 return s, nil
50 }
51
52 func (s *Subscriber) loop(eventc <-chan zk.Event) {
53 var (
54 instances []string
55 err error
56 )
57 for {
58 select {
59 case <-eventc:
60 // We received a path update notification. Call GetEntries to
61 // retrieve child node data, and set a new watch, as ZK watches are
62 // one-time triggers.
63 instances, eventc, err = s.client.GetEntries(s.path)
64 if err != nil {
65 s.logger.Log("path", s.path, "msg", "failed to retrieve entries", "err", err)
66 continue
67 }
68 s.logger.Log("path", s.path, "instances", len(instances))
69 s.cache.Update(instances)
70
71 case <-s.quitc:
72 return
73 }
74 }
75 }
76
77 // Endpoints implements the Subscriber interface.
78 func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) {
79 return s.cache.Endpoints(), nil
80 }
81
82 // Stop terminates the Subscriber.
83 func (s *Subscriber) Stop() {
84 close(s.quitc)
85 }
+0
-117
sd/zk/subscriber_test.go
0 package zk
1
2 import (
3 "testing"
4 "time"
5 )
6
7 func TestSubscriber(t *testing.T) {
8 client := newFakeClient()
9
10 s, err := NewSubscriber(client, path, newFactory(""), logger)
11 if err != nil {
12 t.Fatalf("failed to create new Subscriber: %v", err)
13 }
14 defer s.Stop()
15
16 if _, err := s.Endpoints(); err != nil {
17 t.Fatal(err)
18 }
19 }
20
21 func TestBadFactory(t *testing.T) {
22 client := newFakeClient()
23
24 s, err := NewSubscriber(client, path, newFactory("kaboom"), logger)
25 if err != nil {
26 t.Fatalf("failed to create new Subscriber: %v", err)
27 }
28 defer s.Stop()
29
30 // instance1 came online
31 client.AddService(path+"/instance1", "kaboom")
32
33 // instance2 came online
34 client.AddService(path+"/instance2", "zookeeper_node_data")
35
36 if err = asyncTest(100*time.Millisecond, 1, s); err != nil {
37 t.Error(err)
38 }
39 }
40
41 func TestServiceUpdate(t *testing.T) {
42 client := newFakeClient()
43
44 s, err := NewSubscriber(client, path, newFactory(""), logger)
45 if err != nil {
46 t.Fatalf("failed to create new Subscriber: %v", err)
47 }
48 defer s.Stop()
49
50 endpoints, err := s.Endpoints()
51 if err != nil {
52 t.Fatal(err)
53 }
54 if want, have := 0, len(endpoints); want != have {
55 t.Errorf("want %d, have %d", want, have)
56 }
57
58 // instance1 came online
59 client.AddService(path+"/instance1", "zookeeper_node_data1")
60
61 // instance2 came online
62 client.AddService(path+"/instance2", "zookeeper_node_data2")
63
64 // we should have 2 instances
65 if err = asyncTest(100*time.Millisecond, 2, s); err != nil {
66 t.Error(err)
67 }
68
69 // TODO(pb): this bit is flaky
70 //
71 //// watch triggers an error...
72 //client.SendErrorOnWatch()
73 //
74 //// test if error was consumed
75 //if err = client.ErrorIsConsumedWithin(100 * time.Millisecond); err != nil {
76 // t.Error(err)
77 //}
78
79 // instance3 came online
80 client.AddService(path+"/instance3", "zookeeper_node_data3")
81
82 // we should have 3 instances
83 if err = asyncTest(100*time.Millisecond, 3, s); err != nil {
84 t.Error(err)
85 }
86
87 // instance1 goes offline
88 client.RemoveService(path + "/instance1")
89
90 // instance2 goes offline
91 client.RemoveService(path + "/instance2")
92
93 // we should have 1 instance
94 if err = asyncTest(100*time.Millisecond, 1, s); err != nil {
95 t.Error(err)
96 }
97 }
98
99 func TestBadSubscriberCreate(t *testing.T) {
100 client := newFakeClient()
101 client.SendErrorOnWatch()
102 s, err := NewSubscriber(client, path, newFactory(""), logger)
103 if err == nil {
104 t.Error("expected error on new Subscriber")
105 }
106 if s != nil {
107 t.Error("expected Subscriber not to be created")
108 }
109 s, err = NewSubscriber(client, "BadPath", newFactory(""), logger)
110 if err == nil {
111 t.Error("expected error on new Subscriber")
112 }
113 if s != nil {
114 t.Error("expected Subscriber not to be created")
115 }
116 }
113113 }
114114 }
115115
116 func asyncTest(timeout time.Duration, want int, s *Subscriber) (err error) {
116 func asyncTest(timeout time.Duration, want int, s sd.Endpointer) (err error) {
117117 var endpoints []endpoint.Endpoint
118118 have := -1 // want can never be <0
119119 t := time.After(timeout)