Codebase list golang-github-go-kit-kit / 63e2346
Merge morganhein/kit rossmcf 7 years ago
2 changed file(s) with 109 addition(s) and 42 deletion(s). Raw diff Collapse all Expand all
00 package lb
11
22 import (
3 "fmt"
4 "strings"
5 "time"
3 "fmt"
4 "strings"
5 "time"
66
7 "golang.org/x/net/context"
7 "golang.org/x/net/context"
88
9 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/endpoint"
1010 )
11
12 // Callback function that indicates the current attempt count and the error encountered.
13 // Should return whether the Retry function should continue trying, and a custom
14 // error message if desired. The error message may be nil, but a true/false
15 // is always expected. In all cases if the error message is supplied, the
16 // current error will be replaced.
17 type Callback func(int, string) (bool, *string)
1118
1219 // Retry wraps a service load balancer and returns an endpoint oriented load
1320 // balancer for the specified service method.
1522 // balancer. Requests that return errors will be retried until they succeed,
1623 // up to max times, or until the timeout is elapsed, whichever comes first.
1724 func Retry(max int, timeout time.Duration, b Balancer) endpoint.Endpoint {
18 if b == nil {
19 panic("nil Balancer")
20 }
21 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
22 var (
23 newctx, cancel = context.WithTimeout(ctx, timeout)
24 responses = make(chan interface{}, 1)
25 errs = make(chan error, 1)
26 a = []string{}
27 )
28 defer cancel()
29 for i := 1; i <= max; i++ {
30 go func() {
31 e, err := b.Endpoint()
32 if err != nil {
33 errs <- err
34 return
35 }
36 response, err := e(newctx, request)
37 if err != nil {
38 errs <- err
39 return
40 }
41 responses <- response
42 }()
25 return RetryWithCallback(max, timeout, b, func(c int, s string) (bool, *string) { return true, nil })
26 }
4327
44 select {
45 case <-newctx.Done():
46 return nil, newctx.Err()
47 case response := <-responses:
48 return response, nil
49 case err := <-errs:
50 a = append(a, err.Error())
51 continue
52 }
53 }
54 return nil, fmt.Errorf("retry attempts exceeded (%s)", strings.Join(a, "; "))
55 }
28 func RetryWithCallback(max int, timeout time.Duration, b Balancer, cb Callback) endpoint.Endpoint {
29 if cb == nil {
30 panic("nil Callback")
31 }
32 if b == nil {
33 panic("nil Balancer")
34 }
35 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
36 var (
37 newctx, cancel = context.WithTimeout(ctx, timeout)
38 responses = make(chan interface{}, 1)
39 errs = make(chan error, 1)
40 a = []string{}
41 )
42 defer cancel()
43 for i := 1; i <= max; i++ {
44 go func() {
45 e, err := b.Endpoint()
46 if err != nil {
47 errs <- err
48 return
49 }
50 response, err := e(newctx, request)
51 if err != nil {
52 errs <- err
53 return
54 }
55 responses <- response
56 }()
57
58 select {
59 case <-newctx.Done():
60 return nil, newctx.Err()
61 case response := <-responses:
62 return response, nil
63 case err := <-errs:
64 cont, cbErr := cb(i, err.Error())
65 if !cont {
66 if cbErr == nil {
67 return nil, fmt.Errorf("retry attempts exceeded (%s)", strings.Join(a, "; "))
68 } else {
69 return nil, fmt.Errorf(*cbErr)
70 }
71 }
72 currentErr := err.Error()
73 if cbErr != nil {
74 currentErr = *cbErr
75 }
76 a = append(a, currentErr)
77 continue
78 }
79 }
80 return nil, fmt.Errorf("retry attempts exceeded (%s)", strings.Join(a, "; "))
81 }
5682 }
8787 t.Errorf("wanted %v, got none", context.DeadlineExceeded)
8888 }
8989 }
90
91 func AbortEarlyCustomMessage_WCB(t *testing.T) {
92 var (
93 cb = func(count int, msg string) (bool, *string) {
94 ret := "Aborting early"
95 return false, &ret
96 }
97 endpoints = sd.FixedSubscriber{} // no endpoints
98 lb = loadbalancer.NewRoundRobin(endpoints)
99 retry = loadbalancer.RetryWithCallback(999, time.Second, lb, cb) // lots of retries
100 ctx = context.Background()
101 )
102 _, err := retry(ctx, struct{}{})
103 if err == nil {
104 t.Errorf("expected error, got none") // should fail
105 }
106 if err.Error() != "Aborting early" {
107 t.Errorf("expected custom error message, got %v", err)
108 }
109 }
110
111 func AbortEarlyOnNTries_WCB(t *testing.T) {
112 var (
113 cb = func(count int, msg string) (bool, *string) {
114 if (count >= 4) {
115 t.Errorf("expected retries to abort at 3 but continued to %v", count)
116 }
117 if (count == 3) {
118 return false, nil
119 }
120 return true, nil
121 }
122 endpoints = sd.FixedSubscriber{} // no endpoints
123 lb = loadbalancer.NewRoundRobin(endpoints)
124 retry = loadbalancer.RetryWithCallback(999, time.Second, lb, cb) // lots of retries
125 ctx = context.Background()
126 )
127 if _, err := retry(ctx, struct{}{}); err == nil {
128 t.Errorf("expected error, got none") // should fail
129 }
130 }