sd/lb: updates to Retry changes
- RetryError captures more sophisticated error behaviors
- Simplify the error branch in the switch significantly
- Clean up the tests a little bit
- Comment cleanup and rewrap
Peter Bourgon
7 years ago
9 | 9 | "github.com/go-kit/kit/endpoint" |
10 | 10 | ) |
11 | 11 | |
12 | // RetryError is an error wrapper that is used by the retry mechanism. All | |
13 | // errors returned by the retry mechanism via its endpoint will be RetryErrors. | |
14 | type RetryError struct { | |
15 | RawErrors []error // all errors encountered from endpoints directly | |
16 | Final error // the final, terminating error | |
17 | } | |
18 | ||
19 | func (e RetryError) Error() string { | |
20 | var suffix string | |
21 | if len(e.RawErrors) > 1 { | |
22 | a := make([]string, len(e.RawErrors)-1) | |
23 | for i := 0; i < len(e.RawErrors)-1; i++ { // last one is Final | |
24 | a[i] = e.RawErrors[i].Error() | |
25 | } | |
26 | suffix = fmt.Sprintf(" (previously: %s)", strings.Join(a, "; ")) | |
27 | } | |
28 | return fmt.Sprintf("%v%s", e.Final, suffix) | |
29 | } | |
30 | ||
12 | 31 | // Callback is a function that is given the current attempt count and the error |
13 | // encountered. Should return whether the Retry function should continue trying, | |
14 | // and a custom error message if desired. The error message may be nil, but a | |
15 | // true/false is always expected. In all cases if the error message is supplied, | |
16 | // the current error will be replaced. | |
17 | type Callback func(n int, received error) (keepTrying bool, cbErr error) | |
32 | // received from the underlying endpoint. It should return whether the Retry | |
33 | // function should continue trying to get a working endpoint, and a custom error | |
34 | // if desired. The error message may be nil, but a true/false is always | |
35 | // expected. In all cases, if the replacement error is supplied, the received | |
36 | // error will be replaced in the calling context. | |
37 | type Callback func(n int, received error) (keepTrying bool, replacement error) | |
18 | 38 | |
19 | 39 | // Retry wraps a service load balancer and returns an endpoint oriented load |
20 | // balancer for the specified service method. | |
21 | // Requests to the endpoint will be automatically load balanced via the load | |
22 | // balancer. Requests that return errors will be retried until they succeed, | |
23 | // up to max times, or until the timeout is elapsed, whichever comes first. | |
40 | // balancer for the specified service method. Requests to the endpoint will be | |
41 | // automatically load balanced via the load balancer. Requests that return | |
42 | // errors will be retried until they succeed, up to max times, or until the | |
43 | // timeout is elapsed, whichever comes first. | |
24 | 44 | func Retry(max int, timeout time.Duration, b Balancer) endpoint.Endpoint { |
25 | 45 | return RetryWithCallback(timeout, b, maxRetries(max)) |
26 | 46 | } |
27 | 47 | |
28 | // maxRetries returns a callback function that enforces max retries. | |
29 | 48 | func maxRetries(max int) Callback { |
30 | return func(n int, err error) (bool, error) { | |
31 | if n < max { | |
32 | return true, nil | |
33 | } | |
34 | return false, nil | |
49 | return func(n int, err error) (keepTrying bool, replacement error) { | |
50 | return n < max, nil | |
35 | 51 | } |
36 | 52 | } |
37 | 53 | |
38 | // RetryWithCallback wraps a service load balancer and returns an endpoint oriented load | |
39 | // balancer for the specified service method. | |
40 | // Requests to the endpoint will be automatically load balanced via the load | |
41 | // balancer. Requests that return errors will be retried until they succeed, | |
42 | // up to max times, until the callback returns false, or until the timeout is elapsed, | |
43 | // whichever comes first. | |
54 | func alwaysRetry(int, error) (keepTrying bool, replacement error) { | |
55 | return true, nil | |
56 | } | |
57 | ||
58 | // RetryWithCallback wraps a service load balancer and returns an endpoint | |
59 | // oriented load balancer for the specified service method. Requests to the | |
60 | // endpoint will be automatically load balanced via the load balancer. Requests | |
61 | // that return errors will be retried until they succeed, up to max times, until | |
62 | // the callback returns false, or until the timeout is elapsed, whichever comes | |
63 | // first. | |
44 | 64 | func RetryWithCallback(timeout time.Duration, b Balancer, cb Callback) endpoint.Endpoint { |
45 | 65 | if cb == nil { |
46 | cb = func(n int, err error) (bool, error) { | |
47 | return true, nil | |
48 | } | |
66 | cb = alwaysRetry | |
49 | 67 | } |
50 | 68 | if b == nil { |
51 | 69 | panic("nil Balancer") |
52 | 70 | } |
71 | ||
53 | 72 | return func(ctx context.Context, request interface{}) (response interface{}, err error) { |
54 | 73 | var ( |
55 | 74 | newctx, cancel = context.WithTimeout(ctx, timeout) |
56 | 75 | responses = make(chan interface{}, 1) |
57 | 76 | errs = make(chan error, 1) |
58 | a = []string{} | |
77 | final RetryError | |
59 | 78 | ) |
60 | 79 | defer cancel() |
80 | ||
61 | 81 | for i := 1; ; i++ { |
62 | 82 | go func() { |
63 | 83 | e, err := b.Endpoint() |
76 | 96 | select { |
77 | 97 | case <-newctx.Done(): |
78 | 98 | return nil, newctx.Err() |
99 | ||
79 | 100 | case response := <-responses: |
80 | 101 | return response, nil |
102 | ||
81 | 103 | case err := <-errs: |
82 | cont, cbErr := cb(i, err) | |
83 | if !cont { | |
84 | if cbErr == nil { | |
85 | return nil, fmt.Errorf("retry attempts exceeded (%s)", strings.Join(a, "; ")) | |
86 | } | |
87 | return nil, cbErr | |
104 | final.RawErrors = append(final.RawErrors, err) | |
105 | keepTrying, replacement := cb(i, err) | |
106 | if replacement != nil { | |
107 | err = replacement | |
88 | 108 | } |
89 | currentErr := err.Error() | |
90 | if cbErr != nil { | |
91 | currentErr = cbErr.Error() | |
109 | if !keepTrying { | |
110 | final.Final = err | |
111 | return nil, final | |
92 | 112 | } |
93 | a = append(a, currentErr) | |
94 | 113 | continue |
95 | 114 | } |
96 | 115 | } |
1 | 1 | |
2 | 2 | import ( |
3 | 3 | "errors" |
4 | "fmt" | |
5 | 4 | "testing" |
6 | 5 | "time" |
7 | 6 | |
9 | 8 | |
10 | 9 | "github.com/go-kit/kit/endpoint" |
11 | 10 | "github.com/go-kit/kit/sd" |
12 | loadbalancer "github.com/go-kit/kit/sd/lb" | |
11 | "github.com/go-kit/kit/sd/lb" | |
13 | 12 | ) |
14 | 13 | |
15 | 14 | func TestRetryMaxTotalFail(t *testing.T) { |
16 | 15 | var ( |
17 | 16 | endpoints = sd.FixedSubscriber{} // no endpoints |
18 | lb = loadbalancer.NewRoundRobin(endpoints) | |
19 | retry = loadbalancer.Retry(999, time.Second, lb) // lots of retries | |
17 | rr = lb.NewRoundRobin(endpoints) | |
18 | retry = lb.Retry(999, time.Second, rr) // lots of retries | |
20 | 19 | ctx = context.Background() |
21 | 20 | ) |
22 | 21 | if _, err := retry(ctx, struct{}{}); err == nil { |
37 | 36 | 2: endpoints[2], |
38 | 37 | } |
39 | 38 | retries = len(endpoints) - 1 // not quite enough retries |
40 | lb = loadbalancer.NewRoundRobin(subscriber) | |
39 | rr = lb.NewRoundRobin(subscriber) | |
41 | 40 | ctx = context.Background() |
42 | 41 | ) |
43 | if _, err := loadbalancer.Retry(retries, time.Second, lb)(ctx, struct{}{}); err == nil { | |
42 | if _, err := lb.Retry(retries, time.Second, rr)(ctx, struct{}{}); err == nil { | |
44 | 43 | t.Errorf("expected error two, got none") |
45 | 44 | } |
46 | 45 | } |
58 | 57 | 2: endpoints[2], |
59 | 58 | } |
60 | 59 | retries = len(endpoints) // exactly enough retries |
61 | lb = loadbalancer.NewRoundRobin(subscriber) | |
60 | rr = lb.NewRoundRobin(subscriber) | |
62 | 61 | ctx = context.Background() |
63 | 62 | ) |
64 | if _, err := loadbalancer.Retry(retries, time.Second, lb)(ctx, struct{}{}); err != nil { | |
63 | if _, err := lb.Retry(retries, time.Second, rr)(ctx, struct{}{}); err != nil { | |
65 | 64 | t.Error(err) |
66 | 65 | } |
67 | 66 | } |
71 | 70 | step = make(chan struct{}) |
72 | 71 | e = func(context.Context, interface{}) (interface{}, error) { <-step; return struct{}{}, nil } |
73 | 72 | timeout = time.Millisecond |
74 | retry = loadbalancer.Retry(999, timeout, loadbalancer.NewRoundRobin(sd.FixedSubscriber{0: e})) | |
73 | retry = lb.Retry(999, timeout, lb.NewRoundRobin(sd.FixedSubscriber{0: e})) | |
75 | 74 | errs = make(chan error, 1) |
76 | 75 | invoke = func() { _, err := retry(context.Background(), struct{}{}); errs <- err } |
77 | 76 | ) |
89 | 88 | } |
90 | 89 | } |
91 | 90 | |
92 | func TestAbortEarlyCustomMessage_WCB(t *testing.T) { | |
91 | func TestAbortEarlyCustomMessage(t *testing.T) { | |
93 | 92 | var ( |
94 | cb = func(count int, err error) (bool, error) { | |
95 | ret := "Aborting early" | |
96 | return false, fmt.Errorf(ret) | |
97 | } | |
93 | myErr = errors.New("aborting early") | |
94 | cb = func(int, error) (bool, error) { return false, myErr } | |
98 | 95 | endpoints = sd.FixedSubscriber{} // no endpoints |
99 | lb = loadbalancer.NewRoundRobin(endpoints) | |
100 | retry = loadbalancer.RetryWithCallback(time.Second, lb, cb) // lots of retries | |
96 | rr = lb.NewRoundRobin(endpoints) | |
97 | retry = lb.RetryWithCallback(time.Second, rr, cb) // lots of retries | |
101 | 98 | ctx = context.Background() |
102 | 99 | ) |
103 | 100 | _, err := retry(ctx, struct{}{}) |
104 | if err == nil { | |
105 | t.Errorf("expected error, got none") // should fail | |
106 | } | |
107 | if err.Error() != "Aborting early" { | |
108 | t.Errorf("expected custom error message, got %v", err) | |
101 | if want, have := myErr, err.(lb.RetryError).Final; want != have { | |
102 | t.Errorf("want %v, have %v", want, have) | |
109 | 103 | } |
110 | 104 | } |
111 | 105 | |
112 | func TestErrorPassedUnchangedToCallback_WCB(t *testing.T) { | |
106 | func TestErrorPassedUnchangedToCallback(t *testing.T) { | |
113 | 107 | var ( |
114 | myErr = errors.New("My custom error.") | |
115 | cb = func(count int, err error) (bool, error) { | |
116 | if err != myErr { | |
117 | t.Errorf("expected 'My custom error', got %s", err) | |
108 | myErr = errors.New("my custom error") | |
109 | cb = func(_ int, err error) (bool, error) { | |
110 | if want, have := myErr, err; want != have { | |
111 | t.Errorf("want %v, have %v", want, have) | |
118 | 112 | } |
119 | 113 | return false, nil |
120 | 114 | } |
122 | 116 | return nil, myErr |
123 | 117 | } |
124 | 118 | endpoints = sd.FixedSubscriber{endpoint} // no endpoints |
125 | lb = loadbalancer.NewRoundRobin(endpoints) | |
126 | retry = loadbalancer.RetryWithCallback(time.Second, lb, cb) // lots of retries | |
119 | rr = lb.NewRoundRobin(endpoints) | |
120 | retry = lb.RetryWithCallback(time.Second, rr, cb) // lots of retries | |
127 | 121 | ctx = context.Background() |
128 | 122 | ) |
129 | _, _ = retry(ctx, struct{}{}) | |
123 | _, err := retry(ctx, struct{}{}) | |
124 | if want, have := myErr, err.(lb.RetryError).Final; want != have { | |
125 | t.Errorf("want %v, have %v", want, have) | |
126 | } | |
130 | 127 | } |
131 | 128 | |
132 | 129 | func TestHandleNilCallback(t *testing.T) { |
134 | 131 | subscriber = sd.FixedSubscriber{ |
135 | 132 | func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil /* OK */ }, |
136 | 133 | } |
137 | lb = loadbalancer.NewRoundRobin(subscriber) | |
134 | rr = lb.NewRoundRobin(subscriber) | |
138 | 135 | ctx = context.Background() |
139 | 136 | ) |
140 | retry := loadbalancer.RetryWithCallback(time.Second, lb, nil) | |
137 | retry := lb.RetryWithCallback(time.Second, rr, nil) | |
141 | 138 | if _, err := retry(ctx, struct{}{}); err != nil { |
142 | 139 | t.Error(err) |
143 | 140 | } |