Codebase list golang-github-go-kit-kit / 576ad0b
Add README and Retry component Peter Bourgon 8 years ago
8 changed file(s) with 264 addition(s) and 8 deletion(s). Raw diff Collapse all Expand all
0 # package loadbalancer
1
2 `package loadbalancer` provides a client-side load balancer abstraction.
3
4 A publisher is responsible for emitting the most recent set of endpoints for a
5 single logical service. Publishers exist for static endpoints, and endpoints
6 discovered via periodic DNS SRV lookups on a single logical name. Consul and
7 etcd publishers are planned.
8
9 Different load balancers are implemented on top of publishers. Go kit
10 currently provides random and round-robin load balancers. Smarter behaviors,
11 e.g. load balancing based on underlying endpoint priority/weight, is planned.
12
13 ## Rationale
14
15 TODO
16
17 ## Usage
18
19 In your client, construct a publisher for a specific remote service, and pass
20 it to a load balancer. Then, request an endpoint from the load balancer
21 whenever you need to make a request to that remote service.
22
23 ```go
24 import (
25 "github.com/go-kit/kit/loadbalancer"
26 "github.com/go-kit/kit/loadbalancer/dnssrv"
27 )
28
29 func main() {
30 // Construct a load balancer for foosvc, which gets foosvc instances by
31 // polling a specific DNS SRV name.
32 p := dnssrv.NewPublisher("foosvc.internal.domain", 5*time.Second, fooFactory, logger)
33 lb := loadbalancer.NewRoundRobin(p)
34
35 // Get a new endpoint from the load balancer.
36 endpoint, err := lb.Endpoint()
37 if err != nil {
38 panic(err)
39 }
40
41 // Use the endpoint to make a request.
42 response, err := endpoint(ctx, request)
43 }
44
45 func fooFactory(instance string) (endpoint.Endpoint, error) {
46 // Convert an instance (host:port) to an endpoint, via a defined transport binding.
47 }
48 ```
49
50 It's also possible to wrap a load balancer with a retry strategy, so that it
51 can used as an endpoint directly. This may make load balancers more convenient
52 to use, at the cost of fine-grained control of failure.
53
54 ```go
55 func main() {
56 p := dnssrv.NewPublisher("foosvc.internal.domain", 5*time.Second, fooFactory, logger)
57 lb := loadbalancer.NewRoundRobin(p)
58 endpoint := loadbalancer.Retry(3, 5*time.Seconds, lb)
59
60 response, err := endpoint(ctx, request) // requests will be automatically load balanced
61 }
62 ```
00 package loadbalancer
11
2 import "errors"
2 import (
3 "errors"
4
5 "github.com/go-kit/kit/endpoint"
6 )
7
8 // LoadBalancer describes something that can yield endpoints for a remote
9 // service method.
10 type LoadBalancer interface {
11 Endpoint() (endpoint.Endpoint, error)
12 }
313
414 // ErrNoEndpoints is returned when a load balancer (or one of its components)
515 // has no endpoints to return. In a request lifecycle, this is usually a fatal
2727 endpoints[i] = func(context.Context, interface{}) (interface{}, error) { counts[i0]++; return struct{}{}, nil }
2828 }
2929
30 lb := loadbalancer.NewRandom(static.Publisher(endpoints), seed)
30 lb := loadbalancer.NewRandom(static.NewPublisher(endpoints), seed)
3131
3232 for i := 0; i < iterations; i++ {
3333 e, err := lb.Endpoint()
4949 }
5050
5151 func TestRandomNoEndpoints(t *testing.T) {
52 lb := loadbalancer.NewRandom(static.Publisher([]endpoint.Endpoint{}), 123)
52 lb := loadbalancer.NewRandom(static.NewPublisher([]endpoint.Endpoint{}), 123)
5353 _, have := lb.Endpoint()
5454 if want := loadbalancer.ErrNoEndpoints; want != have {
5555 t.Errorf("want %q, have %q", want, have)
0 package loadbalancer
1
2 import (
3 "fmt"
4 "strings"
5 "time"
6
7 "golang.org/x/net/context"
8
9 "github.com/go-kit/kit/endpoint"
10 )
11
12 // Retry wraps the load balancer to make it behave like a simple endpoint.
13 // Requests to the endpoint will be automatically load balanced via the load
14 // balancer. Requests that return errors will be retried until they succeed,
15 // up to max times, or until the timeout is elapsed, whichever comes first.
16 func Retry(max int, timeout time.Duration, lb LoadBalancer) endpoint.Endpoint {
17 return func(ctx context.Context, request interface{}) (interface{}, error) {
18 var (
19 newctx, cancel = context.WithTimeout(ctx, timeout)
20 responses = make(chan interface{}, 1)
21 errs = make(chan error, 1)
22 a = []string{}
23 )
24 defer cancel()
25 for i := 1; i <= max; i++ {
26 go func() {
27 e, err := lb.Endpoint()
28 if err != nil {
29 errs <- err
30 return
31 }
32 response, err := e(newctx, request)
33 if err != nil {
34 errs <- err
35 return
36 }
37 responses <- response
38 }()
39
40 select {
41 case <-newctx.Done():
42 return nil, newctx.Err()
43 case response := <-responses:
44 return response, nil
45 case err := <-errs:
46 a = append(a, err.Error())
47 continue
48 }
49 }
50 return nil, fmt.Errorf("retry attempts exceeded (%s)", strings.Join(a, "; "))
51 }
52 }
0 package loadbalancer_test
1
2 import (
3 "errors"
4 "testing"
5 "time"
6
7 "golang.org/x/net/context"
8
9 "github.com/go-kit/kit/endpoint"
10 "github.com/go-kit/kit/loadbalancer"
11 "github.com/go-kit/kit/loadbalancer/static"
12 )
13
14 func TestRetryMaxTotalFail(t *testing.T) {
15 var (
16 endpoints = []endpoint.Endpoint{} // no endpoints
17 p = static.NewPublisher(endpoints)
18 lb = loadbalancer.NewRoundRobin(p)
19 retry = loadbalancer.Retry(999, time.Second, lb) // lots of retries
20 ctx = context.Background()
21 )
22 if _, err := retry(ctx, struct{}{}); err == nil {
23 t.Errorf("expected error, got none") // should fail
24 }
25 }
26
27 func TestRetryMaxPartialFail(t *testing.T) {
28 var (
29 endpoints = []endpoint.Endpoint{
30 func(context.Context, interface{}) (interface{}, error) { return nil, errors.New("error one") },
31 func(context.Context, interface{}) (interface{}, error) { return nil, errors.New("error two") },
32 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil /* OK */ },
33 }
34 retries = len(endpoints) - 1 // not quite enough retries
35 p = static.NewPublisher(endpoints)
36 lb = loadbalancer.NewRoundRobin(p)
37 ctx = context.Background()
38 )
39 if _, err := loadbalancer.Retry(retries, time.Second, lb)(ctx, struct{}{}); err == nil {
40 t.Errorf("expected error, got none")
41 }
42 }
43
44 func TestRetryMaxSuccess(t *testing.T) {
45 var (
46 endpoints = []endpoint.Endpoint{
47 func(context.Context, interface{}) (interface{}, error) { return nil, errors.New("error one") },
48 func(context.Context, interface{}) (interface{}, error) { return nil, errors.New("error two") },
49 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil /* OK */ },
50 }
51 retries = len(endpoints) // exactly enough retries
52 p = static.NewPublisher(endpoints)
53 lb = loadbalancer.NewRoundRobin(p)
54 ctx = context.Background()
55 )
56 if _, err := loadbalancer.Retry(retries, time.Second, lb)(ctx, struct{}{}); err != nil {
57 t.Error(err)
58 }
59 }
60
61 func TestRetryTimeout(t *testing.T) {
62 var (
63 step = make(chan struct{})
64 e = func(context.Context, interface{}) (interface{}, error) { <-step; return struct{}{}, nil }
65 timeout = time.Millisecond
66 retry = loadbalancer.Retry(999, timeout, loadbalancer.NewRoundRobin(static.NewPublisher([]endpoint.Endpoint{e})))
67 errs = make(chan error, 1)
68 invoke = func() { _, err := retry(context.Background(), struct{}{}); errs <- err }
69 )
70
71 go func() { step <- struct{}{} }() // queue up a flush of the endpoint
72 invoke() // invoke the endpoint and trigger the flush
73 if err := <-errs; err != nil { // that should succeed
74 t.Error(err)
75 }
76
77 go func() { time.Sleep(10 * timeout); step <- struct{}{} }() // a delayed flush
78 invoke() // invoke the endpoint
79 if err := <-errs; err != context.DeadlineExceeded { // that should not succeed
80 t.Errorf("wanted %v, got none", context.DeadlineExceeded)
81 }
82 }
2020 }
2121 )
2222
23 lb := loadbalancer.NewRoundRobin(static.Publisher(endpoints))
23 lb := loadbalancer.NewRoundRobin(static.NewPublisher(endpoints))
2424
2525 for i, want := range [][]int{
2626 {1, 0, 0},
00 package static
11
2 import "github.com/go-kit/kit/endpoint"
2 import (
3 "sync"
4
5 "github.com/go-kit/kit/endpoint"
6 )
37
48 // Publisher yields the same set of static endpoints.
5 type Publisher []endpoint.Endpoint
9 type Publisher struct {
10 mtx sync.RWMutex
11 endpoints []endpoint.Endpoint
12 }
13
14 // NewPublisher returns a static endpoint Publisher.
15 func NewPublisher(endpoints []endpoint.Endpoint) *Publisher {
16 return &Publisher{
17 endpoints: endpoints,
18 }
19 }
620
721 // Endpoints implements the Publisher interface.
8 func (p Publisher) Endpoints() ([]endpoint.Endpoint, error) { return p, nil }
22 func (p *Publisher) Endpoints() ([]endpoint.Endpoint, error) {
23 p.mtx.RLock()
24 defer p.mtx.RUnlock()
25 return p.endpoints, nil
26 }
27
28 // Replace is a utility method to swap out the underlying endpoints of an
29 // existing static publisher. It's useful mostly for testing.
30 func (p *Publisher) Replace(endpoints []endpoint.Endpoint) {
31 p.mtx.Lock()
32 defer p.mtx.Unlock()
33 p.endpoints = endpoints
34 }
1515 e2 = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
1616 endpoints = []endpoint.Endpoint{e1, e2}
1717 )
18 p := static.Publisher(endpoints)
18 p := static.NewPublisher(endpoints)
1919 have, err := p.Endpoints()
2020 if err != nil {
2121 t.Fatal(err)
2424 t.Fatalf("want %#+v, have %#+v", want, have)
2525 }
2626 }
27
28 func TestStaticReplace(t *testing.T) {
29 p := static.NewPublisher([]endpoint.Endpoint{
30 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
31 })
32 have, err := p.Endpoints()
33 if err != nil {
34 t.Fatal(err)
35 }
36 if want, have := 1, len(have); want != have {
37 t.Fatalf("want %d, have %d", want, have)
38 }
39 p.Replace([]endpoint.Endpoint{})
40 have, err = p.Endpoints()
41 if err != nil {
42 t.Fatal(err)
43 }
44 if want, have := 0, len(have); want != have {
45 t.Fatalf("want %d, have %d", want, have)
46 }
47 }