Codebase list golang-github-go-kit-kit / 0e6a1b1
Fixed instancer logic for Consul (#1215) The instancer loop was never updating the lastIndex but passing it to getInstances which caused Instancer to spam Consul repeatedly and nearly killed our Consul agent pods and time instances were added or removed from Consul for that service. Joseph Kratz authored 2 years ago GitHub committed 2 years ago
2 changed file(s) with 67 addition(s) and 2 deletion(s). Raw diff Collapse all Expand all
6464 instances []string
6565 err error
6666 d time.Duration = 10 * time.Millisecond
67 index uint64
6768 )
6869 for {
69 index := lastIndex
7070 instances, index, err = s.getInstances(lastIndex, s.quitc)
7171 switch {
7272 case errors.Is(err, errStopped):
8181 time.Sleep(d)
8282 d = conn.Exponential(d)
8383 case index < lastIndex:
84 s.logger.Log("err", "index is less than previous; reseting to default")
84 s.logger.Log("err", "index is less than previous; resetting to default")
8585 lastIndex = defaultIndex
8686 time.Sleep(d)
8787 d = conn.Exponential(d)
8888 default:
89 lastIndex = index
8990 s.cache.Update(sd.Event{Instances: instances})
9091 d = 10 * time.Millisecond
9192 }
11
22 import (
33 "context"
4 "fmt"
45 "io"
56 "testing"
67 "time"
260261 t.Error("failed, to receive call in time")
261262 }
262263 }
264
265 type indexTestClient struct {
266 client *testClient
267 index uint64
268 errs chan error
269 }
270
271 func newIndexTestClient(c *testClient, errs chan error) *indexTestClient {
272 return &indexTestClient{
273 client: c,
274 index: 0,
275 errs: errs,
276 }
277 }
278
279 func (i *indexTestClient) Register(r *consul.AgentServiceRegistration) error {
280 return i.client.Register(r)
281 }
282
283 func (i *indexTestClient) Deregister(r *consul.AgentServiceRegistration) error {
284 return i.client.Deregister(r)
285 }
286
287 func (i *indexTestClient) Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
288
289 // Assumes this is the first call Service, loop hasn't begun running yet
290 if i.index == 0 && queryOpts.WaitIndex == 0 {
291 i.index = 100
292 entries, meta, err := i.client.Service(service, tag, passingOnly, queryOpts)
293 meta.LastIndex = i.index
294 return entries, meta, err
295 }
296
297 if queryOpts.WaitIndex < i.index {
298 i.errs <- fmt.Errorf("wait index %d is less than or equal to previous value", queryOpts.WaitIndex)
299 }
300
301 entries, meta, err := i.client.Service(service, tag, passingOnly, queryOpts)
302 i.index++
303 meta.LastIndex = i.index
304 return entries, meta, err
305 }
306
307 func TestInstancerLoopIndex(t *testing.T) {
308
309 var (
310 errs = make(chan error, 1)
311 logger = log.NewNopLogger()
312 client = newIndexTestClient(newTestClient(consulState), errs)
313 )
314
315 go func() {
316 for err := range errs {
317 t.Error(err)
318 t.FailNow()
319 }
320 }()
321
322 instancer := NewInstancer(client, logger, "search", []string{"api"}, true)
323 defer instancer.Stop()
324
325 time.Sleep(2 * time.Second)
326 }