Codebase list golang-github-go-kit-kit / 569b930
Add streadway/handy/breaker Peter Bourgon 8 years ago
6 changed file(s) with 202 addition(s) and 98 deletion(s). Raw diff Collapse all Expand all
0 package circuitbreaker
1
2 import (
3 "github.com/sony/gobreaker"
4 "golang.org/x/net/context"
5
6 "github.com/go-kit/kit/endpoint"
7 )
8
9 // Gobreaker returns an endpoint.Middleware that implements the circuit
10 // breaker pattern using the sony/gobreaker package. Only errors returned by
11 // the wrapped endpoint count against the circuit breaker's error count.
12 //
13 // See http://godoc.org/github.com/sony/gobreaker for more information.
14 func Gobreaker(settings gobreaker.Settings) endpoint.Middleware {
15 cb := gobreaker.NewCircuitBreaker(settings)
16 return func(next endpoint.Endpoint) endpoint.Endpoint {
17 return func(ctx context.Context, request interface{}) (interface{}, error) {
18 return cb.Execute(func() (interface{}, error) { return next(ctx, request) })
19 }
20 }
21 }
0 package circuitbreaker_test
1
2 import (
3 "errors"
4 "testing"
5 "time"
6
7 "github.com/sony/gobreaker"
8 "golang.org/x/net/context"
9
10 "github.com/go-kit/kit/circuitbreaker"
11 "github.com/go-kit/kit/endpoint"
12 )
13
14 func TestGobreaker(t *testing.T) {
15 var (
16 thru int
17 last gobreaker.State
18 myError = errors.New("❤️")
19 timeout = time.Millisecond
20 stateChange = func(_ string, from, to gobreaker.State) { last = to }
21 )
22
23 var e endpoint.Endpoint
24 e = func(context.Context, interface{}) (interface{}, error) { thru++; return struct{}{}, myError }
25 e = circuitbreaker.Gobreaker(gobreaker.Settings{
26 Timeout: timeout,
27 OnStateChange: stateChange,
28 })(e)
29
30 // "Default ReadyToTrip returns true when the number of consecutive
31 // failures is more than 5."
32 // https://github.com/sony/gobreaker/blob/bfa846d/gobreaker.go#L76
33 for i := 0; i < 5; i++ {
34 if _, err := e(context.Background(), struct{}{}); err != myError {
35 t.Errorf("want %v, have %v", myError, err)
36 }
37 }
38
39 if want, have := 5, thru; want != have {
40 t.Errorf("want %d, have %d", want, have)
41 }
42
43 e(context.Background(), struct{}{})
44 if want, have := 6, thru; want != have { // got thru
45 t.Errorf("want %d, have %d", want, have)
46 }
47 if want, have := gobreaker.StateOpen, last; want != have { // tripped
48 t.Errorf("want %v, have %v", want, have)
49 }
50
51 e(context.Background(), struct{}{})
52 if want, have := 6, thru; want != have { // didn't get thru
53 t.Errorf("want %d, have %d", want, have)
54 }
55
56 time.Sleep(2 * timeout)
57
58 e(context.Background(), struct{}{})
59 if want, have := 7, thru; want != have { // got thru via halfopen
60 t.Errorf("want %d, have %d", want, have)
61 }
62 if want, have := gobreaker.StateOpen, last; want != have { // re-tripped
63 t.Errorf("want %v, have %v", want, have)
64 }
65
66 time.Sleep(2 * timeout)
67
68 myError = nil
69 e(context.Background(), struct{}{})
70 if want, have := 8, thru; want != have { // got thru via halfopen
71 t.Errorf("want %d, have %d", want, have)
72 }
73 if want, have := gobreaker.StateClosed, last; want != have { // now it's good
74 t.Errorf("want %v, have %v", want, have)
75 }
76 }
0 package circuitbreaker
1
2 import (
3 "errors"
4 "time"
5
6 "github.com/streadway/handy/breaker"
7 "golang.org/x/net/context"
8
9 "github.com/go-kit/kit/endpoint"
10 )
11
12 // ErrCircuitBreakerOpen is returned when the HandyBreaker's circuit is open
13 // and the request is stopped from proceeding.
14 var ErrCircuitBreakerOpen = errors.New("circuit breaker open")
15
16 // HandyBreaker returns an endpoint.Middleware that implements the circuit
17 // breaker pattern using the streadway/handy/breaker package. Only errors
18 // returned by the wrapped endpoint count against the circuit breaker's error
19 // count.
20 //
21 // See http://godoc.org/github.com/streadway/handy/breaker for more
22 // information.
23 func HandyBreaker(failureRatio float64) endpoint.Middleware {
24 b := breaker.NewBreaker(failureRatio)
25 return func(next endpoint.Endpoint) endpoint.Endpoint {
26 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
27 if !b.Allow() {
28 return nil, ErrCircuitBreakerOpen
29 }
30
31 defer func(begin time.Time) {
32 if err == nil {
33 b.Success(time.Since(begin))
34 } else {
35 b.Failure(time.Since(begin))
36 }
37 }(time.Now())
38
39 response, err = next(ctx, request)
40 return
41 }
42 }
43 }
0 package circuitbreaker_test
1
2 import (
3 "errors"
4 "testing"
5
6 "github.com/streadway/handy/breaker"
7
8 "golang.org/x/net/context"
9
10 "github.com/go-kit/kit/circuitbreaker"
11 "github.com/go-kit/kit/endpoint"
12 )
13
14 func TestHandyBreaker(t *testing.T) {
15 var (
16 thru = 0
17 myError = error(nil)
18 ratio = 0.05
19 primeWith = breaker.DefaultMinObservations * 10
20 shouldPass = func(failed int) bool { return (float64(failed) / float64(primeWith+failed)) <= ratio }
21 extraTries = 10
22 )
23
24 var e endpoint.Endpoint
25 e = func(context.Context, interface{}) (interface{}, error) { thru++; return struct{}{}, myError }
26 e = circuitbreaker.HandyBreaker(ratio)(e)
27
28 // Prime with some successes.
29 for i := 0; i < primeWith; i++ {
30 if _, err := e(context.Background(), struct{}{}); err != nil {
31 t.Fatal(err)
32 }
33 }
34
35 // Now we start throwing errors.
36 myError = errors.New(":(")
37
38 // The first few should get thru.
39 var letThru int
40 for i := 0; shouldPass(i); i++ { // off-by-one
41 letThru++
42 if _, err := e(context.Background(), struct{}{}); err != myError {
43 t.Fatalf("want %v, have %v", myError, err)
44 }
45 }
46
47 // But the rest should be blocked by an open circuit.
48 for i := 1; i <= extraTries; i++ {
49 if _, err := e(context.Background(), struct{}{}); err != circuitbreaker.ErrCircuitBreakerOpen {
50 t.Errorf("with request #%d, want %v, have %v", primeWith+letThru+i, circuitbreaker.ErrCircuitBreakerOpen, err)
51 }
52 }
53
54 // Confirm the rest didn't get through.
55 if want, have := primeWith+letThru, thru; want != have {
56 t.Errorf("want %d, have %d", want, have)
57 }
58 }
+0
-21
circuitbreaker/sony_gobreaker.go less more
0 package circuitbreaker
1
2 import (
3 "github.com/sony/gobreaker"
4 "golang.org/x/net/context"
5
6 "github.com/go-kit/kit/endpoint"
7 )
8
9 // NewSonyCircuitBreaker returns an endpoint.Middleware that permits the
10 // request if the underlying circuit breaker allows it. Only errors returned
11 // by the wrapped endpoint count against the circuit breaker's error count.
12 // See github.com/sony/gobreaker for more information.
13 func NewSonyCircuitBreaker(settings gobreaker.Settings) endpoint.Middleware {
14 cb := gobreaker.NewCircuitBreaker(settings)
15 return func(next endpoint.Endpoint) endpoint.Endpoint {
16 return func(ctx context.Context, request interface{}) (interface{}, error) {
17 return cb.Execute(func() (interface{}, error) { return next(ctx, request) })
18 }
19 }
20 }
+0
-77
circuitbreaker/sony_gobreaker_test.go less more
0 package circuitbreaker_test
1
2 import (
3 "errors"
4 "testing"
5 "time"
6
7 "github.com/sony/gobreaker"
8 "golang.org/x/net/context"
9
10 "github.com/go-kit/kit/circuitbreaker"
11 "github.com/go-kit/kit/endpoint"
12 )
13
14 func TestSonyCircuitBreaker(t *testing.T) {
15 var (
16 thru int
17 last gobreaker.State
18 myError = errors.New("❤️")
19 timeout = time.Millisecond
20 stateChange = func(_ string, from, to gobreaker.State) { last = to }
21 )
22
23 var e endpoint.Endpoint
24 e = func(context.Context, interface{}) (interface{}, error) { thru++; return struct{}{}, myError }
25 e = circuitbreaker.NewSonyCircuitBreaker(gobreaker.Settings{
26 Timeout: timeout,
27 OnStateChange: stateChange,
28 })(e)
29
30 // "Default ReadyToTrip returns true when the number of consecutive
31 // failures is more than 5."
32 // https://github.com/sony/gobreaker/blob/bfa846d/gobreaker.go#L76
33 for i := 0; i < 5; i++ {
34 if _, err := e(context.Background(), struct{}{}); err != myError {
35 t.Errorf("want %v, have %v", myError, err)
36 }
37 }
38
39 if want, have := 5, thru; want != have {
40 t.Errorf("want %d, have %d", want, have)
41 }
42
43 e(context.Background(), struct{}{})
44 if want, have := 6, thru; want != have { // got thru
45 t.Errorf("want %d, have %d", want, have)
46 }
47 if want, have := gobreaker.StateOpen, last; want != have { // tripped
48 t.Errorf("want %v, have %v", want, have)
49 }
50
51 e(context.Background(), struct{}{})
52 if want, have := 6, thru; want != have { // didn't get thru
53 t.Errorf("want %d, have %d", want, have)
54 }
55
56 time.Sleep(2 * timeout)
57
58 e(context.Background(), struct{}{})
59 if want, have := 7, thru; want != have { // got thru via halfopen
60 t.Errorf("want %d, have %d", want, have)
61 }
62 if want, have := gobreaker.StateOpen, last; want != have { // re-tripped
63 t.Errorf("want %v, have %v", want, have)
64 }
65
66 time.Sleep(2 * timeout)
67
68 myError = nil
69 e(context.Background(), struct{}{})
70 if want, have := 8, thru; want != have { // got thru via halfopen
71 t.Errorf("want %d, have %d", want, have)
72 }
73 if want, have := gobreaker.StateClosed, last; want != have { // now it's good
74 t.Errorf("want %v, have %v", want, have)
75 }
76 }