Codebase list golang-github-go-kit-kit / d2ef7d0
loadbalancer/etcd: use EndpointCache Peter Bourgon 8 years ago
3 changed file(s) with 28 addition(s) and 95 deletion(s). Raw diff Collapse all Expand all
66 "github.com/coreos/go-etcd/etcd"
77 )
88
9 // Client is a wrapper arround the etcd client
9 // Client is a wrapper arround the etcd client.
1010 type Client interface {
1111 // GetEntries will query the given prefix in etcd and returns a set of entries.
1212 GetEntries(prefix string) ([]string, error)
4242 return &client{c}, nil
4343 }
4444
45 // GetEntries implements the EtcdClient interface.
45 // GetEntries implements the etcd Client interface.
4646 func (c *client) GetEntries(key string) ([]string, error) {
4747 resp, err := c.Get(key, false, true)
4848 if err != nil {
5656 return entries, nil
5757 }
5858
59 // WatchPrefix implements the EtcdClient interface.
59 // WatchPrefix implements the etcd Client interface.
6060 func (c *client) WatchPrefix(prefix string, responseChan chan *etcd.Response) {
6161 c.Watch(prefix, 0, true, responseChan, nil)
6262 }
88 )
99
1010 // Publisher yield endpoints stored in a certain etcd keyspace. Any kind of
11 // change in that keyspace is watched and wil update the Publisher endpoints.
11 // change in that keyspace is watched and will update the Publisher endpoints.
1212 type Publisher struct {
13 client Client
14 prefix string
15 factory loadbalancer.Factory
16 logger log.Logger
17 endpoints chan []endpoint.Endpoint
18 quit chan struct{}
13 client Client
14 prefix string
15 cache *loadbalancer.EndpointCache
16 logger log.Logger
17 quit chan struct{}
1918 }
2019
2120 // NewPublisher returs a etcd publisher. Etcd will start watching the given
2221 // prefix for changes and update the Publisher endpoints.
2322 func NewPublisher(c Client, prefix string, f loadbalancer.Factory, logger log.Logger) (*Publisher, error) {
24 logger = log.NewContext(logger).With("component", "etcd Publisher")
2523 p := &Publisher{
26 client: c,
27 prefix: prefix,
28 factory: f,
29 logger: logger,
30 endpoints: make(chan []endpoint.Endpoint),
31 quit: make(chan struct{}),
24 client: c,
25 prefix: prefix,
26 cache: loadbalancer.NewEndpointCache(f, logger),
27 logger: logger,
28 quit: make(chan struct{}),
3229 }
33 entries, err := p.client.GetEntries(prefix)
34 if err != nil {
35 return nil, err
30
31 instances, err := p.client.GetEntries(p.prefix)
32 if err == nil {
33 logger.Log(p.prefix, len(instances))
34 } else {
35 logger.Log("msg", "failed to retrieve entries", "err", err)
3636 }
37 go p.loop(makeEndpoints(entries, f, logger))
37 p.cache.Replace(instances)
38
39 go p.loop()
3840 return p, nil
3941 }
4042
41 func (p *Publisher) loop(endpoints map[string]endpointCloser) {
43 func (p *Publisher) loop() {
4244 responseChan := make(chan *etcd.Response)
4345 go p.client.WatchPrefix(p.prefix, responseChan)
4446 for {
4547 select {
46 case p.endpoints <- flatten(endpoints):
47
4848 case <-responseChan:
49 entries, err := p.client.GetEntries(p.prefix)
49 instances, err := p.client.GetEntries(p.prefix)
5050 if err != nil {
5151 p.logger.Log("msg", "failed to retrieve entries", "err", err)
5252 continue
5353 }
54 endpoints = migrate(endpoints, makeEndpoints(entries, p.factory, p.logger))
54 p.cache.Replace(instances)
5555
5656 case <-p.quit:
5757 return
6161
6262 // Endpoints implements the Publisher interface.
6363 func (p *Publisher) Endpoints() ([]endpoint.Endpoint, error) {
64 select {
65 case endpoints := <-p.endpoints:
66 return endpoints, nil
67 case <-p.quit:
68 return nil, loadbalancer.ErrPublisherStopped
69 }
64 return p.cache.Endpoints(), nil
7065 }
7166
72 // Stop terminates the publisher.
67 // Stop terminates the Publisher.
7368 func (p *Publisher) Stop() {
7469 close(p.quit)
7570 }
76
77 func makeEndpoints(addrs []string, f loadbalancer.Factory, logger log.Logger) map[string]endpointCloser {
78 m := make(map[string]endpointCloser, len(addrs))
79 for _, addr := range addrs {
80 if _, ok := m[addr]; ok {
81 continue // duplicate
82 }
83 endpoint, closer, err := f(addr)
84 if err != nil {
85 logger.Log("instance", addr, "err", err)
86 continue
87 }
88 m[addr] = endpointCloser{endpoint, closer}
89 }
90 return m
91 }
92
93 type endpointCloser struct {
94 endpoint.Endpoint
95 loadbalancer.Closer
96 }
97
98 func migrate(prev, curr map[string]endpointCloser) map[string]endpointCloser {
99 for instance, ec := range prev {
100 if _, ok := curr[instance]; !ok {
101 close(ec.Closer)
102 }
103 }
104 return curr
105 }
106
107 func flatten(m map[string]endpointCloser) []endpoint.Endpoint {
108 a := make([]endpoint.Endpoint, 0, len(m))
109 for _, ec := range m {
110 a = append(a, ec.Endpoint)
111 }
112 return a
113 }
7777 }
7878 }
7979
80 func TestPublisherStoppped(t *testing.T) {
81 logger := log.NewNopLogger()
82
83 factory := func(string) (endpoint.Endpoint, loadbalancer.Closer, error) {
84 return nil, nil, errors.New("kaboom")
85 }
86
87 client := &fakeClient{
88 responses: map[string]*stdetcd.Response{"/foo": fakeResponse},
89 }
90
91 p, err := kitetcd.NewPublisher(client, "/foo", factory, logger)
92 if err != nil {
93 t.Fatalf("failed to create new publisher: %v", err)
94 }
95
96 p.Stop()
97
98 _, have := p.Endpoints()
99 if want := loadbalancer.ErrPublisherStopped; want != have {
100 t.Fatalf("want %v, have %v", want, have)
101 }
102 }
103
10480 type fakeClient struct {
10581 responses map[string]*stdetcd.Response
10682 }