Encode max retry logic in callback.
Ross McFarlane
7 years ago
9 | 9 | "github.com/go-kit/kit/endpoint" |
10 | 10 | ) |
11 | 11 | |
12 | // Callback is a function that indicates the current attempt count and the error | |
12 | // Callback is a function that is given the current attempt count and the error | |
13 | 13 | // encountered. Should return whether the Retry function should continue trying, |
14 | 14 | // and a custom error message if desired. The error message may be nil, but a |
15 | 15 | // true/false is always expected. In all cases if the error message is supplied, |
22 | 22 | // balancer. Requests that return errors will be retried until they succeed, |
23 | 23 | // up to max times, or until the timeout is elapsed, whichever comes first. |
24 | 24 | func Retry(max int, timeout time.Duration, b Balancer) endpoint.Endpoint { |
25 | return RetryWithCallback(max, timeout, b, func(c int, err error) (bool, error) { return true, nil }) | |
25 | return RetryWithCallback(timeout, b, maxRetries(max)) | |
26 | } | |
27 | ||
28 | // maxRetries returns a callback function that enforces max retries. | |
29 | func maxRetries(max int) Callback { | |
30 | return func(n int, err error) (bool, error) { | |
31 | if n < max { | |
32 | return true, nil | |
33 | } | |
34 | return false, nil | |
35 | } | |
26 | 36 | } |
27 | 37 | |
28 | 38 | // RetryWithCallback wraps a service load balancer and returns an endpoint oriented load |
31 | 41 | // balancer. Requests that return errors will be retried until they succeed, |
32 | 42 | // up to max times, until the callback returns false, or until the timeout is elapsed, |
33 | 43 | // whichever comes first. |
34 | func RetryWithCallback(max int, timeout time.Duration, b Balancer, cb Callback) endpoint.Endpoint { | |
44 | func RetryWithCallback(timeout time.Duration, b Balancer, cb Callback) endpoint.Endpoint { | |
35 | 45 | if cb == nil { |
36 | 46 | panic("nil Callback") |
37 | 47 | } |
46 | 56 | a = []string{} |
47 | 57 | ) |
48 | 58 | defer cancel() |
49 | for i := 1; i <= max; i++ { | |
59 | for i := 1; ; i++ { | |
50 | 60 | go func() { |
51 | 61 | e, err := b.Endpoint() |
52 | 62 | if err != nil { |
82 | 92 | continue |
83 | 93 | } |
84 | 94 | } |
85 | return nil, fmt.Errorf("retry attempts exceeded (%s)", strings.Join(a, "; ")) | |
86 | 95 | } |
87 | 96 | } |
41 | 41 | ctx = context.Background() |
42 | 42 | ) |
43 | 43 | if _, err := loadbalancer.Retry(retries, time.Second, lb)(ctx, struct{}{}); err == nil { |
44 | t.Errorf("expected error, got none") | |
44 | t.Errorf("expected error two, got none") | |
45 | 45 | } |
46 | 46 | } |
47 | 47 | |
97 | 97 | } |
98 | 98 | endpoints = sd.FixedSubscriber{} // no endpoints |
99 | 99 | lb = loadbalancer.NewRoundRobin(endpoints) |
100 | retry = loadbalancer.RetryWithCallback(999, time.Second, lb, cb) // lots of retries | |
100 | retry = loadbalancer.RetryWithCallback(time.Second, lb, cb) // lots of retries | |
101 | 101 | ctx = context.Background() |
102 | 102 | ) |
103 | 103 | _, err := retry(ctx, struct{}{}) |
106 | 106 | } |
107 | 107 | if err.Error() != "Aborting early" { |
108 | 108 | t.Errorf("expected custom error message, got %v", err) |
109 | } | |
110 | } | |
111 | ||
112 | func TestAbortEarlyOnNTries_WCB(t *testing.T) { | |
113 | var ( | |
114 | cb = func(count int, err error) (bool, error) { | |
115 | if count >= 4 { | |
116 | t.Errorf("expected retries to abort at 3 but continued to %v", count) | |
117 | } | |
118 | if count == 3 { | |
119 | return false, nil | |
120 | } | |
121 | return true, nil | |
122 | } | |
123 | endpoints = sd.FixedSubscriber{} // no endpoints | |
124 | lb = loadbalancer.NewRoundRobin(endpoints) | |
125 | retry = loadbalancer.RetryWithCallback(999, time.Second, lb, cb) // lots of retries | |
126 | ctx = context.Background() | |
127 | ) | |
128 | if _, err := retry(ctx, struct{}{}); err == nil { | |
129 | t.Errorf("expected error, got none") // should fail | |
130 | 109 | } |
131 | 110 | } |
132 | 111 | |
144 | 123 | } |
145 | 124 | endpoints = sd.FixedSubscriber{endpoint} // no endpoints |
146 | 125 | lb = loadbalancer.NewRoundRobin(endpoints) |
147 | retry = loadbalancer.RetryWithCallback(999, time.Second, lb, cb) // lots of retries | |
126 | retry = loadbalancer.RetryWithCallback(time.Second, lb, cb) // lots of retries | |
148 | 127 | ctx = context.Background() |
149 | 128 | ) |
150 | 129 | _, _ = retry(ctx, struct{}{}) |