Codebase list golang-github-go-kit-kit / 60b4d80
Merge pull request #243 from jerome-laforge/master EndpointCache return sorted Endpoints Peter Bourgon 8 years ago
2 changed file(s) with 59 addition(s) and 17 deletion(s). Raw diff Collapse all Expand all
11
22 import (
33 "io"
4 "sort"
45 "sync"
6 "sync/atomic"
57
68 "github.com/go-kit/kit/endpoint"
79 "github.com/go-kit/kit/log"
1820 //
1921 // EndpointCache is designed to be used in your publisher implementation.
2022 type EndpointCache struct {
21 mtx sync.RWMutex
23 mtx sync.Mutex
2224 f Factory
2325 m map[string]endpointCloser
26 cache atomic.Value //[]endpoint.Endpoint
2427 logger log.Logger
2528 }
2629
2831 // strings will be converted to endpoints via the provided factory function.
2932 // The logger is used to log errors.
3033 func NewEndpointCache(f Factory, logger log.Logger) *EndpointCache {
31 return &EndpointCache{
34 endpointCache := &EndpointCache{
3235 f: f,
3336 m: map[string]endpointCloser{},
3437 logger: log.NewContext(logger).With("component", "Endpoint Cache"),
3538 }
39
40 endpointCache.cache.Store(make([]endpoint.Endpoint, 0))
41
42 return endpointCache
3643 }
3744
3845 type endpointCloser struct {
4855 defer t.mtx.Unlock()
4956
5057 // Produce the current set of endpoints.
51 m := make(map[string]endpointCloser, len(instances))
58 oldMap := t.m
59 t.m = make(map[string]endpointCloser, len(instances))
5260 for _, instance := range instances {
5361 // If it already exists, just copy it over.
54 if ec, ok := t.m[instance]; ok {
55 m[instance] = ec
56 delete(t.m, instance)
62 if ec, ok := oldMap[instance]; ok {
63 t.m[instance] = ec
64 delete(oldMap, instance)
5765 continue
5866 }
5967
6371 t.logger.Log("instance", instance, "err", err)
6472 continue
6573 }
66 m[instance] = endpointCloser{endpoint, closer}
74 t.m[instance] = endpointCloser{endpoint, closer}
6775 }
6876
77 t.refreshCache()
78
6979 // Close any leftover endpoints.
70 for _, ec := range t.m {
80 for _, ec := range oldMap {
7181 if ec.Closer != nil {
7282 ec.Closer.Close()
7383 }
7484 }
85 }
7586
76 // Swap and GC.
77 t.m = m
87 func (t *EndpointCache) refreshCache() {
88 var (
89 length = len(t.m)
90 instances = make([]string, 0, length)
91 newCache = make([]endpoint.Endpoint, 0, length)
92 )
93
94 for instance, _ := range t.m {
95 instances = append(instances, instance)
96 }
97 // Sort the instances for ensuring that Endpoints are returned into the same order if no modified.
98 sort.Strings(instances)
99
100 for _, instance := range instances {
101 newCache = append(newCache, t.m[instance].Endpoint)
102 }
103
104 t.cache.Store(newCache)
78105 }
79106
80107 // Endpoints returns the current set of endpoints in undefined order. Satisfies
81108 // Publisher interface.
82109 func (t *EndpointCache) Endpoints() ([]endpoint.Endpoint, error) {
83 t.mtx.RLock()
84 defer t.mtx.RUnlock()
85 a := make([]endpoint.Endpoint, 0, len(t.m))
86 for _, ec := range t.m {
87 a = append(a, ec.Endpoint)
88 }
89 return a, nil
110 return t.cache.Load().([]endpoint.Endpoint), nil
90111 }
6868 type closer chan struct{}
6969
7070 func (c closer) Close() error { close(c); return nil }
71
72 func BenchmarkEndpoints(b *testing.B) {
73 var (
74 e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
75 ca = make(closer)
76 cb = make(closer)
77 c = map[string]io.Closer{"a": ca, "b": cb}
78 f = func(s string) (endpoint.Endpoint, io.Closer, error) { return e, c[s], nil }
79 ec = loadbalancer.NewEndpointCache(f, log.NewNopLogger())
80 )
81
82 b.ReportAllocs()
83
84 ec.Replace([]string{"a", "b"})
85
86 b.RunParallel(func(pb *testing.PB) {
87 for pb.Next() {
88 ec.Endpoints()
89 }
90 })
91 }