diff --git a/sd/eureka/instancer.go b/sd/eureka/instancer.go index 69c4556..d00b98c 100644 --- a/sd/eureka/instancer.go +++ b/sd/eureka/instancer.go @@ -33,15 +33,10 @@ quitc: make(chan chan struct{}), } - instances, err := s.getInstances() - if err == nil { - s.logger.Log("instances", len(instances)) - } else { - s.logger.Log("during", "getInstances", "err", err) - } - - s.cache.Update(sd.Event{Instances: instances, Err: err}) - go s.loop() + done := make(chan struct{}) + updates := conn.ScheduleAppUpdates(app, true, done) + s.consume(<-updates) + go s.loop(updates, done) return s } @@ -53,26 +48,24 @@ s.quitc = nil } -func (s *Instancer) loop() { - var ( - await = false - done = make(chan struct{}) - updatec = s.conn.ScheduleAppUpdates(s.app, await, done) - ) +func (s *Instancer) consume(update fargo.AppUpdate) { + if update.Err != nil { + s.logger.Log("during", "Update", "err", update.Err) + s.cache.Update(sd.Event{Err: update.Err}) + return + } + instances := convertFargoAppToInstances(update.App) + s.logger.Log("instances", len(instances)) + s.cache.Update(sd.Event{Instances: instances}) +} + +func (s *Instancer) loop(updates <-chan fargo.AppUpdate, done chan<- struct{}) { defer close(done) for { select { - case update := <-updatec: - if update.Err != nil { - s.logger.Log("during", "Update", "err", update.Err) - s.cache.Update(sd.Event{Err: update.Err}) - continue - } - instances := convertFargoAppToInstances(update.App) - s.logger.Log("instances", len(instances)) - s.cache.Update(sd.Event{Instances: instances}) - + case update := <-updates: + s.consume(update) case q := <-s.quitc: close(q) return diff --git a/sd/eureka/instancer_test.go b/sd/eureka/instancer_test.go index 98c83e5..cde4c61 100644 --- a/sd/eureka/instancer_test.go +++ b/sd/eureka/instancer_test.go @@ -13,59 +13,75 @@ func TestInstancer(t *testing.T) { connection := &testConnection{ - instances: []*fargo.Instance{instanceTest1}, - application: appUpdateTest, + instances: []*fargo.Instance{instanceTest1, instanceTest2}, errApplication: nil, } instancer := NewInstancer(connection, appNameTest, loggerTest) defer instancer.Stop() - state := instancer.cache.State() + state := instancer.state() if state.Err != nil { t.Fatal(state.Err) } - if want, have := 1, len(state.Instances); want != have { + if want, have := 2, len(state.Instances); want != have { t.Errorf("want %d, have %d", want, have) } } -func TestInstancerScheduleUpdates(t *testing.T) { +func TestInstancerReceivesUpdates(t *testing.T) { connection := &testConnection{ instances: []*fargo.Instance{instanceTest1}, - application: appUpdateTest, errApplication: nil, } instancer := NewInstancer(connection, appNameTest, loggerTest) defer instancer.Stop() - state := instancer.cache.State() - if want, have := 1, len(state.Instances); want != have { - t.Errorf("want %d, have %d", want, have) + verifyCount := func(want int) (have int, converged bool) { + const maxPollAttempts = 5 + const delayPerAttempt = 200 * time.Millisecond + for i := 1; ; i++ { + state := instancer.state() + if have := len(state.Instances); want == have { + return have, true + } else if i == maxPollAttempts { + return have, false + } + time.Sleep(delayPerAttempt) + } } - time.Sleep(50 * time.Millisecond) + if have, converged := verifyCount(1); !converged { + t.Fatalf("initial: want %d, have %d", 1, have) + } - state = instancer.cache.State() - if want, have := 2, len(state.Instances); want != have { - t.Errorf("want %v, have %v", want, have) + if err := connection.RegisterInstance(instanceTest2); err != nil { + t.Fatalf("failed to register an instance: %v", err) + } + if have, converged := verifyCount(2); !converged { + t.Fatalf("after registration: want %d, have %d", 2, have) + } + + if err := connection.DeregisterInstance(instanceTest1); err != nil { + t.Fatalf("failed to unregister an instance: %v", err) + } + if have, converged := verifyCount(1); !converged { + t.Fatalf("after deregistration: want %d, have %d", 1, have) } } -func TestBadInstancerInstances(t *testing.T) { +func TestBadInstancerScheduleUpdates(t *testing.T) { connection := &testConnection{ - instances: []*fargo.Instance{}, - errInstances: errTest, - application: appUpdateTest, - errApplication: nil, + instances: []*fargo.Instance{instanceTest1}, + errApplication: errTest, } instancer := NewInstancer(connection, appNameTest, loggerTest) defer instancer.Stop() - state := instancer.cache.State() + state := instancer.state() if state.Err == nil { t.Fatal("expecting error") } @@ -74,33 +90,3 @@ t.Errorf("want %d, have %d", want, have) } } - -func TestBadInstancerScheduleUpdates(t *testing.T) { - connection := &testConnection{ - instances: []*fargo.Instance{instanceTest1}, - application: appUpdateTest, - errApplication: errTest, - } - - instancer := NewInstancer(connection, appNameTest, loggerTest) - defer instancer.Stop() - - state := instancer.cache.State() - if state.Err != nil { - t.Error(state.Err) - } - if want, have := 1, len(state.Instances); want != have { - t.Errorf("want %d, have %d", want, have) - } - - time.Sleep(50 * time.Millisecond) - - state = instancer.cache.State() - if state.Err == nil { - t.Fatal("expecting error") - } - - if want, have := 0, len(state.Instances); want != have { - t.Errorf("want %v, have %v", want, have) - } -} diff --git a/sd/eureka/registrar.go b/sd/eureka/registrar.go index 99fef7c..7af7836 100644 --- a/sd/eureka/registrar.go +++ b/sd/eureka/registrar.go @@ -28,6 +28,10 @@ type fargoUnsuccessfulHTTPResponse struct { statusCode int messagePrefix string +} + +func (u *fargoUnsuccessfulHTTPResponse) Error() string { + return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode) } // Registrar maintains service instance liveness information in Eureka. @@ -110,18 +114,30 @@ } } +func httpResponseStatusCode(err error) (code int, present bool) { + if code, ok := fargo.HTTPResponseStatusCode(err); ok { + return code, true + } + // Allow injection of errors for testing. + if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok { + return u.statusCode, true + } + return 0, false +} + +func isNotFound(err error) bool { + code, ok := httpResponseStatusCode(err) + return ok && code == http.StatusNotFound +} + func (r *Registrar) heartbeat() error { err := r.conn.HeartBeatInstance(r.instance) - if err != nil { - if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok && u.statusCode == http.StatusNotFound { - // Instance expired (e.g. network partition). Re-register. - r.logger.Log("during", "heartbeat", err.Error()) - return r.conn.ReregisterInstance(r.instance) - } + if err == nil { + return nil + } + if isNotFound(err) { + // Instance expired (e.g. network partition). Re-register. + return r.conn.ReregisterInstance(r.instance) } return err } - -func (u *fargoUnsuccessfulHTTPResponse) Error() string { - return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode) -} diff --git a/sd/eureka/util_test.go b/sd/eureka/util_test.go index a66e712..e8a71a4 100644 --- a/sd/eureka/util_test.go +++ b/sd/eureka/util_test.go @@ -2,16 +2,19 @@ import ( "errors" + "fmt" "reflect" + "sync" + "time" "github.com/go-kit/kit/log" "github.com/hudl/fargo" ) type testConnection struct { - instances []*fargo.Instance - application *fargo.Application - errInstances error + mu sync.RWMutex + instances []*fargo.Instance + errApplication error errHeartbeat error errRegister error @@ -23,10 +26,6 @@ errNotFound = &fargoUnsuccessfulHTTPResponse{statusCode: 404, messagePrefix: "not found"} loggerTest = log.NewNopLogger() appNameTest = "go-kit" - appUpdateTest = &fargo.Application{ - Name: appNameTest, - Instances: []*fargo.Instance{instanceTest1, instanceTest2}, - } instanceTest1 = &fargo.Instance{ HostName: "serveregistrar1.acme.org", Port: 8080, @@ -59,16 +58,18 @@ var _ fargoConnection = (*testConnection)(nil) func (c *testConnection) RegisterInstance(i *fargo.Instance) error { - if c.errRegister == nil { - for _, instance := range c.instances { - if reflect.DeepEqual(*instance, *i) { - return errors.New("already registered") - } + if c.errRegister != nil { + return c.errRegister + } + c.mu.Lock() + defer c.mu.Unlock() + for _, instance := range c.instances { + if reflect.DeepEqual(*instance, *i) { + return errors.New("already registered") } - - c.instances = append(c.instances, i) } - return c.errRegister + c.instances = append(c.instances, i) + return nil } func (c *testConnection) HeartBeatInstance(i *fargo.Instance) error { @@ -76,33 +77,76 @@ } func (c *testConnection) DeregisterInstance(i *fargo.Instance) error { - if c.errDeregister == nil { - var newInstances []*fargo.Instance - for _, instance := range c.instances { - if reflect.DeepEqual(*instance, *i) { - continue - } - newInstances = append(newInstances, instance) + if c.errDeregister != nil { + return c.errDeregister + } + c.mu.Lock() + defer c.mu.Unlock() + remaining := make([]*fargo.Instance, 0, len(c.instances)) + for _, instance := range c.instances { + if reflect.DeepEqual(*instance, *i) { + continue } - if len(newInstances) == len(c.instances) { - return errors.New("not registered") - } - - c.instances = newInstances + remaining = append(remaining, instance) } - return c.errDeregister + if len(remaining) == len(c.instances) { + return errors.New("not registered") + } + c.instances = remaining + return nil } func (c *testConnection) ReregisterInstance(ins *fargo.Instance) error { return nil } -func (c *testConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate { - updatec := make(chan fargo.AppUpdate, 1) - updatec <- fargo.AppUpdate{App: c.application, Err: c.errApplication} - return updatec +func (c *testConnection) instancesForApplication(name string) []*fargo.Instance { + c.mu.RLock() + defer c.mu.RUnlock() + instances := make([]*fargo.Instance, 0, len(c.instances)) + for _, i := range c.instances { + if i.App == name { + instances = append(instances, i) + } + } + return instances } func (c *testConnection) GetApp(name string) (*fargo.Application, error) { - return &fargo.Application{Name: appNameTest, Instances: c.instances}, c.errInstances + if err := c.errApplication; err != nil { + return nil, err + } + instances := c.instancesForApplication(name) + if len(instances) == 0 { + return nil, fmt.Errorf("Application not found for name=%s", name) + } + return &fargo.Application{Name: name, Instances: instances}, nil } + +func (c *testConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate { + updatec := make(chan fargo.AppUpdate, 1) + send := func() { + app, err := c.GetApp(name) + select { + case updatec <- fargo.AppUpdate{App: app, Err: err}: + default: + } + } + + if await { + send() + } + go func() { + ticker := time.NewTicker(100 * time.Millisecond) + for { + select { + case <-ticker.C: + send() + case <-done: + ticker.Stop() + return + } + } + }() + return updatec +}