diff --git a/circle.yml b/circle.yml index 0c35bb2..a5acd9c 100644 --- a/circle.yml +++ b/circle.yml @@ -24,3 +24,4 @@ ETCD_ADDR: http://localhost:2379 CONSUL_ADDR: localhost:8500 ZK_ADDR: localhost:2181 + EUREKA_ADDR: http://localhost:8761/eureka diff --git a/docker-compose-integration.yml b/docker-compose-integration.yml index f316a13..287d97d 100644 --- a/docker-compose-integration.yml +++ b/docker-compose-integration.yml @@ -14,3 +14,9 @@ image: zookeeper ports: - "2181:2181" + eureka: + image: springcloud/eureka + environment: + eureka.server.responseCacheUpdateIntervalMs: 1000 + ports: + - "8761:8761" diff --git a/sd/eureka/client.go b/sd/eureka/client.go new file mode 100644 index 0000000..7e6bbcd --- /dev/null +++ b/sd/eureka/client.go @@ -0,0 +1,83 @@ +package eureka + +import ( + stdeureka "github.com/hudl/fargo" + stdeurekalogging "github.com/op/go-logging" +) + +func init() { + // Quieten Fargo's own logging + stdeurekalogging.SetLevel(stdeurekalogging.ERROR, "fargo") +} + +// Client is a wrapper around the Eureka API. +type Client interface { + // Register an instance with Eureka. + Register(i *stdeureka.Instance) error + + // Deregister an instance from Eureka. + Deregister(i *stdeureka.Instance) error + + // Send an instance heartbeat to Eureka. + Heartbeat(i *stdeureka.Instance) error + + // Get all instances for an app in Eureka. + Instances(app string) ([]*stdeureka.Instance, error) + + // Receive scheduled updates about an app's instances in Eureka. + ScheduleUpdates(app string, quitc chan struct{}) <-chan stdeureka.AppUpdate +} + +type client struct { + connection *stdeureka.EurekaConnection +} + +// NewClient returns an implementation of the Client interface, wrapping a +// concrete connection to Eureka using the Fargo library. +// Taking in Fargo's own connection abstraction gives the user maximum +// freedom in regards to how that connection is configured. +func NewClient(ec *stdeureka.EurekaConnection) Client { + return &client{connection: ec} +} + +func (c *client) Register(i *stdeureka.Instance) error { + if c.instanceRegistered(i) { + // Already registered. Send a heartbeat instead. + return c.Heartbeat(i) + } + return c.connection.RegisterInstance(i) +} + +func (c *client) Deregister(i *stdeureka.Instance) error { + return c.connection.DeregisterInstance(i) +} + +func (c *client) Heartbeat(i *stdeureka.Instance) (err error) { + if err = c.connection.HeartBeatInstance(i); err != nil && c.instanceNotFoundErr(err) { + // Instance not registered. Register first before sending heartbeats. + return c.Register(i) + } + return err +} + +func (c *client) Instances(app string) ([]*stdeureka.Instance, error) { + stdApp, err := c.connection.GetApp(app) + if err != nil { + return nil, err + } + return stdApp.Instances, nil +} + +func (c *client) ScheduleUpdates(app string, quitc chan struct{}) <-chan stdeureka.AppUpdate { + return c.connection.ScheduleAppUpdates(app, false, quitc) +} + +func (c *client) instanceRegistered(i *stdeureka.Instance) bool { + _, err := c.connection.GetInstance(i.App, i.Id()) + return err == nil +} + +func (c *client) instanceNotFoundErr(err error) bool { + code, ok := stdeureka.HTTPResponseStatusCode(err) + return ok && code == 404 +} diff --git a/sd/eureka/client_test.go b/sd/eureka/client_test.go new file mode 100644 index 0000000..294bd8b --- /dev/null +++ b/sd/eureka/client_test.go @@ -0,0 +1,94 @@ +package eureka + +import ( + "errors" + "reflect" + + "github.com/go-kit/kit/log" + stdeureka "github.com/hudl/fargo" +) + +var ( + errTest = errors.New("kaboom") + loggerTest = log.NewNopLogger() + instanceTest1 = &stdeureka.Instance{ + HostName: "server1.acme.org", + Port: 8080, + App: "go-kit", + IPAddr: "192.168.0.1", + VipAddress: "192.168.0.1", + SecureVipAddress: "192.168.0.1", + HealthCheckUrl: "http://server1.acme.org:8080/healthz", + StatusPageUrl: "http://server1.acme.org:8080/status", + HomePageUrl: "http://server1.acme.org:8080/", + Status: stdeureka.UP, + DataCenterInfo: stdeureka.DataCenterInfo{Name: stdeureka.MyOwn}, + LeaseInfo: stdeureka.LeaseInfo{RenewalIntervalInSecs: 1}, + } + instanceTest2 = &stdeureka.Instance{ + HostName: "server2.acme.org", + Port: 8080, + App: "go-kit", + IPAddr: "192.168.0.2", + VipAddress: "192.168.0.2", + SecureVipAddress: "192.168.0.2", + HealthCheckUrl: "http://server2.acme.org:8080/healthz", + StatusPageUrl: "http://server2.acme.org:8080/status", + HomePageUrl: "http://server2.acme.org:8080/", + Status: stdeureka.UP, + DataCenterInfo: stdeureka.DataCenterInfo{Name: stdeureka.MyOwn}, + } + applicationTest = &stdeureka.Application{ + Name: "go-kit", + Instances: []*stdeureka.Instance{instanceTest1, instanceTest2}, + } +) + +type testClient struct { + instances []*stdeureka.Instance + application *stdeureka.Application + errInstances error + errApplication error + errHeartbeat error +} + +func (c *testClient) Register(i *stdeureka.Instance) error { + for _, instance := range c.instances { + if reflect.DeepEqual(*instance, *i) { + return errors.New("already registered") + } + } + + c.instances = append(c.instances, i) + return nil +} + +func (c *testClient) Deregister(i *stdeureka.Instance) error { + var newInstances []*stdeureka.Instance + for _, instance := range c.instances { + if reflect.DeepEqual(*instance, *i) { + continue + } + newInstances = append(newInstances, instance) + } + if len(newInstances) == len(c.instances) { + return errors.New("not registered") + } + + c.instances = newInstances + return nil +} + +func (c *testClient) Heartbeat(i *stdeureka.Instance) (err error) { + return c.errHeartbeat +} + +func (c *testClient) Instances(app string) ([]*stdeureka.Instance, error) { + return c.instances, c.errInstances +} + +func (c *testClient) ScheduleUpdates(service string, quitc chan struct{}) <-chan stdeureka.AppUpdate { + updatec := make(chan stdeureka.AppUpdate, 1) + updatec <- stdeureka.AppUpdate{App: c.application, Err: c.errApplication} + return updatec +} diff --git a/sd/eureka/doc.go b/sd/eureka/doc.go new file mode 100644 index 0000000..d41c352 --- /dev/null +++ b/sd/eureka/doc.go @@ -0,0 +1,2 @@ +// Package eureka provides subscriber and registrar implementations for Netflix OSS's Eureka +package eureka diff --git a/sd/eureka/integration_test.go b/sd/eureka/integration_test.go new file mode 100644 index 0000000..02b8b41 --- /dev/null +++ b/sd/eureka/integration_test.go @@ -0,0 +1,106 @@ +// +build integration + +package eureka + +import ( + "io" + "os" + "testing" + "time" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + stdeureka "github.com/hudl/fargo" +) + +// Package sd/eureka provides a wrapper around the Netflix Eureka service +// registry by way of the Fargo library. This test assumes the user has an +// instance of Eureka available at the address in the environment variable. +// Example `${EUREKA_ADDR}` format: http://localhost:8761/eureka +// +// NOTE: when starting a Eureka server for integration testing, ensure +// the response cache interval is reduced to one second. This can be +// achieved with the following Java argument: +// `-Deureka.server.responseCacheUpdateIntervalMs=1000` +func TestIntegration(t *testing.T) { + eurekaAddr := os.Getenv("EUREKA_ADDR") + if eurekaAddr == "" { + t.Skip("EUREKA_ADDR is not set") + } + + var client Client + { + var stdConfig stdeureka.Config + stdConfig.Eureka.ServiceUrls = []string{eurekaAddr} + stdConfig.Eureka.PollIntervalSeconds = 1 + + stdConnection := stdeureka.NewConnFromConfig(stdConfig) + client = NewClient(&stdConnection) + } + + logger := log.NewLogfmtLogger(os.Stderr) + logger = log.With(logger, "ts", log.DefaultTimestamp) + + // Register one instance. + registrar1 := NewRegistrar(client, instanceTest1, log.With(logger, "component", "registrar1")) + registrar1.Register() + defer registrar1.Deregister() + + // This should be enough time for the Eureka server response cache to update. + time.Sleep(time.Second) + + // Build a subscriber. + factory := func(instance string) (endpoint.Endpoint, io.Closer, error) { + t.Logf("factory invoked for %q", instance) + return endpoint.Nop, nil, nil + } + s := NewSubscriber( + client, + factory, + log.With(logger, "component", "subscriber"), + instanceTest1.App, + ) + defer s.Stop() + + // We should have one endpoint immediately after subscriber instantiation. + endpoints, err := s.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Register a second instance + registrar2 := NewRegistrar(client, instanceTest2, log.With(logger, "component", "registrar2")) + registrar2.Register() + defer registrar2.Deregister() // In case of exceptional circumstances. + + // This should be enough time for a scheduled update assuming Eureka is + // configured with the properties mentioned in the function comments. + time.Sleep(2 * time.Second) + + // Now we should have two endpoints. + endpoints, err = s.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 2, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Deregister the second instance. + registrar2.Deregister() + + // Wait for another scheduled update. + time.Sleep(2 * time.Second) + + // And then there was one. + endpoints, err = s.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } +} diff --git a/sd/eureka/registrar.go b/sd/eureka/registrar.go new file mode 100644 index 0000000..5f64291 --- /dev/null +++ b/sd/eureka/registrar.go @@ -0,0 +1,89 @@ +package eureka + +import ( + "fmt" + "sync" + "time" + + stdeureka "github.com/hudl/fargo" + + "github.com/go-kit/kit/log" +) + +// Registrar maintains service instance liveness information in Eureka. +type Registrar struct { + client Client + instance *stdeureka.Instance + logger log.Logger + + quitmtx sync.Mutex + quit chan bool +} + +// NewRegistrar returns an Eureka Registrar acting on behalf of the provided +// Fargo instance. +func NewRegistrar(client Client, i *stdeureka.Instance, l log.Logger) *Registrar { + return &Registrar{ + client: client, + instance: i, + logger: log.With(l, "service", i.App, "address", fmt.Sprintf("%s:%d", i.IPAddr, i.Port)), + } +} + +// Register implements sd.Registrar interface. +func (r *Registrar) Register() { + if err := r.client.Register(r.instance); err != nil { + r.logger.Log("err", err) + } else { + r.logger.Log("action", "register") + } + + if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 { + // User has opted for heartbeat functionality in Eureka. + go r.loop() + } +} + +// Deregister implements sd.Registrar interface. +func (r *Registrar) Deregister() { + if err := r.client.Deregister(r.instance); err != nil { + r.logger.Log("err", err) + } else { + r.logger.Log("action", "deregister") + } + + r.quitmtx.Lock() + defer r.quitmtx.Unlock() + if r.quit != nil { + r.quit <- true + } +} + +func (r *Registrar) loop() { + r.quitmtx.Lock() + if r.quit != nil { + defer r.quitmtx.Unlock() + return // Already running. + } + r.quit = make(chan bool) + r.quitmtx.Unlock() + + tick := time.NewTicker(time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second) + defer tick.Stop() + for { + select { + case <-tick.C: + if err := r.client.Heartbeat(r.instance); err != nil { + r.logger.Log("err", err) + } + case <-r.quit: + r.quitmtx.Lock() + defer r.quitmtx.Unlock() + + close(r.quit) + r.quit = nil + + return + } + } +} diff --git a/sd/eureka/registrar_test.go b/sd/eureka/registrar_test.go new file mode 100644 index 0000000..9ae7f3d --- /dev/null +++ b/sd/eureka/registrar_test.go @@ -0,0 +1,58 @@ +package eureka + +import ( + "testing" + "time" + + stdeureka "github.com/hudl/fargo" +) + +func TestRegistrar(t *testing.T) { + client := &testClient{ + instances: []*stdeureka.Instance{}, + errHeartbeat: errTest, + } + + r := NewRegistrar(client, instanceTest1, loggerTest) + if want, have := 0, len(client.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Not registered. + r.Deregister() + if want, have := 0, len(client.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Register. + r.Register() + if want, have := 1, len(client.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Deregister. + r.Deregister() + if want, have := 0, len(client.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Already registered. + r.Register() + if want, have := 1, len(client.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + r.Register() + if want, have := 1, len(client.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Wait for a heartbeat failure. + time.Sleep(time.Second) + if want, have := 1, len(client.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } + r.Deregister() + if want, have := 0, len(client.instances); want != have { + t.Errorf("want %d, have %d", want, have) + } +} diff --git a/sd/eureka/subscriber.go b/sd/eureka/subscriber.go new file mode 100644 index 0000000..5873eab --- /dev/null +++ b/sd/eureka/subscriber.go @@ -0,0 +1,99 @@ +package eureka + +import ( + "fmt" + + stdeureka "github.com/hudl/fargo" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd" + "github.com/go-kit/kit/sd/cache" +) + +// Subscriber yields endpoints stored in the Eureka registry for the given app. +// Changes in that app are watched and will update the Subscriber endpoints. +type Subscriber struct { + client Client + cache *cache.Cache + logger log.Logger + app string + quitc chan struct{} +} + +var _ sd.Subscriber = &Subscriber{} + +// NewSubscriber returns a Eureka subscriber. It will start watching the given +// app string for changes, and update the endpoints accordingly. +func NewSubscriber(c Client, f sd.Factory, l log.Logger, app string) *Subscriber { + s := &Subscriber{ + client: c, + cache: cache.New(f, l), + app: app, + logger: l, + quitc: make(chan struct{}), + } + + instances, err := s.getInstances() + if err == nil { + s.logger.Log("app", s.app, "instances", len(instances)) + } else { + s.logger.Log("app", s.app, "msg", "failed to retrieve instances", "err", err) + } + + s.cache.Update(instances) + go s.loop() + return s +} + +func (s *Subscriber) getInstances() ([]string, error) { + stdInstances, err := s.client.Instances(s.app) + if err != nil { + return nil, err + } + return convertStdInstances(stdInstances), nil +} + +func (s *Subscriber) loop() { + updatec := s.client.ScheduleUpdates(s.app, s.quitc) + for { + select { + case <-s.quitc: + return + case u := <-updatec: + if u.Err != nil { + s.logger.Log("app", s.app, "msg", "failed to retrieve instances", "err", u.Err) + continue + } + + instances := convertStdApplication(u.App) + s.logger.Log("app", s.app, "instances", len(instances)) + s.cache.Update(instances) + } + } +} + +// Endpoints implements the Subscriber interface. +func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) { + return s.cache.Endpoints(), nil +} + +// Stop terminates the Subscriber. +func (s *Subscriber) Stop() { + close(s.quitc) +} + +func convertStdApplication(stdApplication *stdeureka.Application) (instances []string) { + if stdApplication != nil { + instances = convertStdInstances(stdApplication.Instances) + } + return instances +} + +func convertStdInstances(stdInstances []*stdeureka.Instance) []string { + instances := make([]string, len(stdInstances)) + for i, stdInstance := range stdInstances { + instances[i] = fmt.Sprintf("%s:%d", stdInstance.IPAddr, stdInstance.Port) + } + return instances +} diff --git a/sd/eureka/subscriber_test.go b/sd/eureka/subscriber_test.go new file mode 100644 index 0000000..9ef7264 --- /dev/null +++ b/sd/eureka/subscriber_test.go @@ -0,0 +1,140 @@ +package eureka + +import ( + "io" + "testing" + "time" + + "github.com/go-kit/kit/endpoint" + stdeureka "github.com/hudl/fargo" +) + +func TestSubscriber(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return endpoint.Nop, nil, nil + } + + client := &testClient{ + instances: []*stdeureka.Instance{instanceTest1}, + application: applicationTest, + errApplication: nil, + } + + s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) + defer s.Stop() + + endpoints, err := s.Endpoints() + if err != nil { + t.Fatal(err) + } + + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestSubscriberScheduleUpdates(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return endpoint.Nop, nil, nil + } + + client := &testClient{ + instances: []*stdeureka.Instance{instanceTest1}, + application: applicationTest, + errApplication: nil, + } + + s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) + defer s.Stop() + + endpoints, _ := s.Endpoints() + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } + + time.Sleep(50 * time.Millisecond) + + endpoints, _ = s.Endpoints() + if want, have := 2, len(endpoints); want != have { + t.Errorf("want %v, have %v", want, have) + } +} + +func TestBadFactory(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return nil, nil, errTest + } + + client := &testClient{ + instances: []*stdeureka.Instance{instanceTest1}, + } + + s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) + defer s.Stop() + + endpoints, err := s.Endpoints() + if err != nil { + t.Fatal(err) + } + + if want, have := 0, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestBadSubscriberInstances(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return endpoint.Nop, nil, nil + } + + client := &testClient{ + errInstances: errTest, + application: applicationTest, + errApplication: nil, + } + + s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) + defer s.Stop() + + endpoints, err := s.Endpoints() + if err != nil { + t.Fatal(err) + } + + if want, have := 0, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestBadSubscriberScheduleUpdates(t *testing.T) { + factory := func(string) (endpoint.Endpoint, io.Closer, error) { + return endpoint.Nop, nil, nil + } + + client := &testClient{ + instances: []*stdeureka.Instance{instanceTest1}, + application: applicationTest, + errApplication: errTest, + } + + s := NewSubscriber(client, factory, loggerTest, instanceTest1.App) + defer s.Stop() + + endpoints, err := s.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %d, have %d", want, have) + } + + time.Sleep(50 * time.Millisecond) + + endpoints, err = s.Endpoints() + if err != nil { + t.Error(err) + } + if want, have := 1, len(endpoints); want != have { + t.Errorf("want %v, have %v", want, have) + } +}