Merge pull request #58 from go-kit/circuit-breaker
package circuitbreaker
Peter Bourgon
8 years ago
0 | package circuitbreaker | |
1 | ||
2 | import ( | |
3 | "github.com/sony/gobreaker" | |
4 | "golang.org/x/net/context" | |
5 | ||
6 | "github.com/go-kit/kit/endpoint" | |
7 | ) | |
8 | ||
9 | // Gobreaker returns an endpoint.Middleware that implements the circuit | |
10 | // breaker pattern using the sony/gobreaker package. Only errors returned by | |
11 | // the wrapped endpoint count against the circuit breaker's error count. | |
12 | // | |
13 | // See http://godoc.org/github.com/sony/gobreaker for more information. | |
14 | func Gobreaker(settings gobreaker.Settings) endpoint.Middleware { | |
15 | cb := gobreaker.NewCircuitBreaker(settings) | |
16 | return func(next endpoint.Endpoint) endpoint.Endpoint { | |
17 | return func(ctx context.Context, request interface{}) (interface{}, error) { | |
18 | return cb.Execute(func() (interface{}, error) { return next(ctx, request) }) | |
19 | } | |
20 | } | |
21 | } |
0 | package circuitbreaker_test | |
1 | ||
2 | import ( | |
3 | "testing" | |
4 | ||
5 | "github.com/sony/gobreaker" | |
6 | ||
7 | "github.com/go-kit/kit/circuitbreaker" | |
8 | ) | |
9 | ||
10 | func TestGobreaker(t *testing.T) { | |
11 | var ( | |
12 | breaker = circuitbreaker.Gobreaker(gobreaker.Settings{}) | |
13 | primeWith = 100 | |
14 | shouldPass = func(n int) bool { return n <= 5 } // https://github.com/sony/gobreaker/blob/bfa846d/gobreaker.go#L76 | |
15 | circuitOpenError = "circuit breaker is open" | |
16 | ) | |
17 | testFailingEndpoint(t, breaker, primeWith, shouldPass, circuitOpenError) | |
18 | } |
0 | package circuitbreaker | |
1 | ||
2 | import ( | |
3 | "time" | |
4 | ||
5 | "github.com/streadway/handy/breaker" | |
6 | "golang.org/x/net/context" | |
7 | ||
8 | "github.com/go-kit/kit/endpoint" | |
9 | ) | |
10 | ||
11 | // HandyBreaker returns an endpoint.Middleware that implements the circuit | |
12 | // breaker pattern using the streadway/handy/breaker package. Only errors | |
13 | // returned by the wrapped endpoint count against the circuit breaker's error | |
14 | // count. | |
15 | // | |
16 | // See http://godoc.org/github.com/streadway/handy/breaker for more | |
17 | // information. | |
18 | func HandyBreaker(failureRatio float64) endpoint.Middleware { | |
19 | b := breaker.NewBreaker(failureRatio) | |
20 | return func(next endpoint.Endpoint) endpoint.Endpoint { | |
21 | return func(ctx context.Context, request interface{}) (response interface{}, err error) { | |
22 | if !b.Allow() { | |
23 | return nil, breaker.ErrCircuitOpen | |
24 | } | |
25 | ||
26 | defer func(begin time.Time) { | |
27 | if err == nil { | |
28 | b.Success(time.Since(begin)) | |
29 | } else { | |
30 | b.Failure(time.Since(begin)) | |
31 | } | |
32 | }(time.Now()) | |
33 | ||
34 | response, err = next(ctx, request) | |
35 | return | |
36 | } | |
37 | } | |
38 | } |
0 | package circuitbreaker_test | |
1 | ||
2 | import ( | |
3 | "testing" | |
4 | ||
5 | handybreaker "github.com/streadway/handy/breaker" | |
6 | ||
7 | "github.com/go-kit/kit/circuitbreaker" | |
8 | ) | |
9 | ||
10 | func TestHandyBreaker(t *testing.T) { | |
11 | var ( | |
12 | failureRatio = 0.05 | |
13 | breaker = circuitbreaker.HandyBreaker(failureRatio) | |
14 | primeWith = handybreaker.DefaultMinObservations * 10 | |
15 | shouldPass = func(n int) bool { return (float64(n) / float64(primeWith+n)) <= failureRatio } | |
16 | openCircuitError = handybreaker.ErrCircuitOpen.Error() | |
17 | ) | |
18 | testFailingEndpoint(t, breaker, primeWith, shouldPass, openCircuitError) | |
19 | } |
0 | package circuitbreaker | |
1 | ||
2 | import ( | |
3 | "github.com/afex/hystrix-go/hystrix" | |
4 | "golang.org/x/net/context" | |
5 | ||
6 | "github.com/go-kit/kit/endpoint" | |
7 | ) | |
8 | ||
9 | // Hystrix returns an endpoint.Middleware that implements the circuit | |
10 | // breaker pattern using the afex/hystrix-go package. | |
11 | // | |
12 | // When using this circuit breaker, please configure your commands separately. | |
13 | // | |
14 | // See https://godoc.org/github.com/afex/hystrix-go/hystrix for more | |
15 | // information. | |
16 | func Hystrix(commandName string) endpoint.Middleware { | |
17 | return func(next endpoint.Endpoint) endpoint.Endpoint { | |
18 | return func(ctx context.Context, request interface{}) (response interface{}, err error) { | |
19 | output := make(chan interface{}, 1) | |
20 | errors := hystrix.Go(commandName, func() error { | |
21 | resp, err := next(ctx, request) | |
22 | if err == nil { | |
23 | output <- resp | |
24 | } | |
25 | return err | |
26 | }, nil) | |
27 | ||
28 | select { | |
29 | case resp := <-output: | |
30 | return resp, nil | |
31 | case err := <-errors: | |
32 | return nil, err | |
33 | } | |
34 | } | |
35 | } | |
36 | } |
0 | package circuitbreaker_test | |
1 | ||
2 | import ( | |
3 | stdlog "log" | |
4 | "os" | |
5 | "testing" | |
6 | ||
7 | "github.com/afex/hystrix-go/hystrix" | |
8 | ||
9 | "github.com/go-kit/kit/circuitbreaker" | |
10 | kitlog "github.com/go-kit/kit/log" | |
11 | ) | |
12 | ||
13 | func TestHystrix(t *testing.T) { | |
14 | logger := kitlog.NewLogfmtLogger(os.Stderr) | |
15 | stdlog.SetOutput(kitlog.NewStdlibAdapter(logger)) | |
16 | ||
17 | const ( | |
18 | commandName = "my-endpoint" | |
19 | errorPercent = 5 | |
20 | maxConcurrent = 1000 | |
21 | ) | |
22 | hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{ | |
23 | ErrorPercentThreshold: errorPercent, | |
24 | MaxConcurrentRequests: maxConcurrent, | |
25 | }) | |
26 | ||
27 | var ( | |
28 | breaker = circuitbreaker.Hystrix(commandName) | |
29 | primeWith = hystrix.DefaultVolumeThreshold * 2 | |
30 | shouldPass = func(n int) bool { return (float64(n) / float64(primeWith+n)) <= (float64(errorPercent-1) / 100.0) } | |
31 | openCircuitError = hystrix.ErrCircuitOpen.Error() | |
32 | ) | |
33 | testFailingEndpoint(t, breaker, primeWith, shouldPass, openCircuitError) | |
34 | } |
0 | package circuitbreaker_test | |
1 | ||
2 | import ( | |
3 | "errors" | |
4 | "testing" | |
5 | ||
6 | "golang.org/x/net/context" | |
7 | ||
8 | "github.com/go-kit/kit/endpoint" | |
9 | ) | |
10 | ||
11 | func testFailingEndpoint(t *testing.T, breaker endpoint.Middleware, primeWith int, shouldPass func(int) bool, openCircuitError string) { | |
12 | // Create a mock endpoint and wrap it with the breaker. | |
13 | m := mock{} | |
14 | var e endpoint.Endpoint | |
15 | e = m.endpoint | |
16 | e = breaker(e) | |
17 | ||
18 | // Prime the endpoint with successful requests. | |
19 | for i := 0; i < primeWith; i++ { | |
20 | if _, err := e(context.Background(), struct{}{}); err != nil { | |
21 | t.Fatalf("during priming, got error: %v", err) | |
22 | } | |
23 | } | |
24 | ||
25 | // Switch the endpoint to start throwing errors. | |
26 | m.err = errors.New("tragedy+disaster") | |
27 | m.thru = 0 | |
28 | ||
29 | // The first several should be allowed through and yield our error. | |
30 | for i := 0; shouldPass(i); i++ { | |
31 | if _, err := e(context.Background(), struct{}{}); err != m.err { | |
32 | t.Fatalf("want %v, have %v", m.err, err) | |
33 | } | |
34 | } | |
35 | thru := m.thru | |
36 | ||
37 | // But the rest should be blocked by an open circuit. | |
38 | for i := 0; i < 10; i++ { | |
39 | if _, err := e(context.Background(), struct{}{}); err.Error() != openCircuitError { | |
40 | t.Fatalf("want %q, have %q", openCircuitError, err.Error()) | |
41 | } | |
42 | } | |
43 | ||
44 | // Make sure none of those got through. | |
45 | if want, have := thru, m.thru; want != have { | |
46 | t.Errorf("want %d, have %d", want, have) | |
47 | } | |
48 | } | |
49 | ||
50 | type mock struct { | |
51 | thru int | |
52 | err error | |
53 | } | |
54 | ||
55 | func (m *mock) endpoint(context.Context, interface{}) (interface{}, error) { | |
56 | m.thru++ | |
57 | return struct{}{}, m.err | |
58 | } |