Use io.Closer, allow nil Closers
Peter Bourgon
8 years ago
2 | 2 | import ( |
3 | 3 | "errors" |
4 | 4 | "fmt" |
5 | "io" | |
5 | 6 | "net/url" |
6 | 7 | "strings" |
7 | 8 | "time" |
63 | 64 | } |
64 | 65 | |
65 | 66 | func factory(ctx context.Context, qps int) loadbalancer.Factory { |
66 | return func(instance string) (endpoint.Endpoint, loadbalancer.Closer, error) { | |
67 | return func(instance string) (endpoint.Endpoint, io.Closer, error) { | |
67 | 68 | var e endpoint.Endpoint |
68 | 69 | e = makeUppercaseProxy(ctx, instance) |
69 | 70 | e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e) |
70 | 71 | e = kitratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e) |
71 | return e, make(loadbalancer.Closer), nil | |
72 | return e, nil, nil | |
72 | 73 | } |
73 | 74 | } |
74 | 75 |
1 | 1 | |
2 | 2 | import ( |
3 | 3 | "errors" |
4 | "io" | |
4 | 5 | "net" |
5 | 6 | "sync/atomic" |
6 | 7 | "testing" |
9 | 10 | "golang.org/x/net/context" |
10 | 11 | |
11 | 12 | "github.com/go-kit/kit/endpoint" |
12 | "github.com/go-kit/kit/loadbalancer" | |
13 | 13 | "github.com/go-kit/kit/log" |
14 | 14 | ) |
15 | 15 | |
18 | 18 | name = "foo" |
19 | 19 | ttl = time.Second |
20 | 20 | e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil } |
21 | c = make(chan struct{}) | |
22 | factory = func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { return e, c, nil } | |
21 | factory = func(string) (endpoint.Endpoint, io.Closer, error) { return e, nil, nil } | |
23 | 22 | logger = log.NewNopLogger() |
24 | 23 | ) |
25 | 24 | |
40 | 39 | name = "some-name" |
41 | 40 | ttl = time.Second |
42 | 41 | e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil } |
43 | c = make(chan struct{}) | |
44 | factory = func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { return e, c, nil } | |
42 | factory = func(string) (endpoint.Endpoint, io.Closer, error) { return e, nil, nil } | |
45 | 43 | logger = log.NewNopLogger() |
46 | 44 | ) |
47 | 45 | |
63 | 61 | addrs = []*net.SRV{addr} |
64 | 62 | name = "some-name" |
65 | 63 | ttl = time.Second |
66 | factory = func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { return nil, nil, errors.New("kaboom") } | |
64 | factory = func(string) (endpoint.Endpoint, io.Closer, error) { return nil, nil, errors.New("kaboom") } | |
67 | 65 | logger = log.NewNopLogger() |
68 | 66 | ) |
69 | 67 | |
96 | 94 | addrs = []*net.SRV{addr} |
97 | 95 | name = "my-name" |
98 | 96 | ttl = time.Second |
99 | factory = func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { return nil, nil, errors.New("kaboom") } | |
97 | factory = func(string) (endpoint.Endpoint, io.Closer, error) { return nil, nil, errors.New("kaboom") } | |
100 | 98 | logger = log.NewNopLogger() |
101 | 99 | ) |
102 | 100 |
0 | 0 | package loadbalancer |
1 | 1 | |
2 | 2 | import ( |
3 | "io" | |
3 | 4 | "sync" |
4 | 5 | |
5 | 6 | "github.com/go-kit/kit/endpoint" |
36 | 37 | |
37 | 38 | type endpointCloser struct { |
38 | 39 | endpoint.Endpoint |
39 | Closer | |
40 | io.Closer | |
40 | 41 | } |
41 | 42 | |
42 | 43 | // Replace replaces the current set of endpoints with endpoints manufactured |
67 | 68 | |
68 | 69 | // Close any leftover endpoints. |
69 | 70 | for _, ec := range t.m { |
70 | close(ec.Closer) | |
71 | if ec.Closer != nil { | |
72 | ec.Closer.Close() | |
73 | } | |
71 | 74 | } |
72 | 75 | |
73 | 76 | // Swap and GC. |
0 | 0 | package loadbalancer_test |
1 | 1 | |
2 | 2 | import ( |
3 | "io" | |
3 | 4 | "testing" |
4 | 5 | "time" |
5 | 6 | |
13 | 14 | func TestEndpointCache(t *testing.T) { |
14 | 15 | var ( |
15 | 16 | e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil } |
16 | ca = make(loadbalancer.Closer) | |
17 | cb = make(loadbalancer.Closer) | |
18 | c = map[string]loadbalancer.Closer{"a": ca, "b": cb} | |
19 | f = func(s string) (endpoint.Endpoint, loadbalancer.Closer, error) { return e, c[s], nil } | |
17 | ca = make(closer) | |
18 | cb = make(closer) | |
19 | c = map[string]io.Closer{"a": ca, "b": cb} | |
20 | f = func(s string) (endpoint.Endpoint, io.Closer, error) { return e, c[s], nil } | |
20 | 21 | ec = loadbalancer.NewEndpointCache(f, log.NewNopLogger()) |
21 | 22 | ) |
22 | 23 | |
63 | 64 | t.Errorf("didn't close the deleted instance in time") |
64 | 65 | } |
65 | 66 | } |
67 | ||
68 | type closer chan struct{} | |
69 | ||
70 | func (c closer) Close() error { close(c); return nil } |
1 | 1 | |
2 | 2 | import ( |
3 | 3 | "errors" |
4 | "io" | |
4 | 5 | "testing" |
5 | 6 | |
6 | 7 | stdetcd "github.com/coreos/go-etcd/etcd" |
7 | 8 | "golang.org/x/net/context" |
8 | 9 | |
9 | 10 | "github.com/go-kit/kit/endpoint" |
10 | "github.com/go-kit/kit/loadbalancer" | |
11 | 11 | kitetcd "github.com/go-kit/kit/loadbalancer/etcd" |
12 | 12 | "github.com/go-kit/kit/log" |
13 | 13 | ) |
31 | 31 | e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil } |
32 | 32 | ) |
33 | 33 | |
34 | factory := func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { | |
35 | return e, make(loadbalancer.Closer), nil | |
34 | factory := func(string) (endpoint.Endpoint, io.Closer, error) { | |
35 | return e, nil, nil | |
36 | 36 | } |
37 | 37 | |
38 | 38 | client := &fakeClient{ |
53 | 53 | func TestBadFactory(t *testing.T) { |
54 | 54 | logger := log.NewNopLogger() |
55 | 55 | |
56 | factory := func(string) (endpoint.Endpoint, loadbalancer.Closer, error) { | |
56 | factory := func(string) (endpoint.Endpoint, io.Closer, error) { | |
57 | 57 | return nil, nil, errors.New("kaboom") |
58 | 58 | } |
59 | 59 |
0 | 0 | package loadbalancer |
1 | 1 | |
2 | import "github.com/go-kit/kit/endpoint" | |
2 | import ( | |
3 | "io" | |
4 | ||
5 | "github.com/go-kit/kit/endpoint" | |
6 | ) | |
3 | 7 | |
4 | 8 | // Factory is a function that converts an instance string, e.g. a host:port, |
5 | 9 | // to a usable endpoint. Factories are used by load balancers to convert |
7 | 11 | // endpoints. Users are expected to provide their own factory functions that |
8 | 12 | // assume specific transports, or can deduce transports by parsing the |
9 | 13 | // instance string. |
10 | type Factory func(instance string) (endpoint.Endpoint, Closer, error) | |
11 | ||
12 | // Closer is returned by factory functions as a way to close a generated | |
13 | // endpoint. | |
14 | type Closer chan struct{} | |
14 | type Factory func(instance string) (endpoint.Endpoint, io.Closer, error) |
1 | 1 | |
2 | 2 | import ( |
3 | 3 | "fmt" |
4 | "io" | |
4 | 5 | "testing" |
5 | 6 | |
6 | 7 | "golang.org/x/net/context" |
7 | 8 | |
8 | 9 | "github.com/go-kit/kit/endpoint" |
9 | "github.com/go-kit/kit/loadbalancer" | |
10 | 10 | "github.com/go-kit/kit/loadbalancer/static" |
11 | 11 | "github.com/go-kit/kit/log" |
12 | 12 | ) |
19 | 19 | "bar": func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, |
20 | 20 | "baz": func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, |
21 | 21 | } |
22 | factory = func(instance string) (endpoint.Endpoint, loadbalancer.Closer, error) { | |
22 | factory = func(instance string) (endpoint.Endpoint, io.Closer, error) { | |
23 | 23 | if e, ok := endpoints[instance]; ok { |
24 | return e, make(loadbalancer.Closer), nil | |
24 | return e, nil, nil | |
25 | 25 | } |
26 | 26 | return nil, nil, fmt.Errorf("%s: not found", instance) |
27 | 27 | } |