Codebase list golang-github-go-kit-kit / 546eee8
loadbalancer: first draft at Closer for endpoints It's implemented directly in dnssrv, but it should probably be generalized and consumed by all publishers. The semantics will be very similar, independent of source-of-truth. Peter Bourgon 8 years ago
3 changed file(s) with 49 addition(s) and 22 deletion(s). Raw diff Collapse all Expand all
5050 close(p.quit)
5151 }
5252
53 func (p *Publisher) loop(endpoints []endpoint.Endpoint, md5 string) {
53 func (p *Publisher) loop(m map[string]endpointCloser, md5 string) {
5454 t := newTicker(p.ttl)
5555 defer t.Stop()
5656 for {
5757 select {
58 case p.endpoints <- endpoints:
58 case p.endpoints <- flatten(m):
5959
6060 case <-t.C:
6161 // TODO should we do this out-of-band?
6262 addrs, newmd5, err := resolve(p.name)
6363 if err != nil {
6464 p.logger.Log("name", p.name, "err", err)
65 continue // don't replace good endpoints with bad ones
65 continue // don't replace probably-good endpoints with bad ones
6666 }
6767 if newmd5 == md5 {
68 continue // no change
68 continue // optimization: no change
6969 }
70 endpoints = makeEndpoints(addrs, p.factory, p.logger)
70 m = migrate(m, makeEndpoints(addrs, p.factory, p.logger))
7171 md5 = newmd5
7272
7373 case <-p.quit:
9696 if err != nil {
9797 return addrs, "", err
9898 }
99 hostports := make([]string, len(addrs))
99 instances := make([]string, len(addrs))
100100 for i, addr := range addrs {
101 hostports[i] = fmt.Sprintf("%s:%d", addr.Target, addr.Port)
101 instances[i] = addr2instance(addr)
102102 }
103 sort.Sort(sort.StringSlice(hostports))
103 sort.Sort(sort.StringSlice(instances))
104104 h := md5.New()
105 for _, hostport := range hostports {
106 fmt.Fprintf(h, hostport)
105 for _, instance := range instances {
106 fmt.Fprintf(h, instance)
107107 }
108108 return addrs, fmt.Sprintf("%x", h.Sum(nil)), nil
109109 }
110110
111 func makeEndpoints(addrs []*net.SRV, f loadbalancer.Factory, logger log.Logger) []endpoint.Endpoint {
112 endpoints := make([]endpoint.Endpoint, 0, len(addrs))
111 func makeEndpoints(addrs []*net.SRV, f loadbalancer.Factory, logger log.Logger) map[string]endpointCloser {
112 m := make(map[string]endpointCloser, len(addrs))
113113 for _, addr := range addrs {
114 endpoint, err := f(addr2instance(addr))
114 instance := addr2instance(addr)
115 endpoint, closer, err := f(instance)
115116 if err != nil {
116117 logger.Log("instance", addr2instance(addr), "err", err)
117118 continue
118119 }
119 endpoints = append(endpoints, endpoint)
120 m[instance] = endpointCloser{endpoint, closer}
120121 }
121 return endpoints
122 return m
123 }
124
125 func migrate(prev, curr map[string]endpointCloser) map[string]endpointCloser {
126 for instance, ec := range prev {
127 if _, ok := curr[instance]; !ok {
128 close(ec.Closer)
129 }
130 }
131 return curr
122132 }
123133
124134 func addr2instance(addr *net.SRV) string {
125135 return net.JoinHostPort(addr.Target, fmt.Sprint(addr.Port))
126136 }
137
138 func flatten(m map[string]endpointCloser) []endpoint.Endpoint {
139 a := make([]endpoint.Endpoint, 0, len(m))
140 for _, ec := range m {
141 a = append(a, ec.Endpoint)
142 }
143 return a
144 }
145
146 type endpointCloser struct {
147 endpoint.Endpoint
148 loadbalancer.Closer
149 }
2929 defer func() { lookupSRV = oldLookup }()
3030 lookupSRV = mockLookupSRV(addrs, nil, nil)
3131
32 factory := func(instance string) (endpoint.Endpoint, error) {
32 factory := func(instance string) (endpoint.Endpoint, loadbalancer.Closer, error) {
3333 if want, have := addr2instance(addr), instance; want != have {
3434 t.Errorf("want %q, have %q", want, have)
3535 }
36 return e, nil
36 return e, make(loadbalancer.Closer), nil
3737 }
3838
3939 p, err := NewPublisher(name, ttl, factory, logger)
5555 var (
5656 name = "some-name"
5757 ttl = time.Second
58 factory = func(string) (endpoint.Endpoint, error) { return nil, errors.New("unreachable") }
58 factory = func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { return nil, nil, errors.New("false") }
5959 logger = log.NewNopLogger()
6060 )
6161
7070 addrs = []*net.SRV{addr}
7171 name = "some-name"
7272 ttl = time.Second
73 factory = func(string) (endpoint.Endpoint, error) { return nil, errors.New("kaboom") }
73 factory = func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { return nil, nil, errors.New("kaboom") }
7474 logger = log.NewNopLogger()
7575 )
7676
106106 addrs = []*net.SRV{addr}
107107 name = "my-name"
108108 ttl = time.Second
109 factory = func(string) (endpoint.Endpoint, error) { return nil, errors.New("kaboom") }
109 factory = func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { return nil, nil, errors.New("kaboom") }
110110 logger = log.NewNopLogger()
111111 )
112112
139139 var (
140140 name = "my-name"
141141 ttl = time.Second
142 factory = func(string) (endpoint.Endpoint, error) { return nil, errors.New("kaboom") }
142 factory = func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { return nil, nil, errors.New("kaboom") }
143143 logger = log.NewNopLogger()
144144 )
145145
77 // endpoints. Users are expected to provide their own factory functions that
88 // assume specific transports, or can deduce transports by parsing the
99 // instance string.
10 type Factory func(instance string) (endpoint.Endpoint, error)
10 type Factory func(instance string) (endpoint.Endpoint, Closer, error)
11
12 // Closer is returned by factory functions as a way to close a generated
13 // endpoint.
14 type Closer chan struct{}