Codebase list golang-github-go-kit-kit / 5561005
Improve inconsistent Consul SD index handling. (#999) Consul has guidelines for how the index movement should be handled. https://www.consul.io/api-docs/features/blocking#implementation-details Generally the index should always increase but if it does not increase this patch implements the suggested solutions for recovery. Weston Schmidt authored 3 years ago GitHub committed 3 years ago
3 changed file(s) with 70 addition(s) and 2 deletion(s). Raw diff Collapse all Expand all
8989 results = append(results, entry)
9090 }
9191
92 return results, &stdconsul.QueryMeta{}, nil
92 return results, &stdconsul.QueryMeta{LastIndex: opts.WaitIndex}, nil
9393 }
9494
9595 func (c *testClient) Register(r *stdconsul.AgentServiceRegistration) error {
6666 d time.Duration = 10 * time.Millisecond
6767 )
6868 for {
69 instances, lastIndex, err = s.getInstances(lastIndex, s.quitc)
69 index := lastIndex
70 instances, index, err = s.getInstances(lastIndex, s.quitc)
7071 switch {
7172 case err == errStopped:
7273 return // stopped via quitc
7576 time.Sleep(d)
7677 d = conn.Exponential(d)
7778 s.cache.Update(sd.Event{Err: err})
79 case index == defaultIndex:
80 s.logger.Log("err", "index is not sane")
81 time.Sleep(d)
82 d = conn.Exponential(d)
83 case index < lastIndex:
84 s.logger.Log("err", "index is less than previous; reseting to default")
85 lastIndex = defaultIndex
86 time.Sleep(d)
87 d = conn.Exponential(d)
7888 default:
7989 s.cache.Update(sd.Event{Instances: instances})
8090 d = 10 * time.Millisecond
201201 t.Error("failed, to receive call in time")
202202 }
203203 }
204
205 type badIndexTestClient struct {
206 client *testClient
207 called chan struct{}
208 }
209
210 func newBadIndexTestClient(client *testClient, called chan struct{}) Client {
211 return &badIndexTestClient{client: client, called: called}
212 }
213
214 func (c *badIndexTestClient) Register(r *consul.AgentServiceRegistration) error {
215 return c.client.Register(r)
216 }
217
218 func (c *badIndexTestClient) Deregister(r *consul.AgentServiceRegistration) error {
219 return c.client.Deregister(r)
220 }
221
222 func (c *badIndexTestClient) Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
223 switch {
224 case queryOpts.WaitIndex == 0:
225 queryOpts.WaitIndex = 100
226 case queryOpts.WaitIndex == 100:
227 queryOpts.WaitIndex = 99
228 default:
229 }
230 c.called <- struct{}{}
231 return c.client.Service(service, tag, passingOnly, queryOpts)
232 }
233
234 func TestInstancerWithInvalidIndex(t *testing.T) {
235 var (
236 called = make(chan struct{}, 1)
237 logger = log.NewNopLogger()
238 client = newBadIndexTestClient(newTestClient(consulState), called)
239 )
240
241 s := NewInstancer(client, logger, "search", []string{"api"}, true)
242 defer s.Stop()
243
244 select {
245 case <-called:
246 case <-time.Tick(time.Millisecond * 500):
247 t.Error("failed, to receive call")
248 }
249
250 state := s.cache.State()
251 if want, have := 2, len(state.Instances); want != have {
252 t.Errorf("want %d, have %d", want, have)
253 }
254
255 // loop should continue
256 select {
257 case <-called:
258 case <-time.Tick(time.Millisecond * 500):
259 t.Error("failed, to receive call in time")
260 }
261 }