Codebase list golang-github-go-kit-kit / 72a93ad
Initial PR tidy-up based on feedback Martin Baillie 6 years ago
7 changed file(s) with 75 addition(s) and 93 deletion(s). Raw diff Collapse all Expand all
00 package eureka
11
22 import (
3 stdeureka "github.com/hudl/fargo"
4 stdeurekalogging "github.com/op/go-logging"
3 fargo "github.com/hudl/fargo"
54 )
6
7 func init() {
8 // Quieten Fargo's own logging
9 stdeurekalogging.SetLevel(stdeurekalogging.ERROR, "fargo")
10 }
115
126 // Client is a wrapper around the Eureka API.
137 type Client interface {
148 // Register an instance with Eureka.
15 Register(i *stdeureka.Instance) error
9 Register(i *fargo.Instance) error
1610
1711 // Deregister an instance from Eureka.
18 Deregister(i *stdeureka.Instance) error
12 Deregister(i *fargo.Instance) error
1913
2014 // Send an instance heartbeat to Eureka.
21 Heartbeat(i *stdeureka.Instance) error
15 Heartbeat(i *fargo.Instance) error
2216
2317 // Get all instances for an app in Eureka.
24 Instances(app string) ([]*stdeureka.Instance, error)
18 Instances(app string) ([]*fargo.Instance, error)
2519
2620 // Receive scheduled updates about an app's instances in Eureka.
27 ScheduleUpdates(app string, quitc chan struct{}) <-chan stdeureka.AppUpdate
21 ScheduleUpdates(app string, quitc chan struct{}) <-chan fargo.AppUpdate
2822 }
2923
3024 type client struct {
31 connection *stdeureka.EurekaConnection
25 connection *fargo.EurekaConnection
3226 }
3327
3428 // NewClient returns an implementation of the Client interface, wrapping a
3529 // concrete connection to Eureka using the Fargo library.
3630 // Taking in Fargo's own connection abstraction gives the user maximum
3731 // freedom in regards to how that connection is configured.
38 func NewClient(ec *stdeureka.EurekaConnection) Client {
32 func NewClient(ec *fargo.EurekaConnection) Client {
3933 return &client{connection: ec}
4034 }
4135
42 func (c *client) Register(i *stdeureka.Instance) error {
36 func (c *client) Register(i *fargo.Instance) error {
4337 if c.instanceRegistered(i) {
4438 // Already registered. Send a heartbeat instead.
4539 return c.Heartbeat(i)
4741 return c.connection.RegisterInstance(i)
4842 }
4943
50 func (c *client) Deregister(i *stdeureka.Instance) error {
44 func (c *client) Deregister(i *fargo.Instance) error {
5145 return c.connection.DeregisterInstance(i)
5246 }
5347
54 func (c *client) Heartbeat(i *stdeureka.Instance) (err error) {
48 func (c *client) Heartbeat(i *fargo.Instance) (err error) {
5549 if err = c.connection.HeartBeatInstance(i); err != nil && c.instanceNotFoundErr(err) {
5650 // Instance not registered. Register first before sending heartbeats.
5751 return c.Register(i)
5953 return err
6054 }
6155
62 func (c *client) Instances(app string) ([]*stdeureka.Instance, error) {
56 func (c *client) Instances(app string) ([]*fargo.Instance, error) {
6357 stdApp, err := c.connection.GetApp(app)
6458 if err != nil {
6559 return nil, err
6761 return stdApp.Instances, nil
6862 }
6963
70 func (c *client) ScheduleUpdates(app string, quitc chan struct{}) <-chan stdeureka.AppUpdate {
64 func (c *client) ScheduleUpdates(app string, quitc chan struct{}) <-chan fargo.AppUpdate {
7165 return c.connection.ScheduleAppUpdates(app, false, quitc)
7266 }
7367
74 func (c *client) instanceRegistered(i *stdeureka.Instance) bool {
68 func (c *client) instanceRegistered(i *fargo.Instance) bool {
7569 _, err := c.connection.GetInstance(i.App, i.Id())
7670 return err == nil
7771 }
7872
7973 func (c *client) instanceNotFoundErr(err error) bool {
80 code, ok := stdeureka.HTTPResponseStatusCode(err)
74 code, ok := fargo.HTTPResponseStatusCode(err)
8175 return ok && code == 404
8276 }
33 "errors"
44 "reflect"
55
6 fargo "github.com/hudl/fargo"
7
68 "github.com/go-kit/kit/log"
7 stdeureka "github.com/hudl/fargo"
89 )
910
1011 var (
1112 errTest = errors.New("kaboom")
1213 loggerTest = log.NewNopLogger()
13 instanceTest1 = &stdeureka.Instance{
14 instanceTest1 = &fargo.Instance{
1415 HostName: "server1.acme.org",
1516 Port: 8080,
1617 App: "go-kit",
2021 HealthCheckUrl: "http://server1.acme.org:8080/healthz",
2122 StatusPageUrl: "http://server1.acme.org:8080/status",
2223 HomePageUrl: "http://server1.acme.org:8080/",
23 Status: stdeureka.UP,
24 DataCenterInfo: stdeureka.DataCenterInfo{Name: stdeureka.MyOwn},
25 LeaseInfo: stdeureka.LeaseInfo{RenewalIntervalInSecs: 1},
24 Status: fargo.UP,
25 DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn},
26 LeaseInfo: fargo.LeaseInfo{RenewalIntervalInSecs: 1},
2627 }
27 instanceTest2 = &stdeureka.Instance{
28 instanceTest2 = &fargo.Instance{
2829 HostName: "server2.acme.org",
2930 Port: 8080,
3031 App: "go-kit",
3435 HealthCheckUrl: "http://server2.acme.org:8080/healthz",
3536 StatusPageUrl: "http://server2.acme.org:8080/status",
3637 HomePageUrl: "http://server2.acme.org:8080/",
37 Status: stdeureka.UP,
38 DataCenterInfo: stdeureka.DataCenterInfo{Name: stdeureka.MyOwn},
38 Status: fargo.UP,
39 DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn},
3940 }
40 applicationTest = &stdeureka.Application{
41 applicationTest = &fargo.Application{
4142 Name: "go-kit",
42 Instances: []*stdeureka.Instance{instanceTest1, instanceTest2},
43 Instances: []*fargo.Instance{instanceTest1, instanceTest2},
4344 }
4445 )
4546
4647 type testClient struct {
47 instances []*stdeureka.Instance
48 application *stdeureka.Application
48 instances []*fargo.Instance
49 application *fargo.Application
4950 errInstances error
5051 errApplication error
5152 errHeartbeat error
5253 }
5354
54 func (c *testClient) Register(i *stdeureka.Instance) error {
55 func (c *testClient) Register(i *fargo.Instance) error {
5556 for _, instance := range c.instances {
5657 if reflect.DeepEqual(*instance, *i) {
5758 return errors.New("already registered")
6263 return nil
6364 }
6465
65 func (c *testClient) Deregister(i *stdeureka.Instance) error {
66 var newInstances []*stdeureka.Instance
66 func (c *testClient) Deregister(i *fargo.Instance) error {
67 var newInstances []*fargo.Instance
6768 for _, instance := range c.instances {
6869 if reflect.DeepEqual(*instance, *i) {
6970 continue
7879 return nil
7980 }
8081
81 func (c *testClient) Heartbeat(i *stdeureka.Instance) (err error) {
82 func (c *testClient) Heartbeat(i *fargo.Instance) (err error) {
8283 return c.errHeartbeat
8384 }
8485
85 func (c *testClient) Instances(app string) ([]*stdeureka.Instance, error) {
86 func (c *testClient) Instances(app string) ([]*fargo.Instance, error) {
8687 return c.instances, c.errInstances
8788 }
8889
89 func (c *testClient) ScheduleUpdates(service string, quitc chan struct{}) <-chan stdeureka.AppUpdate {
90 updatec := make(chan stdeureka.AppUpdate, 1)
91 updatec <- stdeureka.AppUpdate{App: c.application, Err: c.errApplication}
90 func (c *testClient) ScheduleUpdates(service string, quitc chan struct{}) <-chan fargo.AppUpdate {
91 updatec := make(chan fargo.AppUpdate, 1)
92 updatec <- fargo.AppUpdate{App: c.application, Err: c.errApplication}
9293 return updatec
9394 }
77 "testing"
88 "time"
99
10 fargo "github.com/hudl/fargo"
11
1012 "github.com/go-kit/kit/endpoint"
1113 "github.com/go-kit/kit/log"
12 stdeureka "github.com/hudl/fargo"
1314 )
1415
1516 // Package sd/eureka provides a wrapper around the Netflix Eureka service
2930
3031 var client Client
3132 {
32 var stdConfig stdeureka.Config
33 stdConfig.Eureka.ServiceUrls = []string{eurekaAddr}
34 stdConfig.Eureka.PollIntervalSeconds = 1
33 var fargoConfig fargo.Config
34 fargoConfig.Eureka.ServiceUrls = []string{eurekaAddr}
35 fargoConfig.Eureka.PollIntervalSeconds = 1
3536
36 stdConnection := stdeureka.NewConnFromConfig(stdConfig)
37 client = NewClient(&stdConnection)
37 fargoConnection := fargo.NewConnFromConfig(fargoConfig)
38 client = NewClient(&fargoConnection)
3839 }
3940
4041 logger := log.NewLogfmtLogger(os.Stderr)
11
22 import (
33 "fmt"
4 "sync"
54 "time"
65
7 stdeureka "github.com/hudl/fargo"
6 fargo "github.com/hudl/fargo"
87
98 "github.com/go-kit/kit/log"
109 )
1211 // Registrar maintains service instance liveness information in Eureka.
1312 type Registrar struct {
1413 client Client
15 instance *stdeureka.Instance
14 instance *fargo.Instance
1615 logger log.Logger
17
18 quitmtx sync.Mutex
19 quit chan bool
16 quit chan bool
2017 }
2118
2219 // NewRegistrar returns an Eureka Registrar acting on behalf of the provided
2320 // Fargo instance.
24 func NewRegistrar(client Client, i *stdeureka.Instance, l log.Logger) *Registrar {
21 func NewRegistrar(client Client, i *fargo.Instance, logger log.Logger) *Registrar {
2522 return &Registrar{
2623 client: client,
2724 instance: i,
28 logger: log.With(l, "service", i.App, "address", fmt.Sprintf("%s:%d", i.IPAddr, i.Port)),
25 logger: log.With(logger, "service", i.App, "address", fmt.Sprintf("%s:%d", i.IPAddr, i.Port)),
2926 }
3027 }
3128
3936
4037 if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 {
4138 // User has opted for heartbeat functionality in Eureka.
42 go r.loop()
39 if r.quit == nil {
40 r.quit = make(chan bool)
41 go r.loop()
42 }
4343 }
4444 }
4545
5151 r.logger.Log("action", "deregister")
5252 }
5353
54 r.quitmtx.Lock()
55 defer r.quitmtx.Unlock()
5654 if r.quit != nil {
5755 r.quit <- true
56 r.quit = nil
5857 }
5958 }
6059
6160 func (r *Registrar) loop() {
62 r.quitmtx.Lock()
63 if r.quit != nil {
64 defer r.quitmtx.Unlock()
65 return // Already running.
66 }
67 r.quit = make(chan bool)
68 r.quitmtx.Unlock()
69
7061 tick := time.NewTicker(time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second)
7162 defer tick.Stop()
7263 for {
7667 r.logger.Log("err", err)
7768 }
7869 case <-r.quit:
79 r.quitmtx.Lock()
80 defer r.quitmtx.Unlock()
81
82 close(r.quit)
83 r.quit = nil
84
8570 return
8671 }
8772 }
33 "testing"
44 "time"
55
6 stdeureka "github.com/hudl/fargo"
6 fargo "github.com/hudl/fargo"
77 )
88
99 func TestRegistrar(t *testing.T) {
1010 client := &testClient{
11 instances: []*stdeureka.Instance{},
11 instances: []*fargo.Instance{},
1212 errHeartbeat: errTest,
1313 }
1414
22 import (
33 "fmt"
44
5 stdeureka "github.com/hudl/fargo"
5 fargo "github.com/hudl/fargo"
66
77 "github.com/go-kit/kit/endpoint"
88 "github.com/go-kit/kit/log"
2424
2525 // NewSubscriber returns a Eureka subscriber. It will start watching the given
2626 // app string for changes, and update the endpoints accordingly.
27 func NewSubscriber(c Client, f sd.Factory, l log.Logger, app string) *Subscriber {
27 func NewSubscriber(c Client, factory sd.Factory, logger log.Logger, app string) *Subscriber {
2828 s := &Subscriber{
2929 client: c,
30 cache: cache.New(f, l),
30 cache: cache.New(factory, logger),
3131 app: app,
32 logger: l,
32 logger: logger,
3333 quitc: make(chan struct{}),
3434 }
3535
4646 }
4747
4848 func (s *Subscriber) getInstances() ([]string, error) {
49 stdInstances, err := s.client.Instances(s.app)
49 fargoInstances, err := s.client.Instances(s.app)
5050 if err != nil {
5151 return nil, err
5252 }
53 return convertStdInstances(stdInstances), nil
53 return convertFargoInstances(fargoInstances), nil
5454 }
5555
5656 func (s *Subscriber) loop() {
6565 continue
6666 }
6767
68 instances := convertStdApplication(u.App)
68 instances := convertFargoApplication(u.App)
6969 s.logger.Log("app", s.app, "instances", len(instances))
7070 s.cache.Update(instances)
7171 }
8282 close(s.quitc)
8383 }
8484
85 func convertStdApplication(stdApplication *stdeureka.Application) (instances []string) {
86 if stdApplication != nil {
87 instances = convertStdInstances(stdApplication.Instances)
85 func convertFargoApplication(fargoApplication *fargo.Application) (instances []string) {
86 if fargoApplication != nil {
87 instances = convertFargoInstances(fargoApplication.Instances)
8888 }
8989 return instances
9090 }
9191
92 func convertStdInstances(stdInstances []*stdeureka.Instance) []string {
93 instances := make([]string, len(stdInstances))
94 for i, stdInstance := range stdInstances {
95 instances[i] = fmt.Sprintf("%s:%d", stdInstance.IPAddr, stdInstance.Port)
92 func convertFargoInstances(fargoInstances []*fargo.Instance) []string {
93 instances := make([]string, len(fargoInstances))
94 for i, fargoInstance := range fargoInstances {
95 instances[i] = fmt.Sprintf("%s:%d", fargoInstance.IPAddr, fargoInstance.Port)
9696 }
9797 return instances
9898 }
44 "testing"
55 "time"
66
7 fargo "github.com/hudl/fargo"
8
79 "github.com/go-kit/kit/endpoint"
8 stdeureka "github.com/hudl/fargo"
910 )
1011
1112 func TestSubscriber(t *testing.T) {
1415 }
1516
1617 client := &testClient{
17 instances: []*stdeureka.Instance{instanceTest1},
18 instances: []*fargo.Instance{instanceTest1},
1819 application: applicationTest,
1920 errApplication: nil,
2021 }
3839 }
3940
4041 client := &testClient{
41 instances: []*stdeureka.Instance{instanceTest1},
42 instances: []*fargo.Instance{instanceTest1},
4243 application: applicationTest,
4344 errApplication: nil,
4445 }
6566 }
6667
6768 client := &testClient{
68 instances: []*stdeureka.Instance{instanceTest1},
69 instances: []*fargo.Instance{instanceTest1},
6970 }
7071
7172 s := NewSubscriber(client, factory, loggerTest, instanceTest1.App)
111112 }
112113
113114 client := &testClient{
114 instances: []*stdeureka.Instance{instanceTest1},
115 instances: []*fargo.Instance{instanceTest1},
115116 application: applicationTest,
116117 errApplication: errTest,
117118 }