Package list golang-github-go-kit-kit / 9225fe9
Address review comments and add more tests Yuri Shkuro 4 years ago
13 changed file(s) with 216 addition(s) and 91 deletion(s). Raw diff Collapse all Expand all
4242 instancer = consul.NewInstancer(sdclient, logger, consulService, consulTags, passingOnly)
4343 endpoints profilesvc.Endpoints
4444 )
45 // TODO: thought experiment
46 mapping := []struct {
47 factory func(s profilesvc.Service) endpoint.Endpoint
48 endpoint *endpoint.Endpoint
49 }{
50 {
51 factory: profilesvc.MakePostProfileEndpoint,
52 endpoint: &endpoints.PostProfileEndpoint,
53 },
54 {
55 factory: profilesvc.MakeGetProfileEndpoint,
56 endpoint: &endpoints.GetProfileEndpoint,
57 },
58 }
59 for _, m := range mapping {
60 factory := factoryFor(m.factory)
61 endpointer := sd.NewEndpointer(instancer, factory, logger)
62 balancer := lb.NewRoundRobin(endpointer)
63 retry := lb.Retry(retryMax, retryTimeout, balancer)
64 *m.endpoint = retry
65 }
66 // TODO: why not 2 lines per endpoint registration above instead of 7 lines per endpoint below?
6745 {
6846 factory := factoryFor(profilesvc.MakePostProfileEndpoint)
6947 endpointer := sd.NewEndpointer(instancer, factory, logger)
2121 endpoints []endpoint.Endpoint
2222 logger log.Logger
2323 invalidateDeadline time.Time
24 timeNow func() time.Time
2425 }
2526
2627 type endpointCloser struct {
3536 factory: factory,
3637 cache: map[string]endpointCloser{},
3738 logger: logger,
39 timeNow: time.Now,
3840 }
3941 }
4042
4648 c.mtx.Lock()
4749 defer c.mtx.Unlock()
4850
51 // Happy path.
4952 if event.Err == nil {
5053 c.updateCache(event.Instances)
51 c.invalidateDeadline = time.Time{}
5254 c.err = nil
53 }
54
55 c.logger.Log("err", event.Err)
56
57 if c.options.invalidateOnErrorTimeout == nil {
58 // keep returning the last known endpoints on error
5955 return
6056 }
6157
58 // Sad path. Something's gone wrong in sd.
59 c.logger.Log("err", event.Err)
60 if c.options.invalidateOnErrorTimeout == nil {
61 return // keep returning the last known endpoints on error
62 }
63 if c.err != nil {
64 return // already in the error state, do nothing & keep original error
65 }
6266 c.err = event.Err
63
64 if !c.invalidateDeadline.IsZero() {
65 // aleady in the error state, do nothing
66 return
67 }
6867 // set new deadline to invalidate Endpoints unless non-error Event is received
69 c.invalidateDeadline = time.Now().Add(*c.options.invalidateOnErrorTimeout)
68 c.invalidateDeadline = c.timeNow().Add(*c.options.invalidateOnErrorTimeout)
7069 return
7170 }
7271
120119 func (c *endpointCache) Endpoints() ([]endpoint.Endpoint, error) {
121120 c.mtx.RLock()
122121
123 if c.err == nil || time.Now().Before(c.invalidateDeadline) {
122 if c.err == nil || c.timeNow().Before(c.invalidateDeadline) {
124123 defer c.mtx.RUnlock()
125124 return c.endpoints, nil
126125 }
129128 c.mtx.Lock()
130129 defer c.mtx.Unlock()
131130
131 // Re-check due to a race between RUnlock() and Lock().
132 if c.err == nil || c.timeNow().Before(c.invalidateDeadline) {
133 return c.endpoints, nil
134 }
135
132136 c.updateCache(nil) // close any remaining active endpoints
133
134137 return nil, c.err
135138 }
4242 }
4343 assertEndpointsLen(t, cache, 2)
4444
45 // Error, should continue returning old endpoints
46 cache.Update(Event{Err: errors.New("sd error")})
47 select {
48 case <-ca:
49 t.Errorf("endpoint a closed, not good")
50 case <-cb:
51 t.Errorf("endpoint b closed, not good")
52 case <-time.After(time.Millisecond):
53 t.Logf("no closures yet, good")
54 }
55 assertEndpointsLen(t, cache, 2)
56
4557 // Delete b
4658 go cache.Update(Event{Instances: []string{"a"}})
4759 select {
6678 assertEndpointsLen(t, cache, 0)
6779 }
6880
81 func TestCacheErrorAndTimeout(t *testing.T) {
82 var (
83 ca = make(closer)
84 cb = make(closer)
85 c = map[string]io.Closer{"a": ca, "b": cb}
86 f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
87 timeOut = 100 * time.Millisecond
88 cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{invalidateOnErrorTimeout: &timeOut})
89 )
90
91 timeNow := time.Now()
92 cache.timeNow = func() time.Time { return timeNow }
93
94 // Populate
95 cache.Update(Event{Instances: []string{"a"}})
96 select {
97 case <-ca:
98 t.Errorf("endpoint a closed, not good")
99 case <-time.After(time.Millisecond):
100 t.Logf("no closures yet, good")
101 }
102 assertEndpointsLen(t, cache, 1)
103
104 // Send error, keep time still.
105 cache.Update(Event{Err: errors.New("sd error")})
106 select {
107 case <-ca:
108 t.Errorf("endpoint a closed, not good")
109 case <-time.After(time.Millisecond):
110 t.Logf("no closures yet, good")
111 }
112 assertEndpointsLen(t, cache, 1)
113
114 // Move the time, but less than the timeout
115 timeNow = timeNow.Add(timeOut / 2)
116 assertEndpointsLen(t, cache, 1)
117 select {
118 case <-ca:
119 t.Errorf("endpoint a closed, not good")
120 case <-time.After(time.Millisecond):
121 t.Logf("no closures yet, good")
122 }
123
124 // Move the time past the timeout
125 timeNow = timeNow.Add(timeOut)
126 assertEndpointsError(t, cache, "sd error")
127 select {
128 case <-ca:
129 t.Logf("endpoint a closed, good")
130 case <-time.After(time.Millisecond):
131 t.Errorf("didn't close the deleted instance in time")
132 }
133
134 // Send another error
135 cache.Update(Event{Err: errors.New("another sd error")})
136 assertEndpointsError(t, cache, "sd error") // expect original error
137 }
138
69139 func TestBadFactory(t *testing.T) {
70140 cache := newEndpointCache(func(string) (endpoint.Endpoint, io.Closer, error) {
71141 return nil, nil, errors.New("bad factory")
86156 }
87157 }
88158
159 func assertEndpointsError(t *testing.T, cache *endpointCache, wantErr string) {
160 endpoints, err := cache.Endpoints()
161 if err == nil {
162 t.Errorf("expecting error, not good")
163 return
164 }
165 if want, have := wantErr, err.Error(); want != have {
166 t.Errorf("want %s, have %s", want, have)
167 return
168 }
169 if want, have := 0, len(endpoints); want != have {
170 t.Errorf("want %d, have %d", want, have)
171 }
172 }
173
89174 type closer chan struct{}
90175
91176 func (c closer) Close() error { close(c); return nil }
1414
1515 // Instancer yields instances for a service in Consul.
1616 type Instancer struct {
17 instance.Cache
17 cache *instance.Cache
1818 client Client
1919 logger log.Logger
2020 service string
2828 // are present.
2929 func NewInstancer(client Client, logger log.Logger, service string, tags []string, passingOnly bool) *Instancer {
3030 s := &Instancer{
31 Cache: *instance.NewCache(),
31 cache: instance.NewCache(),
3232 client: client,
3333 logger: log.With(logger, "service", service, "tags", fmt.Sprint(tags)),
3434 service: service,
4444 s.logger.Log("err", err)
4545 }
4646
47 s.Update(sd.Event{Instances: instances, Err: err})
47 s.cache.Update(sd.Event{Instances: instances, Err: err})
4848 go s.loop(index)
4949 return s
5050 }
6666 return // stopped via quitc
6767 case err != nil:
6868 s.logger.Log("err", err)
69 s.Update(sd.Event{Err: err})
69 s.cache.Update(sd.Event{Err: err})
7070 default:
71 s.Update(sd.Event{Instances: instances})
71 s.cache.Update(sd.Event{Instances: instances})
7272 }
7373 }
7474 }
122122 }
123123 }
124124
125 // Register implements Instancer.
126 func (s *Instancer) Register(ch chan<- sd.Event) {
127 s.cache.Register(ch)
128 }
129
130 // Deregister implements Instancer.
131 func (s *Instancer) Deregister(ch chan<- sd.Event) {
132 s.cache.Deregister(ch)
133 }
134
125135 func filterEntries(entries []*consul.ServiceEntry, tags ...string) []*consul.ServiceEntry {
126136 var es []*consul.ServiceEntry
127137
6868 s := NewInstancer(client, logger, "search", []string{"api"}, true)
6969 defer s.Stop()
7070
71 state := s.State()
71 state := s.cache.State()
7272 if want, have := 2, len(state.Instances); want != have {
7373 t.Errorf("want %d, have %d", want, have)
7474 }
8383 s := NewInstancer(client, logger, "feed", []string{}, true)
8484 defer s.Stop()
8585
86 state := s.State()
86 state := s.cache.State()
8787 if want, have := 0, len(state.Instances); want != have {
8888 t.Fatalf("want %d, have %d", want, have)
8989 }
9898 s := NewInstancer(client, logger, "search", []string{"api", "v2"}, true)
9999 defer s.Stop()
100100
101 state := s.State()
101 state := s.cache.State()
102102 if want, have := 1, len(state.Instances); want != have {
103103 t.Fatalf("want %d, have %d", want, have)
104104 }
108108 s := NewInstancer(newTestClient(consulState), log.NewNopLogger(), "search", []string{"db"}, true)
109109 defer s.Stop()
110110
111 state := s.State()
111 state := s.cache.State()
112112 if want, have := 1, len(state.Instances); want != have {
113113 t.Fatalf("want %d, have %d", want, have)
114114 }
1212 // Instancer yields instances from the named DNS SRV record. The name is
1313 // resolved on a fixed schedule. Priorities and weights are ignored.
1414 type Instancer struct {
15 instance.Cache
15 cache *instance.Cache
1616 name string
1717 logger log.Logger
1818 quit chan struct{}
3737 logger log.Logger,
3838 ) *Instancer {
3939 p := &Instancer{
40 Cache: *instance.NewCache(),
40 cache: instance.NewCache(),
4141 name: name,
4242 logger: logger,
4343 quit: make(chan struct{}),
4949 } else {
5050 logger.Log("name", name, "err", err)
5151 }
52 p.Update(sd.Event{Instances: instances, Err: err})
52 p.cache.Update(sd.Event{Instances: instances, Err: err})
5353
5454 go p.loop(refresh, lookup)
5555 return p
6868 instances, err := p.resolve(lookup)
6969 if err != nil {
7070 p.logger.Log("name", p.name, "err", err)
71 p.Update(sd.Event{Err: err})
71 p.cache.Update(sd.Event{Err: err})
7272 continue // don't replace potentially-good with bad
7373 }
74 p.Update(sd.Event{Instances: instances})
74 p.cache.Update(sd.Event{Instances: instances})
7575
7676 case <-p.quit:
7777 return
9090 }
9191 return instances, nil
9292 }
93
94 // Register implements Instancer.
95 func (s *Instancer) Register(ch chan<- sd.Event) {
96 s.cache.Register(ch)
97 }
98
99 // Deregister implements Instancer.
100 func (s *Instancer) Deregister(ch chan<- sd.Event) {
101 s.cache.Deregister(ch)
102 }
3131 defer instancer.Stop()
3232
3333 // First lookup, empty
34 state := instancer.State()
34 state := instancer.cache.State()
3535 if state.Err != nil {
3636 t.Error(state.Err)
3737 }
5555 // TODO(pb): solve by running the read through the loop goroutine.
5656 time.Sleep(100 * time.Millisecond)
5757
58 state = instancer.State()
58 state = instancer.cache.State()
5959 if state.Err != nil {
6060 t.Error(state.Err)
6161 }
2323 // NewEndpointer creates an Endpointer that subscribes to updates from Instancer src
2424 // and uses factory f to create Endpoints. If src notifies of an error, the Endpointer
2525 // keeps returning previously created Endpoints assuming they are still good, unless
26 // this behavior is disabled with ResetOnError option.
27 func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) Endpointer {
26 // this behavior is disabled via InvalidateOnError option.
27 func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) *DefaultEndpointer {
2828 opts := endpointerOptions{}
2929 for _, opt := range options {
3030 opt(&opts)
3131 }
32 se := &simpleEndpointer{
33 endpointCache: *newEndpointCache(f, logger, opts),
34 instancer: src,
35 ch: make(chan Event),
32 se := &DefaultEndpointer{
33 cache: newEndpointCache(f, logger, opts),
34 instancer: src,
35 ch: make(chan Event),
3636 }
3737 go se.receive()
3838 src.Register(se.ch)
5959 invalidateOnErrorTimeout *time.Duration
6060 }
6161
62 type simpleEndpointer struct {
63 endpointCache
64
62 // DefaultEndpointer implements an Endpointer interface.
63 // When created with NewEndpointer function, it automatically registers
64 // as a subscriber to events from the Instances and maintains a list
65 // of active Endpoints.
66 type DefaultEndpointer struct {
67 cache *endpointCache
6568 instancer Instancer
6669 ch chan Event
6770 }
6871
69 func (se *simpleEndpointer) receive() {
70 for event := range se.ch {
71 se.Update(event)
72 func (de *DefaultEndpointer) receive() {
73 for event := range de.ch {
74 de.cache.Update(event)
7275 }
7376 }
7477
75 func (se *simpleEndpointer) Close() {
76 se.instancer.Deregister(se.ch)
77 close(se.ch)
78 // Close de-registeres DefaultEndpointer from the Instancer and stops the internal go-routine.
79 func (de *DefaultEndpointer) Close() {
80 de.instancer.Deregister(de.ch)
81 close(de.ch)
7882 }
83
84 // Endpoints implements Endpointer.
85 func (de *DefaultEndpointer) Endpoints() ([]endpoint.Endpoint, error) {
86 return de.cache.Endpoints()
87 }
88 // Instancer yields instances stored in a certain etcd keyspace. Any kind of
99 // change in that keyspace is watched and will update the Instancer's Instancers.
1010 type Instancer struct {
11 instance.Cache
11 cache *instance.Cache
1212 client Client
1313 prefix string
1414 logger log.Logger
2121 s := &Instancer{
2222 client: c,
2323 prefix: prefix,
24 Cache: *instance.NewCache(),
24 cache: instance.NewCache(),
2525 logger: logger,
2626 quitc: make(chan struct{}),
2727 }
3232 } else {
3333 logger.Log("prefix", s.prefix, "err", err)
3434 }
35 s.Update(sd.Event{Instances: instances, Err: err})
35 s.cache.Update(sd.Event{Instances: instances, Err: err})
3636
3737 go s.loop()
3838 return s, nil
4747 instances, err := s.client.GetEntries(s.prefix)
4848 if err != nil {
4949 s.logger.Log("msg", "failed to retrieve entries", "err", err)
50 s.Update(sd.Event{Err: err})
50 s.cache.Update(sd.Event{Err: err})
5151 continue
5252 }
53 s.Update(sd.Event{Instances: instances})
53 s.cache.Update(sd.Event{Instances: instances})
5454
5555 case <-s.quitc:
5656 return
6262 func (s *Instancer) Stop() {
6363 close(s.quitc)
6464 }
65
66 // Register implements Instancer.
67 func (s *Instancer) Register(ch chan<- sd.Event) {
68 s.cache.Register(ch)
69 }
70
71 // Deregister implements Instancer.
72 func (s *Instancer) Deregister(ch chan<- sd.Event) {
73 s.cache.Deregister(ch)
74 }
3535 }
3636 defer s.Stop()
3737
38 if state := s.State(); state.Err != nil {
38 if state := s.cache.State(); state.Err != nil {
3939 t.Fatal(state.Err)
4040 }
4141 }
1212 // Instancer yields instances stored in the Eureka registry for the given app.
1313 // Changes in that app are watched and will update the subscribers.
1414 type Instancer struct {
15 instance.Cache
15 cache *instance.Cache
1616 conn fargoConnection
1717 app string
1818 logger log.Logger
2525 logger = log.With(logger, "app", app)
2626
2727 s := &Instancer{
28 Cache: *instance.NewCache(),
28 cache: instance.NewCache(),
2929 conn: conn,
3030 app: app,
3131 logger: logger,
3939 s.logger.Log("during", "getInstances", "err", err)
4040 }
4141
42 s.Update(sd.Event{Instances: instances, Err: err})
42 s.cache.Update(sd.Event{Instances: instances, Err: err})
4343 go s.loop()
4444 return s
4545 }
6565 case update := <-updatec:
6666 if update.Err != nil {
6767 s.logger.Log("during", "Update", "err", update.Err)
68 s.Update(sd.Event{Err: update.Err})
68 s.cache.Update(sd.Event{Err: update.Err})
6969 continue
7070 }
7171 instances := convertFargoAppToInstances(update.App)
7272 s.logger.Log("instances", len(instances))
73 s.Update(sd.Event{Instances: instances})
73 s.cache.Update(sd.Event{Instances: instances})
7474
7575 case q := <-s.quitc:
7676 close(q)
9494 }
9595 return instances
9696 }
97
98 // Register implements Instancer.
99 func (s *Instancer) Register(ch chan<- sd.Event) {
100 s.cache.Register(ch)
101 }
102
103 // Deregister implements Instancer.
104 func (s *Instancer) Deregister(ch chan<- sd.Event) {
105 s.cache.Deregister(ch)
106 }
2020 instancer := NewInstancer(connection, appNameTest, loggerTest)
2121 defer instancer.Stop()
2222
23 state := instancer.State()
23 state := instancer.cache.State()
2424 if state.Err != nil {
2525 t.Fatal(state.Err)
2626 }
4040 instancer := NewInstancer(connection, appNameTest, loggerTest)
4141 defer instancer.Stop()
4242
43 state := instancer.State()
43 state := instancer.cache.State()
4444 if want, have := 1, len(state.Instances); want != have {
4545 t.Errorf("want %d, have %d", want, have)
4646 }
4747
4848 time.Sleep(50 * time.Millisecond)
4949
50 state = instancer.State()
50 state = instancer.cache.State()
5151 if want, have := 2, len(state.Instances); want != have {
5252 t.Errorf("want %v, have %v", want, have)
5353 }
6464 instancer := NewInstancer(connection, appNameTest, loggerTest)
6565 defer instancer.Stop()
6666
67 state := instancer.State()
67 state := instancer.cache.State()
6868 if state.Err == nil {
6969 t.Fatal("expecting error")
7070 }
8484 instancer := NewInstancer(connection, appNameTest, loggerTest)
8585 defer instancer.Stop()
8686
87 state := instancer.State()
87 state := instancer.cache.State()
8888 if state.Err != nil {
8989 t.Error(state.Err)
9090 }
9494
9595 time.Sleep(50 * time.Millisecond)
9696
97 state = instancer.State()
97 state = instancer.cache.State()
9898 if state.Err == nil {
9999 t.Fatal("expecting error")
100100 }
1010 // Instancer yield instances stored in a certain ZooKeeper path. Any kind of
1111 // change in that path is watched and will update the subscribers.
1212 type Instancer struct {
13 instance.Cache
13 cache *instance.Cache
1414 client Client
1515 path string
1616 logger log.Logger
2121 // the given path for changes and update the Instancer endpoints.
2222 func NewInstancer(c Client, path string, logger log.Logger) (*Instancer, error) {
2323 s := &Instancer{
24 Cache: *instance.NewCache(),
24 cache: instance.NewCache(),
2525 client: c,
2626 path: path,
2727 logger: logger,
3636 instances, eventc, err := s.client.GetEntries(s.path)
3737 if err != nil {
3838 logger.Log("path", s.path, "msg", "failed to retrieve entries", "err", err)
39 // TODO why zk constructor exits when other implementations continue?
39 // other implementations continue here, but we exit because we don't know if eventc is valid
4040 return nil, err
4141 }
4242 logger.Log("path", s.path, "instances", len(instances))
43 s.Update(sd.Event{Instances: instances})
43 s.cache.Update(sd.Event{Instances: instances})
4444
4545 go s.loop(eventc)
4646
6161 instances, eventc, err = s.client.GetEntries(s.path)
6262 if err != nil {
6363 s.logger.Log("path", s.path, "msg", "failed to retrieve entries", "err", err)
64 s.Update(sd.Event{Err: err})
64 s.cache.Update(sd.Event{Err: err})
6565 continue
6666 }
6767 s.logger.Log("path", s.path, "instances", len(instances))
68 s.Update(sd.Event{Instances: instances})
68 s.cache.Update(sd.Event{Instances: instances})
6969
7070 case <-s.quitc:
7171 return
7777 func (s *Instancer) Stop() {
7878 close(s.quitc)
7979 }
80
81 // Register implements Instancer.
82 func (s *Instancer) Register(ch chan<- sd.Event) {
83 s.cache.Register(ch)
84 }
85
86 // Deregister implements Instancer.
87 func (s *Instancer) Deregister(ch chan<- sd.Event) {
88 s.cache.Deregister(ch)
89 }