diff --git a/sd/eureka/client.go b/sd/eureka/client.go deleted file mode 100644 index deeb497..0000000 --- a/sd/eureka/client.go +++ /dev/null @@ -1,77 +0,0 @@ -package eureka - -import ( - "github.com/hudl/fargo" -) - -// Client is a wrapper around the Eureka API. -type Client interface { - // Register an instance with Eureka. - Register(i *fargo.Instance) error - - // Deregister an instance from Eureka. - Deregister(i *fargo.Instance) error - - // Send an instance heartbeat to Eureka. - Heartbeat(i *fargo.Instance) error - - // Get all instances for an app in Eureka. - Instances(app string) ([]*fargo.Instance, error) - - // Receive scheduled updates about an app's instances in Eureka. - ScheduleUpdates(app string, quitc chan struct{}) <-chan fargo.AppUpdate -} - -type client struct { - connection *fargo.EurekaConnection -} - -// NewClient returns an implementation of the Client interface, wrapping a -// concrete connection to Eureka using the Fargo library. -// Taking in Fargo's own connection abstraction gives the user maximum -// freedom in regards to how that connection is configured. -func NewClient(ec *fargo.EurekaConnection) Client { - return &client{connection: ec} -} - -func (c *client) Register(i *fargo.Instance) error { - if c.instanceRegistered(i) { - // Already registered. Send a heartbeat instead. - return c.Heartbeat(i) - } - return c.connection.RegisterInstance(i) -} - -func (c *client) Deregister(i *fargo.Instance) error { - return c.connection.DeregisterInstance(i) -} - -func (c *client) Heartbeat(i *fargo.Instance) (err error) { - if err = c.connection.HeartBeatInstance(i); err != nil && c.instanceNotFoundErr(err) { - // Instance not registered. Register first before sending heartbeats. - return c.Register(i) - } - return err -} - -func (c *client) Instances(app string) ([]*fargo.Instance, error) { - stdApp, err := c.connection.GetApp(app) - if err != nil { - return nil, err - } - return stdApp.Instances, nil -} - -func (c *client) ScheduleUpdates(app string, quitc chan struct{}) <-chan fargo.AppUpdate { - return c.connection.ScheduleAppUpdates(app, false, quitc) -} - -func (c *client) instanceRegistered(i *fargo.Instance) bool { - _, err := c.connection.GetInstance(i.App, i.Id()) - return err == nil -} - -func (c *client) instanceNotFoundErr(err error) bool { - code, ok := fargo.HTTPResponseStatusCode(err) - return ok && code == 404 -} diff --git a/sd/eureka/client_test.go b/sd/eureka/client_test.go deleted file mode 100644 index 4df5eb2..0000000 --- a/sd/eureka/client_test.go +++ /dev/null @@ -1,95 +0,0 @@ -package eureka - -import ( - "errors" - "reflect" - - "github.com/hudl/fargo" - - "github.com/go-kit/kit/log" -) - -var ( - errTest = errors.New("kaboom") - loggerTest = log.NewNopLogger() - instanceTest1 = &fargo.Instance{ - HostName: "server1.acme.org", - Port: 8080, - App: "go-kit", - IPAddr: "192.168.0.1", - VipAddress: "192.168.0.1", - SecureVipAddress: "192.168.0.1", - HealthCheckUrl: "http://server1.acme.org:8080/healthz", - StatusPageUrl: "http://server1.acme.org:8080/status", - HomePageUrl: "http://server1.acme.org:8080/", - Status: fargo.UP, - DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn}, - LeaseInfo: fargo.LeaseInfo{RenewalIntervalInSecs: 1}, - } - instanceTest2 = &fargo.Instance{ - HostName: "server2.acme.org", - Port: 8080, - App: "go-kit", - IPAddr: "192.168.0.2", - VipAddress: "192.168.0.2", - SecureVipAddress: "192.168.0.2", - HealthCheckUrl: "http://server2.acme.org:8080/healthz", - StatusPageUrl: "http://server2.acme.org:8080/status", - HomePageUrl: "http://server2.acme.org:8080/", - Status: fargo.UP, - DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn}, - } - applicationTest = &fargo.Application{ - Name: "go-kit", - Instances: []*fargo.Instance{instanceTest1, instanceTest2}, - } -) - -type testClient struct { - instances []*fargo.Instance - application *fargo.Application - errInstances error - errApplication error - errHeartbeat error -} - -func (c *testClient) Register(i *fargo.Instance) error { - for _, instance := range c.instances { - if reflect.DeepEqual(*instance, *i) { - return errors.New("already registered") - } - } - - c.instances = append(c.instances, i) - return nil -} - -func (c *testClient) Deregister(i *fargo.Instance) error { - var newInstances []*fargo.Instance - for _, instance := range c.instances { - if reflect.DeepEqual(*instance, *i) { - continue - } - newInstances = append(newInstances, instance) - } - if len(newInstances) == len(c.instances) { - return errors.New("not registered") - } - - c.instances = newInstances - return nil -} - -func (c *testClient) Heartbeat(i *fargo.Instance) (err error) { - return c.errHeartbeat -} - -func (c *testClient) Instances(app string) ([]*fargo.Instance, error) { - return c.instances, c.errInstances -} - -func (c *testClient) ScheduleUpdates(service string, quitc chan struct{}) <-chan fargo.AppUpdate { - updatec := make(chan fargo.AppUpdate, 1) - updatec <- fargo.AppUpdate{App: c.application, Err: c.errApplication} - return updatec -} diff --git a/sd/eureka/integration_test.go b/sd/eureka/integration_test.go index ff998fa..b519f70 100644 --- a/sd/eureka/integration_test.go +++ b/sd/eureka/integration_test.go @@ -29,37 +29,36 @@ t.Skip("EUREKA_ADDR is not set") } - var client Client - { - var fargoConfig fargo.Config - fargoConfig.Eureka.ServiceUrls = []string{eurekaAddr} - fargoConfig.Eureka.PollIntervalSeconds = 1 - - fargoConnection := fargo.NewConnFromConfig(fargoConfig) - client = NewClient(&fargoConnection) - } - logger := log.NewLogfmtLogger(os.Stderr) logger = log.With(logger, "ts", log.DefaultTimestamp) + var fargoConfig fargo.Config + // Target Eureka server(s). + fargoConfig.Eureka.ServiceUrls = []string{eurekaAddr} + // How often the subscriber should poll for updates. + fargoConfig.Eureka.PollIntervalSeconds = 1 + + // Create a Fargo connection and a Eureka registrar. + fargoConnection := fargo.NewConnFromConfig(fargoConfig) + registrar1 := NewRegistrar(&fargoConnection, instanceTest1, log.With(logger, "component", "registrar1")) + // Register one instance. - registrar1 := NewRegistrar(client, instanceTest1, log.With(logger, "component", "registrar1")) registrar1.Register() defer registrar1.Deregister() // This should be enough time for the Eureka server response cache to update. time.Sleep(time.Second) - // Build a subscriber. + // Build a Eureka subscriber. factory := func(instance string) (endpoint.Endpoint, io.Closer, error) { t.Logf("factory invoked for %q", instance) return endpoint.Nop, nil, nil } s := NewSubscriber( - client, + &fargoConnection, + appNameTest, factory, log.With(logger, "component", "subscriber"), - instanceTest1.App, ) defer s.Stop() @@ -73,7 +72,7 @@ } // Register a second instance - registrar2 := NewRegistrar(client, instanceTest2, log.With(logger, "component", "registrar2")) + registrar2 := NewRegistrar(&fargoConnection, instanceTest2, log.With(logger, "component", "registrar2")) registrar2.Register() defer registrar2.Deregister() // In case of exceptional circumstances. diff --git a/sd/eureka/registrar.go b/sd/eureka/registrar.go index 1a14d3d..99fef7c 100644 --- a/sd/eureka/registrar.go +++ b/sd/eureka/registrar.go @@ -2,79 +2,126 @@ import ( "fmt" + "net/http" "sync" "time" "github.com/hudl/fargo" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" ) + +// Matches official Netflix Java client default. +const defaultRenewalInterval = 30 * time.Second + +// The methods of fargo.Connection used in this package. +type fargoConnection interface { + RegisterInstance(instance *fargo.Instance) error + DeregisterInstance(instance *fargo.Instance) error + ReregisterInstance(instance *fargo.Instance) error + HeartBeatInstance(instance *fargo.Instance) error + ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate + GetApp(name string) (*fargo.Application, error) +} + +type fargoUnsuccessfulHTTPResponse struct { + statusCode int + messagePrefix string +} // Registrar maintains service instance liveness information in Eureka. type Registrar struct { - client Client + conn fargoConnection instance *fargo.Instance logger log.Logger - quit chan struct{} - wg sync.WaitGroup + quitc chan chan struct{} + sync.Mutex } +var _ sd.Registrar = (*Registrar)(nil) + // NewRegistrar returns an Eureka Registrar acting on behalf of the provided -// Fargo instance. -func NewRegistrar(client Client, i *fargo.Instance, logger log.Logger) *Registrar { +// Fargo connection and instance. See the integration test for usage examples. +func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar { return &Registrar{ - client: client, - instance: i, - logger: log.With(logger, "service", i.App, "address", fmt.Sprintf("%s:%d", i.IPAddr, i.Port)), + conn: conn, + instance: instance, + logger: log.With(logger, "service", instance.App, "address", fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)), } } -// Register implements sd.Registrar interface. +// Register implements sd.Registrar. func (r *Registrar) Register() { - if err := r.client.Register(r.instance); err != nil { - r.logger.Log("err", err) - } else { - r.logger.Log("action", "register") + r.Lock() + defer r.Unlock() + + if r.quitc != nil { + return // Already in the registration loop. } + if err := r.conn.RegisterInstance(r.instance); err != nil { + r.logger.Log("during", "Register", "err", err) + } + + r.quitc = make(chan chan struct{}) + go r.loop() +} + +// Deregister implements sd.Registrar. +func (r *Registrar) Deregister() { + r.Lock() + defer r.Unlock() + + if r.quitc == nil { + return // Already deregistered. + } + + q := make(chan struct{}) + r.quitc <- q + <-q + r.quitc = nil +} + +func (r *Registrar) loop() { + var renewalInterval time.Duration if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 { - // User has opted for heartbeat functionality in Eureka. - if r.quit == nil { - r.quit = make(chan struct{}) - r.wg.Add(1) - go r.loop() + renewalInterval = time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second + } else { + renewalInterval = defaultRenewalInterval + } + ticker := time.NewTicker(renewalInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := r.heartbeat(); err != nil { + r.logger.Log("during", "heartbeat", "err", err) + } + + case q := <-r.quitc: + if err := r.conn.DeregisterInstance(r.instance); err != nil { + r.logger.Log("during", "Deregister", "err", err) + } + close(q) + return } } } -// Deregister implements sd.Registrar interface. -func (r *Registrar) Deregister() { - if err := r.client.Deregister(r.instance); err != nil { - r.logger.Log("err", err) - } else { - r.logger.Log("action", "deregister") +func (r *Registrar) heartbeat() error { + err := r.conn.HeartBeatInstance(r.instance) + if err != nil { + if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok && u.statusCode == http.StatusNotFound { + // Instance expired (e.g. network partition). Re-register. + r.logger.Log("during", "heartbeat", err.Error()) + return r.conn.ReregisterInstance(r.instance) + } } - - if r.quit != nil { - close(r.quit) - r.wg.Wait() - r.quit = nil - } + return err } -func (r *Registrar) loop() { - tick := time.NewTicker(time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second) - defer tick.Stop() - defer r.wg.Done() - - for { - select { - case <-tick.C: - if err := r.client.Heartbeat(r.instance); err != nil { - r.logger.Log("err", err) - } - case <-r.quit: - return - } - } +func (u *fargoUnsuccessfulHTTPResponse) Error() string { + return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode) } diff --git a/sd/eureka/registrar_test.go b/sd/eureka/registrar_test.go index eee26f3..7974e75 100644 --- a/sd/eureka/registrar_test.go +++ b/sd/eureka/registrar_test.go @@ -3,56 +3,100 @@ import ( "testing" "time" - - "github.com/hudl/fargo" ) func TestRegistrar(t *testing.T) { - client := &testClient{ - instances: []*fargo.Instance{}, + connection := &testConnection{ errHeartbeat: errTest, } - r := NewRegistrar(client, instanceTest1, loggerTest) - if want, have := 0, len(client.instances); want != have { - t.Errorf("want %d, have %d", want, have) - } + registrar1 := NewRegistrar(connection, instanceTest1, loggerTest) + registrar2 := NewRegistrar(connection, instanceTest2, loggerTest) // Not registered. - r.Deregister() - if want, have := 0, len(client.instances); want != have { + registrar1.Deregister() + if want, have := 0, len(connection.instances); want != have { t.Errorf("want %d, have %d", want, have) } // Register. - r.Register() - if want, have := 1, len(client.instances); want != have { + registrar1.Register() + if want, have := 1, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + registrar2.Register() + if want, have := 2, len(connection.instances); want != have { t.Errorf("want %d, have %d", want, have) } // Deregister. - r.Deregister() - if want, have := 0, len(client.instances); want != have { + registrar1.Deregister() + if want, have := 1, len(connection.instances); want != have { t.Errorf("want %d, have %d", want, have) } // Already registered. - r.Register() - if want, have := 1, len(client.instances); want != have { + registrar1.Register() + if want, have := 2, len(connection.instances); want != have { t.Errorf("want %d, have %d", want, have) } - r.Register() - if want, have := 1, len(client.instances); want != have { + registrar1.Register() + if want, have := 2, len(connection.instances); want != have { t.Errorf("want %d, have %d", want, have) } // Wait for a heartbeat failure. - time.Sleep(time.Second) - if want, have := 1, len(client.instances); want != have { + time.Sleep(1010 * time.Millisecond) + if want, have := 2, len(connection.instances); want != have { t.Errorf("want %d, have %d", want, have) } - r.Deregister() - if want, have := 0, len(client.instances); want != have { + registrar1.Deregister() + if want, have := 1, len(connection.instances); want != have { t.Errorf("want %d, have %d", want, have) } } + +func TestBadRegister(t *testing.T) { + connection := &testConnection{ + errRegister: errTest, + } + + registrar := NewRegistrar(connection, instanceTest1, loggerTest) + registrar.Register() + if want, have := 0, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestBadDeregister(t *testing.T) { + connection := &testConnection{ + errDeregister: errTest, + } + + registrar := NewRegistrar(connection, instanceTest1, loggerTest) + registrar.Register() + if want, have := 1, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + registrar.Deregister() + if want, have := 1, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestExpiredInstance(t *testing.T) { + connection := &testConnection{ + errHeartbeat: errNotFound, + } + + registrar := NewRegistrar(connection, instanceTest1, loggerTest) + registrar.Register() + + // Wait for a heartbeat failure. + time.Sleep(1010 * time.Millisecond) + + if want, have := 1, len(connection.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } +} diff --git a/sd/eureka/subscriber.go b/sd/eureka/subscriber.go index 2788354..0300d0d 100644 --- a/sd/eureka/subscriber.go +++ b/sd/eureka/subscriber.go @@ -14,31 +14,35 @@ // Subscriber yields endpoints stored in the Eureka registry for the given app. // Changes in that app are watched and will update the Subscriber endpoints. type Subscriber struct { - client Client - cache *cache.Cache - logger log.Logger - app string - quitc chan struct{} + conn fargoConnection + app string + factory sd.Factory + logger log.Logger + cache *cache.Cache + quitc chan chan struct{} } -var _ sd.Subscriber = &Subscriber{} +var _ sd.Subscriber = (*Subscriber)(nil) // NewSubscriber returns a Eureka subscriber. It will start watching the given // app string for changes, and update the endpoints accordingly. -func NewSubscriber(c Client, factory sd.Factory, logger log.Logger, app string) *Subscriber { +func NewSubscriber(conn fargoConnection, app string, factory sd.Factory, logger log.Logger) *Subscriber { + logger = log.With(logger, "app", app) + s := &Subscriber{ - client: c, - cache: cache.New(factory, logger), - app: app, - logger: logger, - quitc: make(chan struct{}), + conn: conn, + app: app, + factory: factory, + logger: logger, + cache: cache.New(factory, logger), + quitc: make(chan chan struct{}), } instances, err := s.getInstances() if err == nil { - s.logger.Log("app", s.app, "instances", len(instances)) + s.logger.Log("instances", len(instances)) } else { - s.logger.Log("app", s.app, "msg", "failed to retrieve instances", "err", err) + s.logger.Log("during", "getInstances", "err", err) } s.cache.Update(instances) @@ -46,54 +50,57 @@ return s } -func (s *Subscriber) getInstances() ([]string, error) { - fargoInstances, err := s.client.Instances(s.app) - if err != nil { - return nil, err - } - return convertFargoInstances(fargoInstances), nil -} - -func (s *Subscriber) loop() { - updatec := s.client.ScheduleUpdates(s.app, s.quitc) - for { - select { - case <-s.quitc: - return - case u := <-updatec: - if u.Err != nil { - s.logger.Log("app", s.app, "msg", "failed to retrieve instances", "err", u.Err) - continue - } - - instances := convertFargoApplication(u.App) - s.logger.Log("app", s.app, "instances", len(instances)) - s.cache.Update(instances) - } - } -} - // Endpoints implements the Subscriber interface. func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { return s.cache.Endpoints(), nil } -// Stop terminates the Subscriber. +// Stop terminates the subscriber. func (s *Subscriber) Stop() { - close(s.quitc) + q := make(chan struct{}) + s.quitc <- q + <-q + s.quitc = nil } -func convertFargoApplication(fargoApplication *fargo.Application) (instances []string) { - if fargoApplication != nil { - instances = convertFargoInstances(fargoApplication.Instances) +func (s *Subscriber) loop() { + var ( + await = false + done = make(chan struct{}) + updatec = s.conn.ScheduleAppUpdates(s.app, await, done) + ) + defer close(done) + + for { + select { + case update := <-updatec: + if update.Err != nil { + s.logger.Log("during", "Update", "err", update.Err) + continue + } + instances := convertFargoAppToInstances(update.App) + s.logger.Log("instances", len(instances)) + s.cache.Update(instances) + + case q := <-s.quitc: + close(q) + return + } + } +} + +func (s *Subscriber) getInstances() ([]string, error) { + app, err := s.conn.GetApp(s.app) + if err != nil { + return nil, err + } + return convertFargoAppToInstances(app), nil +} + +func convertFargoAppToInstances(app *fargo.Application) []string { + instances := make([]string, len(app.Instances)) + for i, inst := range app.Instances { + instances[i] = fmt.Sprintf("%s:%d", inst.IPAddr, inst.Port) } return instances } - -func convertFargoInstances(fargoInstances []*fargo.Instance) []string { - instances := make([]string, len(fargoInstances)) - for i, fargoInstance := range fargoInstances { - instances[i] = fmt.Sprintf("%s:%d", fargoInstance.IPAddr, fargoInstance.Port) - } - return instances -} diff --git a/sd/eureka/subscriber_test.go b/sd/eureka/subscriber_test.go index 9c74843..4d0d9b3 100644 --- a/sd/eureka/subscriber_test.go +++ b/sd/eureka/subscriber_test.go @@ -15,16 +15,16 @@ return endpoint.Nop, nil, nil } - client := &testClient{ + connection := &testConnection{ instances: []*fargo.Instance{instanceTest1}, - application: applicationTest, + application: appUpdateTest, errApplication: nil, } - s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) - defer s.Stop() + subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) + defer subscriber.Stop() - endpoints, err := s.Endpoints() + endpoints, err := subscriber.Endpoints() if err != nil { t.Fatal(err) } @@ -39,23 +39,23 @@ return endpoint.Nop, nil, nil } - client := &testClient{ + connection := &testConnection{ instances: []*fargo.Instance{instanceTest1}, - application: applicationTest, + application: appUpdateTest, errApplication: nil, } - s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) - defer s.Stop() + subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) + defer subscriber.Stop() - endpoints, _ := s.Endpoints() + endpoints, _ := subscriber.Endpoints() if want, have := 1, len(endpoints); want != have { t.Errorf("want %d, have %d", want, have) } time.Sleep(50 * time.Millisecond) - endpoints, _ = s.Endpoints() + endpoints, _ = subscriber.Endpoints() if want, have := 2, len(endpoints); want != have { t.Errorf("want %v, have %v", want, have) } @@ -66,14 +66,16 @@ return nil, nil, errTest } - client := &testClient{ - instances: []*fargo.Instance{instanceTest1}, + connection := &testConnection{ + instances: []*fargo.Instance{instanceTest1}, + application: appUpdateTest, + errApplication: nil, } - s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) - defer s.Stop() + subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) + defer subscriber.Stop() - endpoints, err := s.Endpoints() + endpoints, err := subscriber.Endpoints() if err != nil { t.Fatal(err) } @@ -88,16 +90,17 @@ return endpoint.Nop, nil, nil } - client := &testClient{ + connection := &testConnection{ + instances: []*fargo.Instance{}, errInstances: errTest, - application: applicationTest, + application: appUpdateTest, errApplication: nil, } - s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) - defer s.Stop() + subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) + defer subscriber.Stop() - endpoints, err := s.Endpoints() + endpoints, err := subscriber.Endpoints() if err != nil { t.Fatal(err) } @@ -112,16 +115,16 @@ return endpoint.Nop, nil, nil } - client := &testClient{ + connection := &testConnection{ instances: []*fargo.Instance{instanceTest1}, - application: applicationTest, + application: appUpdateTest, errApplication: errTest, } - s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) - defer s.Stop() + subscriber := NewSubscriber(connection, appNameTest, factory, loggerTest) + defer subscriber.Stop() - endpoints, err := s.Endpoints() + endpoints, err := subscriber.Endpoints() if err != nil { t.Error(err) } @@ -131,7 +134,7 @@ time.Sleep(50 * time.Millisecond) - endpoints, err = s.Endpoints() + endpoints, err = subscriber.Endpoints() if err != nil { t.Error(err) } diff --git a/sd/eureka/util_test.go b/sd/eureka/util_test.go new file mode 100644 index 0000000..a66e712 --- /dev/null +++ b/sd/eureka/util_test.go @@ -0,0 +1,108 @@ +package eureka + +import ( + "errors" + "reflect" + + "github.com/go-kit/kit/log" + "github.com/hudl/fargo" +) + +type testConnection struct { + instances []*fargo.Instance + application *fargo.Application + errInstances error + errApplication error + errHeartbeat error + errRegister error + errDeregister error +} + +var ( + errTest = errors.New("kaboom") + errNotFound = &fargoUnsuccessfulHTTPResponse{statusCode: 404, messagePrefix: "not found"} + loggerTest = log.NewNopLogger() + appNameTest = "go-kit" + appUpdateTest = &fargo.Application{ + Name: appNameTest, + Instances: []*fargo.Instance{instanceTest1, instanceTest2}, + } + instanceTest1 = &fargo.Instance{ + HostName: "serveregistrar1.acme.org", + Port: 8080, + App: appNameTest, + IPAddr: "192.168.0.1", + VipAddress: "192.168.0.1", + SecureVipAddress: "192.168.0.1", + HealthCheckUrl: "http://serveregistrar1.acme.org:8080/healthz", + StatusPageUrl: "http://serveregistrar1.acme.org:8080/status", + HomePageUrl: "http://serveregistrar1.acme.org:8080/", + Status: fargo.UP, + DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn}, + LeaseInfo: fargo.LeaseInfo{RenewalIntervalInSecs: 1}, + } + instanceTest2 = &fargo.Instance{ + HostName: "serveregistrar2.acme.org", + Port: 8080, + App: appNameTest, + IPAddr: "192.168.0.2", + VipAddress: "192.168.0.2", + SecureVipAddress: "192.168.0.2", + HealthCheckUrl: "http://serveregistrar2.acme.org:8080/healthz", + StatusPageUrl: "http://serveregistrar2.acme.org:8080/status", + HomePageUrl: "http://serveregistrar2.acme.org:8080/", + Status: fargo.UP, + DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn}, + } +) + +var _ fargoConnection = (*testConnection)(nil) + +func (c *testConnection) RegisterInstance(i *fargo.Instance) error { + if c.errRegister == nil { + for _, instance := range c.instances { + if reflect.DeepEqual(*instance, *i) { + return errors.New("already registered") + } + } + + c.instances = append(c.instances, i) + } + return c.errRegister +} + +func (c *testConnection) HeartBeatInstance(i *fargo.Instance) error { + return c.errHeartbeat +} + +func (c *testConnection) DeregisterInstance(i *fargo.Instance) error { + if c.errDeregister == nil { + var newInstances []*fargo.Instance + for _, instance := range c.instances { + if reflect.DeepEqual(*instance, *i) { + continue + } + newInstances = append(newInstances, instance) + } + if len(newInstances) == len(c.instances) { + return errors.New("not registered") + } + + c.instances = newInstances + } + return c.errDeregister +} + +func (c *testConnection) ReregisterInstance(ins *fargo.Instance) error { + return nil +} + +func (c *testConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate { + updatec := make(chan fargo.AppUpdate, 1) + updatec <- fargo.AppUpdate{App: c.application, Err: c.errApplication} + return updatec +} + +func (c *testConnection) GetApp(name string) (*fargo.Application, error) { + return &fargo.Application{Name: appNameTest, Instances: c.instances}, c.errInstances +}