Codebase list golang-github-go-kit-kit / 05bcb5b
Merge pull request #61 from josler/jo/hystrix-circuit-breaker add hystrix circuit breaker Peter Bourgon 8 years ago
2 changed file(s) with 156 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
0 package circuitbreaker
1
2 import (
3 "github.com/afex/hystrix-go/hystrix"
4 "golang.org/x/net/context"
5
6 "github.com/go-kit/kit/endpoint"
7 )
8
9 // Hystrix returns an endpoint.Middleware that implements the circuit
10 // breaker pattern using the afex/hystrix-go package.
11 //
12 // When using this circuit breaker, please configure your commands separately.
13 //
14 // See https://godoc.org/github.com/afex/hystrix-go/hystrix for more
15 // information.
16 func Hystrix(commandName string) endpoint.Middleware {
17 return func(next endpoint.Endpoint) endpoint.Endpoint {
18 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
19 output := make(chan interface{}, 1)
20 errors := hystrix.Go(commandName, func() error {
21 resp, err := next(ctx, request)
22 if err == nil {
23 output <- resp
24 }
25 return err
26 }, nil)
27
28 select {
29 case resp := <-output:
30 return resp, nil
31 case err := <-errors:
32 return nil, err
33 }
34 }
35 }
36 }
0 package circuitbreaker_test
1
2 import (
3 "errors"
4 "testing"
5 "time"
6
7 "github.com/afex/hystrix-go/hystrix"
8 "golang.org/x/net/context"
9
10 "github.com/go-kit/kit/circuitbreaker"
11 "github.com/go-kit/kit/endpoint"
12 )
13
14 func TestHystrixCircuitBreakerOpen(t *testing.T) {
15 var (
16 thru = 0
17 myError = error(nil)
18 ratio = 0.04
19 primeWith = hystrix.DefaultVolumeThreshold * 2
20 shouldPass = func(failed int) bool { return (float64(failed) / float64(primeWith+failed)) <= ratio }
21 extraTries = 10
22 )
23
24 // configure hystrix
25 hystrix.ConfigureCommand("myEndpoint", hystrix.CommandConfig{
26 ErrorPercentThreshold: 5,
27 MaxConcurrentRequests: 200,
28 })
29
30 var e endpoint.Endpoint
31 e = func(context.Context, interface{}) (interface{}, error) { thru++; return struct{}{}, myError }
32 e = circuitbreaker.Hystrix("myEndpoint")(e)
33
34 // prime
35 for i := 0; i < primeWith; i++ {
36 if _, err := e(context.Background(), struct{}{}); err != nil {
37 t.Fatal(err)
38 }
39 }
40
41 // Now we start throwing errors.
42 myError = errors.New(":(")
43
44 // The first few should get thru.
45 var letThru int
46 for i := 0; shouldPass(i); i++ { // off-by-one
47 letThru++
48 if _, err := e(context.Background(), struct{}{}); err != myError {
49 t.Fatalf("want %v, have %v", myError, err)
50 }
51 }
52
53 // But the rest should be blocked by an open circuit.
54 for i := 1; i <= extraTries; i++ {
55 if _, err := e(context.Background(), struct{}{}); err != hystrix.ErrCircuitOpen {
56 t.Errorf("with request #%d, want %v, have %v", primeWith+letThru+i, hystrix.ErrCircuitOpen, err)
57 }
58 }
59
60 // Confirm the rest didn't get through.
61 if want, have := primeWith+letThru, thru; want != have {
62 t.Errorf("want %d, have %d", want, have)
63 }
64 }
65
66 func TestHystrixTimeout(t *testing.T) {
67 var (
68 timeout = time.Millisecond * 0
69 primeWith = hystrix.DefaultVolumeThreshold * 2
70 failNumber = 2 // 5% threshold
71 )
72
73 // configure hystrix
74 hystrix.ConfigureCommand("timeoutEndpoint", hystrix.CommandConfig{
75 ErrorPercentThreshold: 5,
76 MaxConcurrentRequests: 200,
77 SleepWindow: 5, // milliseconds
78 Timeout: 1, // milliseconds
79 })
80
81 var e endpoint.Endpoint
82 e = func(context.Context, interface{}) (interface{}, error) {
83 time.Sleep(2 * timeout)
84 return struct{}{}, nil
85 }
86 e = circuitbreaker.Hystrix("timeoutEndpoint")(e)
87
88 // prime
89 for i := 0; i < primeWith; i++ {
90 if _, err := e(context.Background(), struct{}{}); err != nil {
91 t.Errorf("expecting %v, have %v", nil, err)
92 }
93 }
94
95 // times out
96 timeout = time.Millisecond * 2
97 for i := 0; i < failNumber; i++ {
98 if _, err := e(context.Background(), struct{}{}); err != hystrix.ErrTimeout {
99 t.Errorf("%d expecting %v, have %v", i, hystrix.ErrTimeout, err)
100 }
101 }
102
103 // fix timeout
104 timeout = time.Millisecond * 0
105
106 // fails for a little while still
107 for i := 0; i < failNumber; i++ {
108 if _, err := e(context.Background(), struct{}{}); err != hystrix.ErrCircuitOpen {
109 t.Errorf("expecting %v, have %v", hystrix.ErrCircuitOpen, err)
110 }
111 }
112
113 // back to OK
114 time.Sleep(time.Millisecond * 5)
115 if _, err := e(context.Background(), struct{}{}); err != nil {
116 t.Errorf("expecting %v, have %v", nil, err)
117 }
118 }