Codebase list golang-github-go-kit-kit / 5967c5c
Draft of EndpointCache Peter Bourgon 8 years ago
4 changed file(s) with 146 addition(s) and 2 deletion(s). Raw diff Collapse all Expand all
0 package loadbalancer
1
2 import (
3 "sync"
4
5 "github.com/go-kit/kit/endpoint"
6 "github.com/go-kit/kit/log"
7 )
8
9 // EndpointCache TODO
10 type EndpointCache struct {
11 mtx sync.RWMutex
12 f Factory
13 m map[string]endpointCloser
14 logger log.Logger
15 }
16
17 // NewEndpointCache TODO
18 func NewEndpointCache(f Factory, logger log.Logger) *EndpointCache {
19 return &EndpointCache{
20 f: f,
21 m: map[string]endpointCloser{},
22 logger: logger,
23 }
24 }
25
26 type endpointCloser struct {
27 endpoint.Endpoint
28 Closer
29 }
30
31 // Replace TODO
32 func (t *EndpointCache) Replace(instances []string) {
33 t.mtx.Lock()
34 defer t.mtx.Unlock()
35
36 // Produce the current set of endpoints.
37 m := make(map[string]endpointCloser, len(instances))
38 for _, instance := range instances {
39 // If it already exists, just copy it over.
40 if ec, ok := t.m[instance]; ok {
41 m[instance] = ec
42 delete(t.m, instance)
43 continue
44 }
45
46 // If it doesn't exist, create it.
47 endpoint, closer, err := t.f(instance)
48 if err != nil {
49 t.logger.Log("instance", instance, "err", err)
50 continue
51 }
52 m[instance] = endpointCloser{endpoint, closer}
53 }
54
55 // Close any leftover endpoints.
56 for _, ec := range t.m {
57 close(ec.Closer)
58 }
59
60 // Swap and GC.
61 t.m = m
62 }
63
64 // Endpoints TODO
65 func (t *EndpointCache) Endpoints() []endpoint.Endpoint {
66 t.mtx.RLock()
67 defer t.mtx.RUnlock()
68 a := make([]endpoint.Endpoint, 0, len(t.m))
69 for _, ec := range t.m {
70 a = append(a, ec.Endpoint)
71 }
72 return a
73 }
0 package loadbalancer_test
1
2 import (
3 "testing"
4 "time"
5
6 "golang.org/x/net/context"
7
8 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/loadbalancer"
10 "github.com/go-kit/kit/log"
11 )
12
13 func TestEndpointCache(t *testing.T) {
14 var (
15 e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
16 ca = make(loadbalancer.Closer)
17 cb = make(loadbalancer.Closer)
18 c = map[string]loadbalancer.Closer{"a": ca, "b": cb}
19 f = func(s string) (endpoint.Endpoint, loadbalancer.Closer, error) { return e, c[s], nil }
20 ec = loadbalancer.NewEndpointCache(f, log.NewNopLogger())
21 )
22
23 // Populate
24 ec.Replace([]string{"a", "b"})
25 select {
26 case <-ca:
27 t.Errorf("endpoint a closed, not good")
28 case <-cb:
29 t.Errorf("endpoint b closed, not good")
30 case <-time.After(time.Millisecond):
31 t.Logf("no closures yet, good")
32 }
33
34 // Duplicate, should be no-op
35 ec.Replace([]string{"a", "b"})
36 select {
37 case <-ca:
38 t.Errorf("endpoint a closed, not good")
39 case <-cb:
40 t.Errorf("endpoint b closed, not good")
41 case <-time.After(time.Millisecond):
42 t.Logf("no closures yet, good")
43 }
44
45 // Delete b
46 go ec.Replace([]string{"a"})
47 select {
48 case <-ca:
49 t.Errorf("endpoint a closed, not good")
50 case <-cb:
51 t.Logf("endpoint b closed, good")
52 case <-time.After(time.Millisecond):
53 t.Errorf("didn't close the deleted instance in time")
54 }
55
56 // Delete a
57 go ec.Replace([]string{""})
58 select {
59 // case <-cb: will succeed, as it's closed
60 case <-ca:
61 t.Logf("endpoint a closed, good")
62 case <-time.After(time.Millisecond):
63 t.Errorf("didn't close the deleted instance in time")
64 }
65 }
3434 if err != nil {
3535 t.Fatal(err)
3636 }
37 e(ctx, struct{}{})
37 if _, err := e(ctx, struct{}{}); err != nil {
38 t.Error(err)
39 }
3840 }
3941
4042 for i, have := range counts {
3535 if err != nil {
3636 t.Fatal(err)
3737 }
38 e(ctx, struct{}{})
38 if _, err := e(ctx, struct{}{}); err != nil {
39 t.Error(err)
40 }
3941 if have := counts; !reflect.DeepEqual(want, have) {
4042 t.Fatalf("%d: want %v, have %v", i, want, have)
4143 }