Codebase list golang-github-go-kit-kit / 7f427a8
Manually fix up other publishers and consumers Peter Bourgon 8 years ago
5 changed file(s) with 56 addition(s) and 35 deletion(s). Raw diff Collapse all Expand all
6363 }
6464
6565 func factory(ctx context.Context, qps int) loadbalancer.Factory {
66 return func(instance string) (endpoint.Endpoint, error) {
66 return func(instance string) (endpoint.Endpoint, loadbalancer.Closer, error) {
6767 var e endpoint.Endpoint
6868 e = makeUppercaseProxy(ctx, instance)
6969 e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
7070 e = kitratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e)
71 return e, nil
71 return e, make(loadbalancer.Closer), nil
7272 }
7373 }
7474
2121 // NewPublisher returs a etcd publisher. Etcd will start watching the given
2222 // prefix for changes and update the Publisher endpoints.
2323 func NewPublisher(c Client, prefix string, f loadbalancer.Factory, logger log.Logger) (*Publisher, error) {
24 logger = log.NewContext(logger).With("component", "Etcd Publisher")
25
24 logger = log.NewContext(logger).With("component", "etcd Publisher")
2625 p := &Publisher{
2726 client: c,
2827 prefix: prefix,
3130 endpoints: make(chan []endpoint.Endpoint),
3231 quit: make(chan struct{}),
3332 }
34
3533 entries, err := p.client.GetEntries(prefix)
3634 if err != nil {
3735 return nil, err
4038 return p, nil
4139 }
4240
43 func (p *Publisher) loop(endpoints []endpoint.Endpoint) {
41 func (p *Publisher) loop(endpoints map[string]endpointCloser) {
4442 responseChan := make(chan *etcd.Response)
4543 go p.client.WatchPrefix(p.prefix, responseChan)
46
4744 for {
4845 select {
49 case p.endpoints <- endpoints:
46 case p.endpoints <- flatten(endpoints):
5047
5148 case <-responseChan:
5249 entries, err := p.client.GetEntries(p.prefix)
5451 p.logger.Log("msg", "failed to retrieve entries", "err", err)
5552 continue
5653 }
57 endpoints = makeEndpoints(entries, p.factory, p.logger)
54 endpoints = migrate(endpoints, makeEndpoints(entries, p.factory, p.logger))
5855
5956 case <-p.quit:
6057 return
7774 close(p.quit)
7875 }
7976
80 func makeEndpoints(addrs []string, f loadbalancer.Factory, logger log.Logger) []endpoint.Endpoint {
81 endpoints := make([]endpoint.Endpoint, 0, len(addrs))
82
77 func makeEndpoints(addrs []string, f loadbalancer.Factory, logger log.Logger) map[string]endpointCloser {
78 m := make(map[string]endpointCloser, len(addrs))
8379 for _, addr := range addrs {
84 endpoint, err := f(addr)
80 if _, ok := m[addr]; ok {
81 continue // duplicate
82 }
83 endpoint, closer, err := f(addr)
8584 if err != nil {
8685 logger.Log("instance", addr, "err", err)
8786 continue
8887 }
89 endpoints = append(endpoints, endpoint)
88 m[addr] = endpointCloser{endpoint, closer}
9089 }
91 return endpoints
90 return m
9291 }
92
93 type endpointCloser struct {
94 endpoint.Endpoint
95 loadbalancer.Closer
96 }
97
98 func migrate(prev, curr map[string]endpointCloser) map[string]endpointCloser {
99 for instance, ec := range prev {
100 if _, ok := curr[instance]; !ok {
101 close(ec.Closer)
102 }
103 }
104 return curr
105 }
106
107 func flatten(m map[string]endpointCloser) []endpoint.Endpoint {
108 a := make([]endpoint.Endpoint, 0, len(m))
109 for _, ec := range m {
110 a = append(a, ec.Endpoint)
111 }
112 return a
113 }
3131 e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
3232 )
3333
34 factory := func(instance string) (endpoint.Endpoint, error) {
35 return e, nil
34 factory := func(string) (endpoint.Endpoint, loadbalancer.Closer, error) {
35 return e, make(loadbalancer.Closer), nil
3636 }
3737
3838 client := &fakeClient{
5151 }
5252
5353 func TestBadFactory(t *testing.T) {
54 var (
55 logger = log.NewNopLogger()
56 e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
57 )
54 logger := log.NewNopLogger()
5855
59 factory := func(instance string) (endpoint.Endpoint, error) {
60 return e, errors.New("_")
56 factory := func(string) (endpoint.Endpoint, loadbalancer.Closer, error) {
57 return nil, nil, errors.New("kaboom")
6158 }
6259
6360 client := &fakeClient{
8178 }
8279
8380 func TestPublisherStoppped(t *testing.T) {
84 var (
85 logger = log.NewNopLogger()
86 e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
87 )
81 logger := log.NewNopLogger()
8882
89 factory := func(instance string) (endpoint.Endpoint, error) {
90 return e, errors.New("_")
83 factory := func(string) (endpoint.Endpoint, loadbalancer.Closer, error) {
84 return nil, nil, errors.New("kaboom")
9185 }
9286
9387 client := &fakeClient{
77 )
88
99 // Publisher yields a set of static endpoints as produced by the passed factory.
10 type Publisher struct{ *fixed.Publisher }
10 type Publisher struct{ publisher *fixed.Publisher }
1111
1212 // NewPublisher returns a static endpoint Publisher.
1313 func NewPublisher(instances []string, factory loadbalancer.Factory, logger log.Logger) Publisher {
1414 logger = log.NewContext(logger).With("component", "Fixed Publisher")
1515 endpoints := []endpoint.Endpoint{}
1616 for _, instance := range instances {
17 e, err := factory(instance)
17 e, _, err := factory(instance) // never close
1818 if err != nil {
1919 _ = logger.Log("instance", instance, "err", err)
2020 continue
2121 }
2222 endpoints = append(endpoints, e)
2323 }
24 return Publisher{fixed.NewPublisher(endpoints)}
24 return Publisher{publisher: fixed.NewPublisher(endpoints)}
2525 }
26
27 // Endpoints implements Publisher.
28 func (p Publisher) Endpoints() ([]endpoint.Endpoint, error) {
29 return p.publisher.Endpoints()
30 }
66 "golang.org/x/net/context"
77
88 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/loadbalancer"
910 "github.com/go-kit/kit/loadbalancer/static"
1011 "github.com/go-kit/kit/log"
1112 )
1819 "bar": func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
1920 "baz": func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
2021 }
21 factory = func(instance string) (endpoint.Endpoint, error) {
22 factory = func(instance string) (endpoint.Endpoint, loadbalancer.Closer, error) {
2223 if e, ok := endpoints[instance]; ok {
23 return e, nil
24 return e, make(loadbalancer.Closer), nil
2425 }
25 return nil, fmt.Errorf("%s: not found", instance)
26 return nil, nil, fmt.Errorf("%s: not found", instance)
2627 }
2728 )
2829 p := static.NewPublisher(instances, factory, log.NewNopLogger())