Codebase list golang-github-go-kit-kit / 306f6b7
Converted Retry into wrapper for RetryWithCallback. Morgan Hein 7 years ago
2 changed file(s) with 45 addition(s) and 41 deletion(s). Raw diff Collapse all Expand all
1414 // error message if desired. The error message may be nil, but a true/false
1515 // is always expected. In all cases if the error message is supplied, the
1616 // current error will be replaced.
17 type callback func(int, string) (bool, *string)
17 type Callback func(int, string) (bool, *string)
1818
1919 // Retry wraps a service load balancer and returns an endpoint oriented load
2020 // balancer for the specified service method.
2222 // balancer. Requests that return errors will be retried until they succeed,
2323 // up to max times, or until the timeout is elapsed, whichever comes first.
2424 func Retry(max int, timeout time.Duration, b Balancer) endpoint.Endpoint {
25 if b == nil {
26 panic("nil Balancer")
27 }
28 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
29 var (
30 newctx, cancel = context.WithTimeout(ctx, timeout)
31 responses = make(chan interface{}, 1)
32 errs = make(chan error, 1)
33 a = []string{}
34 )
35 defer cancel()
36 for i := 1; i <= max; i++ {
37 go func() {
38 e, err := b.Endpoint()
39 if err != nil {
40 errs <- err
41 return
42 }
43 response, err := e(newctx, request)
44 if err != nil {
45 errs <- err
46 return
47 }
48 responses <- response
49 }()
50
51 select {
52 case <-newctx.Done():
53 return nil, newctx.Err()
54 case response := <-responses:
55 return response, nil
56 case err := <-errs:
57 a = append(a, err.Error())
58 continue
59 }
60 }
61 return nil, fmt.Errorf("retry attempts exceeded (%s)", strings.Join(a, "; "))
62 }
25 return RetryWithCallback(max, timeout, b, func(c int, s string) (bool, *string) { return true, nil })
6326 }
6427
65 func RetryWithCallback(max int, timeout time.Duration, b Balancer, cb callback) endpoint.Endpoint {
28 func RetryWithCallback(max int, timeout time.Duration, b Balancer, cb Callback) endpoint.Endpoint {
6629 if cb == nil {
67 panic("nil callback")
30 panic("nil Callback")
6831 }
6932 if b == nil {
7033 panic("nil Balancer")
8787 t.Errorf("wanted %v, got none", context.DeadlineExceeded)
8888 }
8989 }
90
91 func AbortEarlyCustomMessage_WCB(t *testing.T) {
92 var (
93 cb = func(count int, msg string) (bool, *string) {
94 ret := "Aborting early"
95 return false, &ret
96 }
97 endpoints = sd.FixedSubscriber{} // no endpoints
98 lb = loadbalancer.NewRoundRobin(endpoints)
99 retry = loadbalancer.RetryWithCallback(999, time.Second, lb, cb) // lots of retries
100 ctx = context.Background()
101 )
102 _, err := retry(ctx, struct{}{})
103 if err == nil {
104 t.Errorf("expected error, got none") // should fail
105 }
106 if err.Error() != "Aborting early" {
107 t.Errorf("expected custom error message, got %v", err)
108 }
109 }
110
111 func AbortEarlyOnNTries_WCB(t *testing.T) {
112 var (
113 cb = func(count int, msg string) (bool, *string) {
114 if (count >= 4) {
115 t.Errorf("expected retries to abort at 3 but continued to %v", count)
116 }
117 if (count == 3) {
118 return false, nil
119 }
120 return true, nil
121 }
122 endpoints = sd.FixedSubscriber{} // no endpoints
123 lb = loadbalancer.NewRoundRobin(endpoints)
124 retry = loadbalancer.RetryWithCallback(999, time.Second, lb, cb) // lots of retries
125 ctx = context.Background()
126 )
127 if _, err := retry(ctx, struct{}{}); err == nil {
128 t.Errorf("expected error, got none") // should fail
129 }
130 }