Codebase list golang-github-go-kit-kit / 5ea37e7
All old expressed in terms of new Nelz 6 years ago
2 changed file(s) with 36 addition(s) and 61 deletion(s). Raw diff Collapse all Expand all
2222
2323 // NewTokenBucketThrottler returns an endpoint.Middleware that acts as a
2424 // request throttler based on a token-bucket algorithm. Requests that would
25 // exceed the maximum request rate are delayed via the parameterized sleep
26 // function. By default you may pass time.Sleep.
27 func NewTokenBucketThrottler(tb *ratelimit.Bucket, sleep func(time.Duration)) endpoint.Middleware {
28 // return NewDelayingLimiter(NewWaiter(tb))
29 return func(next endpoint.Endpoint) endpoint.Endpoint {
30 return func(ctx context.Context, request interface{}) (interface{}, error) {
31 sleep(tb.Take(1))
32 return next(ctx, request)
33 }
34 }
25 // exceed the maximum request rate are delayed.
26 // The parameterized function "_" is kept for backwards-compatiblity of
27 // the API, but it is no longer used for anything. You may pass it nil.
28 func NewTokenBucketThrottler(tb *ratelimit.Bucket, _ func(time.Duration)) endpoint.Middleware {
29 return NewDelayingLimiter(NewWaiter(tb))
3530 }
3631
3732 // Allower dictates whether or not a request is acceptable to run.
11
22 import (
33 "context"
4 "math"
54 "strings"
65 "testing"
76 "time"
87
98 jujuratelimit "github.com/juju/ratelimit"
9 "golang.org/x/time/rate"
1010
1111 "github.com/go-kit/kit/endpoint"
1212 "github.com/go-kit/kit/ratelimit"
13 "golang.org/x/time/rate"
1413 )
1514
15 var nopEndpoint = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
16
1617 func TestTokenBucketLimiter(t *testing.T) {
17 e := func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
18 for _, n := range []int{1, 2, 100} {
19 tb := jujuratelimit.NewBucketWithRate(float64(n), int64(n))
20 testLimiter(t, ratelimit.NewTokenBucketLimiter(tb)(e), n)
21 }
18 tb := jujuratelimit.NewBucket(time.Minute, 1)
19 testSuccessThenFailure(
20 t,
21 ratelimit.NewTokenBucketLimiter(tb)(nopEndpoint),
22 ratelimit.ErrLimited.Error())
2223 }
2324
2425 func TestTokenBucketThrottler(t *testing.T) {
25 d := time.Duration(0)
26 s := func(d0 time.Duration) { d = d0 }
27
28 e := func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
29 e = ratelimit.NewTokenBucketThrottler(jujuratelimit.NewBucketWithRate(1, 1), s)(e)
30
31 // First request should go through with no delay.
32 e(context.Background(), struct{}{})
33 if want, have := time.Duration(0), d; want != have {
34 t.Errorf("want %s, have %s", want, have)
35 }
36
37 // Next request should request a ~1s sleep.
38 e(context.Background(), struct{}{})
39 if want, have, tol := time.Second, d, time.Millisecond; math.Abs(float64(want-have)) > float64(tol) {
40 t.Errorf("want %s, have %s", want, have)
41 }
42 }
43
44 func testLimiter(t *testing.T, e endpoint.Endpoint, rate int) {
45 // First <rate> requests should succeed.
46 for i := 0; i < rate; i++ {
47 if _, err := e(context.Background(), struct{}{}); err != nil {
48 t.Fatalf("rate=%d: request %d/%d failed: %v", rate, i+1, rate, err)
49 }
50 }
51
52 // Next request should fail.
53 if _, err := e(context.Background(), struct{}{}); err != ratelimit.ErrLimited {
54 t.Errorf("rate=%d: want %v, have %v", rate, ratelimit.ErrLimited, err)
55 }
26 tb := jujuratelimit.NewBucket(time.Minute, 1)
27 testSuccessThenFailure(
28 t,
29 ratelimit.NewTokenBucketThrottler(tb, nil)(nopEndpoint),
30 "context deadline exceeded")
5631 }
5732
5833 func TestXRateErroring(t *testing.T) {
5934 limit := rate.NewLimiter(rate.Every(time.Minute), 1)
60 e := func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
61 testLimiter(t, ratelimit.NewErroringLimiter(limit)(e), 1)
35 testSuccessThenFailure(
36 t,
37 ratelimit.NewErroringLimiter(limit)(nopEndpoint),
38 ratelimit.ErrLimited.Error())
6239 }
6340
6441 func TestXRateDelaying(t *testing.T) {
6542 limit := rate.NewLimiter(rate.Every(time.Minute), 1)
66 e := func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
67 e = ratelimit.NewDelayingLimiter(limit)(e)
43 testSuccessThenFailure(
44 t,
45 ratelimit.NewDelayingLimiter(limit)(nopEndpoint),
46 "exceed context deadline")
47 }
6848
69 _, err := e(context.Background(), struct{}{})
70 if err != nil {
49 func testSuccessThenFailure(t *testing.T, e endpoint.Endpoint, failContains string) {
50 ctx, cxl := context.WithTimeout(context.Background(), 500*time.Millisecond)
51 defer cxl()
52
53 // First request should succeed.
54 if _, err := e(ctx, struct{}{}); err != nil {
7155 t.Errorf("unexpected: %v\n", err)
7256 }
7357
74 dur := 500 * time.Millisecond
75 ctx, cxl := context.WithTimeout(context.Background(), dur)
76 defer cxl()
77
78 _, err = e(ctx, struct{}{})
79 if !strings.Contains(err.Error(), "exceed context deadline") {
80 t.Errorf("expected timeout: %v\n", err)
58 // Next request should fail.
59 if _, err := e(ctx, struct{}{}); !strings.Contains(err.Error(), failContains) {
60 t.Errorf("expected `%s`: %v\n", failContains, err)
8161 }
8262 }