Codebase list golang-github-go-kit-kit / df3f2b0
Modify morganhein's change to add retry callbacks. rossmcf 7 years ago
2 changed file(s) with 125 addition(s) and 99 deletion(s). Raw diff Collapse all Expand all
00 package lb
11
22 import (
3 "fmt"
4 "strings"
5 "time"
3 "fmt"
4 "strings"
5 "time"
66
7 "golang.org/x/net/context"
7 "golang.org/x/net/context"
88
9 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/endpoint"
1010 )
1111
1212 // Callback function that indicates the current attempt count and the error encountered.
13 // Should return whether the Retry function should continue trying, and a custom
14 // error message if desired. The error message may be nil, but a true/false
15 // is always expected. In all cases if the error message is supplied, the
13 // Should return whether the Retry function should continue trying, and a custom
14 // error message if desired. The error message may be nil, but a true/false
15 // is always expected. In all cases if the error message is supplied, the
1616 // current error will be replaced.
17 type Callback func(int, string) (bool, *string)
17 type Callback func(n int, err error) (cont bool, cbErr error)
1818
1919 // Retry wraps a service load balancer and returns an endpoint oriented load
2020 // balancer for the specified service method.
2222 // balancer. Requests that return errors will be retried until they succeed,
2323 // up to max times, or until the timeout is elapsed, whichever comes first.
2424 func Retry(max int, timeout time.Duration, b Balancer) endpoint.Endpoint {
25 return RetryWithCallback(max, timeout, b, func(c int, s string) (bool, *string) { return true, nil })
25 return RetryWithCallback(max, timeout, b, func(c int, err error) (bool, error) { return true, nil })
2626 }
2727
28 // RetryWithCallback wraps a service load balancer and returns an endpoint oriented load
29 // balancer for the specified service method.
30 // Requests to the endpoint will be automatically load balanced via the load
31 // balancer. Requests that return errors will be retried until they succeed,
32 // up to max times, until the callback returns false, or until the timeout is elapsed,
33 // whichever comes first.
2834 func RetryWithCallback(max int, timeout time.Duration, b Balancer, cb Callback) endpoint.Endpoint {
29 if cb == nil {
30 panic("nil Callback")
31 }
32 if b == nil {
33 panic("nil Balancer")
34 }
35 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
36 var (
37 newctx, cancel = context.WithTimeout(ctx, timeout)
38 responses = make(chan interface{}, 1)
39 errs = make(chan error, 1)
40 a = []string{}
41 )
42 defer cancel()
43 for i := 1; i <= max; i++ {
44 go func() {
45 e, err := b.Endpoint()
46 if err != nil {
47 errs <- err
48 return
49 }
50 response, err := e(newctx, request)
51 if err != nil {
52 errs <- err
53 return
54 }
55 responses <- response
56 }()
35 if cb == nil {
36 panic("nil Callback")
37 }
38 if b == nil {
39 panic("nil Balancer")
40 }
41 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
42 var (
43 newctx, cancel = context.WithTimeout(ctx, timeout)
44 responses = make(chan interface{}, 1)
45 errs = make(chan error, 1)
46 a = []string{}
47 )
48 defer cancel()
49 for i := 1; i <= max; i++ {
50 go func() {
51 e, err := b.Endpoint()
52 if err != nil {
53 errs <- err
54 return
55 }
56 response, err := e(newctx, request)
57 if err != nil {
58 errs <- err
59 return
60 }
61 responses <- response
62 }()
5763
58 select {
59 case <-newctx.Done():
60 return nil, newctx.Err()
61 case response := <-responses:
62 return response, nil
63 case err := <-errs:
64 cont, cbErr := cb(i, err.Error())
65 if !cont {
66 if cbErr == nil {
67 return nil, fmt.Errorf("retry attempts exceeded (%s)", strings.Join(a, "; "))
68 } else {
69 return nil, fmt.Errorf(*cbErr)
70 }
71 }
72 currentErr := err.Error()
73 if cbErr != nil {
74 currentErr = *cbErr
75 }
76 a = append(a, currentErr)
77 continue
78 }
79 }
80 return nil, fmt.Errorf("retry attempts exceeded (%s)", strings.Join(a, "; "))
81 }
64 select {
65 case <-newctx.Done():
66 return nil, newctx.Err()
67 case response := <-responses:
68 return response, nil
69 case err := <-errs:
70 cont, cbErr := cb(i, err)
71 if !cont {
72 if cbErr == nil {
73 return nil, fmt.Errorf("retry attempts exceeded (%s)", strings.Join(a, "; "))
74 }
75 return nil, cbErr
76 }
77 currentErr := err.Error()
78 if cbErr != nil {
79 currentErr = cbErr.Error()
80 }
81 a = append(a, currentErr)
82 continue
83 }
84 }
85 return nil, fmt.Errorf("retry attempts exceeded (%s)", strings.Join(a, "; "))
86 }
8287 }
11
22 import (
33 "errors"
4 "fmt"
45 "testing"
56 "time"
67
8889 }
8990 }
9091
91 func AbortEarlyCustomMessage_WCB(t *testing.T) {
92 var (
93 cb = func(count int, msg string) (bool, *string) {
94 ret := "Aborting early"
95 return false, &ret
96 }
97 endpoints = sd.FixedSubscriber{} // no endpoints
98 lb = loadbalancer.NewRoundRobin(endpoints)
99 retry = loadbalancer.RetryWithCallback(999, time.Second, lb, cb) // lots of retries
100 ctx = context.Background()
101 )
102 _, err := retry(ctx, struct{}{})
103 if err == nil {
104 t.Errorf("expected error, got none") // should fail
105 }
106 if err.Error() != "Aborting early" {
107 t.Errorf("expected custom error message, got %v", err)
108 }
92 func TestAbortEarlyCustomMessage_WCB(t *testing.T) {
93 var (
94 cb = func(count int, err error) (bool, error) {
95 ret := "Aborting early"
96 return false, fmt.Errorf(ret)
97 }
98 endpoints = sd.FixedSubscriber{} // no endpoints
99 lb = loadbalancer.NewRoundRobin(endpoints)
100 retry = loadbalancer.RetryWithCallback(999, time.Second, lb, cb) // lots of retries
101 ctx = context.Background()
102 )
103 _, err := retry(ctx, struct{}{})
104 if err == nil {
105 t.Errorf("expected error, got none") // should fail
106 }
107 if err.Error() != "Aborting early" {
108 t.Errorf("expected custom error message, got %v", err)
109 }
109110 }
110111
111 func AbortEarlyOnNTries_WCB(t *testing.T) {
112 var (
113 cb = func(count int, msg string) (bool, *string) {
114 if (count >= 4) {
115 t.Errorf("expected retries to abort at 3 but continued to %v", count)
116 }
117 if (count == 3) {
118 return false, nil
119 }
120 return true, nil
121 }
122 endpoints = sd.FixedSubscriber{} // no endpoints
123 lb = loadbalancer.NewRoundRobin(endpoints)
124 retry = loadbalancer.RetryWithCallback(999, time.Second, lb, cb) // lots of retries
125 ctx = context.Background()
126 )
127 if _, err := retry(ctx, struct{}{}); err == nil {
128 t.Errorf("expected error, got none") // should fail
129 }
112 func TestAbortEarlyOnNTries_WCB(t *testing.T) {
113 var (
114 cb = func(count int, err error) (bool, error) {
115 if count >= 4 {
116 t.Errorf("expected retries to abort at 3 but continued to %v", count)
117 }
118 if count == 3 {
119 return false, nil
120 }
121 return true, nil
122 }
123 endpoints = sd.FixedSubscriber{} // no endpoints
124 lb = loadbalancer.NewRoundRobin(endpoints)
125 retry = loadbalancer.RetryWithCallback(999, time.Second, lb, cb) // lots of retries
126 ctx = context.Background()
127 )
128 if _, err := retry(ctx, struct{}{}); err == nil {
129 t.Errorf("expected error, got none") // should fail
130 }
130131 }
132
133 func TestErrorPassedUnchangedToCallback_WCB(t *testing.T) {
134 var (
135 myErr = errors.New("My custom error.")
136 cb = func(count int, err error) (bool, error) {
137 if err != myErr {
138 t.Errorf("expected 'My custom error', got %s", err)
139 }
140 return false, nil
141 }
142 endpoint = func(ctx context.Context, request interface{}) (interface{}, error) {
143 return nil, myErr
144 }
145 endpoints = sd.FixedSubscriber{endpoint} // no endpoints
146 lb = loadbalancer.NewRoundRobin(endpoints)
147 retry = loadbalancer.RetryWithCallback(999, time.Second, lb, cb) // lots of retries
148 ctx = context.Background()
149 )
150 _, _ = retry(ctx, struct{}{})
151 }