Codebase list golang-github-go-kit-kit / 4968948
loadbalancer/dnssrv: use EndpointCache Peter Bourgon 8 years ago
4 changed file(s) with 57 addition(s) and 120 deletion(s). Raw diff Collapse all Expand all
00 package dnssrv
11
22 import (
3 "crypto/md5"
43 "fmt"
54 "net"
6 "sort"
75 "time"
86
97 "github.com/go-kit/kit/endpoint"
1614 type Publisher struct {
1715 name string
1816 ttl time.Duration
19 factory loadbalancer.Factory
17 cache *loadbalancer.EndpointCache
2018 logger log.Logger
2119 endpoints chan []endpoint.Endpoint
2220 quit chan struct{}
2725 // constructor will return an error. The factory is used to convert a
2826 // host:port to a usable endpoint. The logger is used to report DNS and
2927 // factory errors.
30 func NewPublisher(name string, ttl time.Duration, f loadbalancer.Factory, logger log.Logger) (*Publisher, error) {
31 logger = log.NewContext(logger).With("component", "DNS SRV Publisher")
32 addrs, md5, err := resolve(name)
33 if err != nil {
34 return nil, err
35 }
28 func NewPublisher(name string, ttl time.Duration, factory loadbalancer.Factory, logger log.Logger) *Publisher {
3629 p := &Publisher{
3730 name: name,
3831 ttl: ttl,
39 factory: f,
32 cache: loadbalancer.NewEndpointCache(factory, logger),
4033 logger: logger,
4134 endpoints: make(chan []endpoint.Endpoint),
4235 quit: make(chan struct{}),
4336 }
44 go p.loop(makeEndpoints(addrs, f, logger), md5)
45 return p, nil
37
38 instances, err := p.resolve()
39 if err != nil {
40 logger.Log(name, len(instances))
41 } else {
42 logger.Log(name, err)
43 }
44 p.cache.Replace(instances)
45
46 go p.loop()
47 return p
4648 }
4749
4850 // Stop terminates the publisher.
5052 close(p.quit)
5153 }
5254
53 func (p *Publisher) loop(m map[string]endpointCloser, md5 string) {
55 func (p *Publisher) loop() {
5456 t := newTicker(p.ttl)
5557 defer t.Stop()
5658 for {
5759 select {
58 case p.endpoints <- flatten(m):
60 case p.endpoints <- p.cache.Endpoints():
5961
6062 case <-t.C:
61 // TODO should we do this out-of-band?
62 addrs, newmd5, err := resolve(p.name)
63 instances, err := p.resolve()
6364 if err != nil {
64 p.logger.Log("name", p.name, "err", err)
65 continue // don't replace probably-good endpoints with bad ones
65 p.logger.Log(p.name, err)
66 continue // don't replace potentially-good with bad
6667 }
67 if newmd5 == md5 {
68 continue // optimization: no change
69 }
70 m = migrate(m, makeEndpoints(addrs, p.factory, p.logger))
71 md5 = newmd5
68 p.cache.Replace(instances)
7269
7370 case <-p.quit:
7471 return
9188 newTicker = time.NewTicker
9289 )
9390
94 func resolve(name string) (addrs []*net.SRV, md5sum string, err error) {
95 _, addrs, err = lookupSRV("", "", name)
91 func (p *Publisher) resolve() ([]string, error) {
92 _, addrs, err := lookupSRV("", "", p.name)
9693 if err != nil {
97 return addrs, "", err
94 return []string{}, err
9895 }
9996 instances := make([]string, len(addrs))
10097 for i, addr := range addrs {
101 instances[i] = addr2instance(addr)
98 instances[i] = net.JoinHostPort(addr.Target, fmt.Sprint(addr.Port))
10299 }
103 sort.Sort(sort.StringSlice(instances))
104 h := md5.New()
105 for _, instance := range instances {
106 fmt.Fprintf(h, instance)
107 }
108 return addrs, fmt.Sprintf("%x", h.Sum(nil)), nil
100 return instances, nil
109101 }
110
111 func makeEndpoints(addrs []*net.SRV, f loadbalancer.Factory, logger log.Logger) map[string]endpointCloser {
112 m := make(map[string]endpointCloser, len(addrs))
113 for _, addr := range addrs {
114 instance := addr2instance(addr)
115 endpoint, closer, err := f(instance)
116 if err != nil {
117 logger.Log("instance", addr2instance(addr), "err", err)
118 continue
119 }
120 m[instance] = endpointCloser{endpoint, closer}
121 }
122 return m
123 }
124
125 func migrate(prev, curr map[string]endpointCloser) map[string]endpointCloser {
126 for instance, ec := range prev {
127 if _, ok := curr[instance]; !ok {
128 close(ec.Closer)
129 }
130 }
131 return curr
132 }
133
134 func addr2instance(addr *net.SRV) string {
135 return net.JoinHostPort(addr.Target, fmt.Sprint(addr.Port))
136 }
137
138 func flatten(m map[string]endpointCloser) []endpoint.Endpoint {
139 a := make([]endpoint.Endpoint, 0, len(m))
140 for _, ec := range m {
141 a = append(a, ec.Endpoint)
142 }
143 return a
144 }
145
146 type endpointCloser struct {
147 endpoint.Endpoint
148 loadbalancer.Closer
149 }
1515
1616 func TestPublisher(t *testing.T) {
1717 var (
18 target = "my-target"
19 port = uint16(1234)
20 addr = &net.SRV{Target: target, Port: port}
21 addrs = []*net.SRV{addr}
22 name = "my-name"
23 ttl = time.Second
24 logger = log.NewNopLogger()
25 e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
18 name = "foo"
19 ttl = time.Second
20 e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
21 c = make(chan struct{})
22 factory = func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { return e, c, nil }
23 logger = log.NewNopLogger()
2624 )
2725
28 oldLookup := lookupSRV
29 defer func() { lookupSRV = oldLookup }()
30 lookupSRV = mockLookupSRV(addrs, nil, nil)
31
32 factory := func(instance string) (endpoint.Endpoint, loadbalancer.Closer, error) {
33 if want, have := addr2instance(addr), instance; want != have {
34 t.Errorf("want %q, have %q", want, have)
35 }
36 return e, make(loadbalancer.Closer), nil
37 }
38
39 p, err := NewPublisher(name, ttl, factory, logger)
40 if err != nil {
41 t.Fatal(err)
42 }
26 p := NewPublisher(name, ttl, factory, logger)
4327 defer p.Stop()
4428
4529 if _, err := p.Endpoints(); err != nil {
5539 var (
5640 name = "some-name"
5741 ttl = time.Second
58 factory = func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { return nil, nil, errors.New("false") }
42 e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
43 c = make(chan struct{})
44 factory = func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { return e, c, nil }
5945 logger = log.NewNopLogger()
6046 )
6147
62 if _, err := NewPublisher(name, ttl, factory, logger); err == nil {
63 t.Fatal("wanted error, got none")
48 p := NewPublisher(name, ttl, factory, logger)
49 defer p.Stop()
50
51 endpoints, err := p.Endpoints()
52 if err != nil {
53 t.Error(err)
54 }
55 if want, have := 0, len(endpoints); want != have {
56 t.Errorf("want %d, have %d", want, have)
6457 }
6558 }
6659
7871 defer func() { lookupSRV = oldLookup }()
7972 lookupSRV = mockLookupSRV(addrs, nil, nil)
8073
81 p, err := NewPublisher(name, ttl, factory, logger)
82 if err != nil {
83 t.Fatal(err)
84 }
74 p := NewPublisher(name, ttl, factory, logger)
8575 defer p.Stop()
8676
8777 endpoints, err := p.Endpoints()
8878 if err != nil {
89 t.Fatal(err)
79 t.Error(err)
9080 }
9181 if want, have := 0, len(endpoints); want != have {
9282 t.Errorf("want %q, have %q", want, have)
119109 defer func() { lookupSRV = oldLookup }()
120110 lookupSRV = mockLookupSRV(addrs, nil, &resolves)
121111
122 p, err := NewPublisher(name, ttl, factory, logger)
123 if err != nil {
124 t.Fatal(err)
125 }
112 p := NewPublisher(name, ttl, factory, logger)
126113 defer p.Stop()
127114
128115 tick <- time.Now()
147134 defer func() { lookupSRV = oldLookup }()
148135 lookupSRV = mockLookupSRV([]*net.SRV{}, nil, nil)
149136
150 p, err := NewPublisher(name, ttl, factory, logger)
151 if err != nil {
152 t.Fatal(err)
153 }
137 p := NewPublisher(name, ttl, factory, logger)
154138
155139 p.Stop()
156140 _, have := p.Endpoints()
66 "github.com/go-kit/kit/log"
77 )
88
9 // EndpointCache caches resource-managed endpoints. Clients update the cache
10 // by providing a current set of instance strings. The cache converts each
11 // instance string to an endpoint and a closer via the factory function.
9 // EndpointCache caches endpoints that need to be deallocated when they're no
10 // longer useful. Clients update the cache by providing a current set of
11 // instance strings. The cache converts each instance string to an endpoint
12 // and a closer via the factory function.
1213 //
13 // Instance strings are assumed to be unique and used as keys. Endpoints that
14 // were in the previous set of instances and removed from the current set are
15 // considered invalid and closed.
14 // Instance strings are assumed to be unique and are used as keys. Endpoints
15 // that were in the previous set of instances and are not in the current set
16 // are considered invalid and closed.
1617 //
1718 // EndpointCache is designed to be used in your publisher implementation.
1819 type EndpointCache struct {
2930 return &EndpointCache{
3031 f: f,
3132 m: map[string]endpointCloser{},
32 logger: logger,
33 logger: log.NewContext(logger).With("component", "Endpoint Cache"),
3334 }
3435 }
3536
1111
1212 // NewPublisher returns a static endpoint Publisher.
1313 func NewPublisher(instances []string, factory loadbalancer.Factory, logger log.Logger) Publisher {
14 logger = log.NewContext(logger).With("component", "Fixed Publisher")
14 logger = log.NewContext(logger).With("component", "Static Publisher")
1515 endpoints := []endpoint.Endpoint{}
1616 for _, instance := range instances {
1717 e, _, err := factory(instance) // never close