Codebase list golang-github-go-kit-kit / 57c36a7
tsenart/tb -> juju/ratelimit Peter Bourgon 8 years ago
2 changed file(s) with 134 addition(s) and 44 deletion(s). Raw diff Collapse all Expand all
33 "errors"
44 "time"
55
6 "github.com/tsenart/tb"
6 juju "github.com/juju/ratelimit"
77 "golang.org/x/net/context"
88
99 "github.com/go-kit/kit/endpoint"
1010 )
1111
12 // ErrThrottled is returned in the request path when the rate limiter is
12 // ErrLimited is returned in the request path when the rate limiter is
1313 // triggered and the request is rejected.
14 var ErrThrottled = errors.New("throttled")
14 var ErrLimited = errors.New("rate limit exceeded")
1515
16 // NewTokenBucketThrottler returns an endpoint.Middleware that acts as a rate
17 // limiter based on a "token-bucket" algorithm. Requests that would exceed the
18 // maximum request rate are rejected with an error.
19 func NewTokenBucketThrottler(options ...TokenBucketOption) endpoint.Middleware {
20 t := tokenBucketThrottler{
21 freq: 100 * time.Millisecond,
22 key: "",
23 rate: 100,
24 take: 1,
16 // NewTokenBucketLimiter returns an endpoint.Middleware that acts as a rate
17 // limiter based on a token-bucket algorithm. Requests that would exceed the
18 // maximum request rate are simply rejected with an error.
19 func NewTokenBucketLimiter(options ...TokenBucketLimiterOption) endpoint.Middleware {
20 limiter := tokenBucketLimiter{
21 rate: 100,
22 capacity: 100,
23 take: 1,
2524 }
2625 for _, option := range options {
27 option(&t)
26 option(&limiter)
2827 }
29 throttler := tb.NewThrottler(t.freq)
28 tb := juju.NewBucketWithRate(limiter.rate, limiter.capacity)
3029 return func(next endpoint.Endpoint) endpoint.Endpoint {
3130 return func(ctx context.Context, request interface{}) (interface{}, error) {
32 if throttler.Halt(t.key, t.take, t.rate) {
33 return nil, ErrThrottled
31 if tb.TakeAvailable(limiter.take) == 0 {
32 return nil, ErrLimited
3433 }
3534 return next(ctx, request)
3635 }
3736 }
3837 }
3938
40 type tokenBucketThrottler struct {
41 freq time.Duration
42 key string
43 rate int64
44 take int64
39 type tokenBucketLimiter struct {
40 rate float64
41 capacity int64
42 take int64
4543 }
4644
47 // TokenBucketOption sets an option on the token bucket throttler.
48 type TokenBucketOption func(*tokenBucketThrottler)
45 // TokenBucketLimiterOption sets a parameter on the TokenBucketLimiter.
46 type TokenBucketLimiterOption func(*tokenBucketLimiter)
4947
50 // TokenBucketFillFrequency sets the interval at which tokens are replenished
51 // into the bucket. By default, it's 100 milliseconds.
52 func TokenBucketFillFrequency(freq time.Duration) TokenBucketOption {
53 return func(t *tokenBucketThrottler) { t.freq = freq }
48 // TokenBucketLimiterRate sets the rate (per second) at which tokens are
49 // replenished into the bucket. For most use cases, this should be the same as
50 // the capacity. By default, the rate is 100.
51 func TokenBucketLimiterRate(rate float64) TokenBucketLimiterOption {
52 return func(tb *tokenBucketLimiter) { tb.rate = rate }
5453 }
5554
56 // TokenBucketMaxRate sets the maximum allowed request rate.
57 // By default, it's 100.
58 func TokenBucketMaxRate(rate int64) TokenBucketOption {
59 return func(t *tokenBucketThrottler) { t.rate = rate }
55 // TokenBucketLimiterCapacity sets the maximum number of tokens that the
56 // bucket will hold. For most use cases, this should be the same as the rate.
57 // By default, the capacity is 100.
58 func TokenBucketLimiterCapacity(capacity int64) TokenBucketLimiterOption {
59 return func(tb *tokenBucketLimiter) { tb.capacity = capacity }
6060 }
6161
62 // TokenBucketTake sets the number of tokens taken with each request.
63 // By default, it's 1.
64 func TokenBucketTake(take int64) TokenBucketOption {
65 return func(t *tokenBucketThrottler) { t.take = take }
62 // TokenBucketLimiterTake sets the number of tokens that will be taken from
63 // the bucket with each request. By default, this is 1.
64 func TokenBucketLimiterTake(take int64) TokenBucketLimiterOption {
65 return func(tb *tokenBucketLimiter) { tb.take = take }
6666 }
67
68 // NewTokenBucketThrottler returns an endpoint.Middleware that acts as a
69 // request throttler based on a token-bucket algorithm. Requests that would
70 // exceed the maximum request rate are delayed via a parameterized sleep
71 // function.
72 func NewTokenBucketThrottler(options ...TokenBucketThrottlerOption) endpoint.Middleware {
73 throttler := tokenBucketThrottler{
74 tokenBucketLimiter: tokenBucketLimiter{
75 rate: 100,
76 capacity: 100,
77 take: 1,
78 },
79 sleep: time.Sleep,
80 }
81 for _, option := range options {
82 option(&throttler)
83 }
84 tb := juju.NewBucketWithRate(throttler.rate, throttler.capacity)
85 return func(next endpoint.Endpoint) endpoint.Endpoint {
86 return func(ctx context.Context, request interface{}) (interface{}, error) {
87 throttler.sleep(tb.Take(throttler.take))
88 return next(ctx, request)
89 }
90 }
91 }
92
93 type tokenBucketThrottler struct {
94 tokenBucketLimiter
95 sleep func(time.Duration)
96 }
97
98 // TokenBucketThrottlerOption sets a parameter on the TokenBucketThrottler.
99 type TokenBucketThrottlerOption func(*tokenBucketThrottler)
100
101 // TokenBucketThrottlerRate sets the rate (per second) at which tokens are
102 // replenished into the bucket. For most use cases, this should be the same as
103 // the capacity. By default, the rate is 100.
104 func TokenBucketThrottlerRate(rate float64) TokenBucketThrottlerOption {
105 return func(tb *tokenBucketThrottler) { tb.rate = rate }
106 }
107
108 // TokenBucketThrottlerCapacity sets the maximum number of tokens that the
109 // bucket will hold. For most use cases, this should be the same as the rate.
110 // By default, the capacity is 100.
111 func TokenBucketThrottlerCapacity(capacity int64) TokenBucketThrottlerOption {
112 return func(tb *tokenBucketThrottler) { tb.capacity = capacity }
113 }
114
115 // TokenBucketThrottlerTake sets the number of tokens that will be taken from
116 // the bucket with each request. By default, this is 1.
117 func TokenBucketThrottlerTake(take int64) TokenBucketThrottlerOption {
118 return func(tb *tokenBucketThrottler) { tb.take = take }
119 }
120
121 // TokenBucketThrottlerSleep sets the sleep function that's invoked to
122 // throttle requests. By default, it's time.Sleep.
123 func TokenBucketThrottlerSleep(sleep func(time.Duration)) TokenBucketThrottlerOption {
124 return func(tb *tokenBucketThrottler) { tb.sleep = sleep }
125 }
00 package ratelimit_test
11
22 import (
3 "math"
34 "testing"
5 "time"
46
57 "golang.org/x/net/context"
68
810 "github.com/go-kit/kit/ratelimit"
911 )
1012
11 func TestTokenBucketThrottler(t *testing.T) {
13 func TestTokenBucketLimiter(t *testing.T) {
1214 e := func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
13 testRateLimit(t, ratelimit.NewTokenBucketThrottler(ratelimit.TokenBucketMaxRate(0))(e), 0) // all fail
14 testRateLimit(t, ratelimit.NewTokenBucketThrottler(ratelimit.TokenBucketMaxRate(1))(e), 1) // first pass
15 testRateLimit(t, ratelimit.NewTokenBucketThrottler(ratelimit.TokenBucketMaxRate(100))(e), 100) // 100 pass
15 for _, n := range []int{1, 2, 100} {
16 testLimiter(t, ratelimit.NewTokenBucketLimiter(
17 ratelimit.TokenBucketLimiterRate(float64(n)),
18 ratelimit.TokenBucketLimiterCapacity(int64(n)),
19 )(e), int(n))
20 }
1621 }
1722
18 func testRateLimit(t *testing.T, e endpoint.Endpoint, rate int) {
19 ctx := context.Background()
23 func TestTokenBucketThrottler(t *testing.T) {
24 d := time.Duration(0)
25 s := func(d0 time.Duration) { d = d0 }
26
27 e := func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
28 e = ratelimit.NewTokenBucketThrottler(
29 ratelimit.TokenBucketThrottlerRate(1),
30 ratelimit.TokenBucketThrottlerCapacity(1),
31 ratelimit.TokenBucketThrottlerSleep(s),
32 )(e)
33
34 // First request should go through with no delay.
35 e(context.Background(), struct{}{})
36 if want, have := time.Duration(0), d; want != have {
37 t.Errorf("want %s, have %s", want, have)
38 }
39
40 // Next request should request a ~1s sleep.
41 e(context.Background(), struct{}{})
42 if want, have, tol := time.Second, d, time.Millisecond; math.Abs(float64(want-have)) > float64(tol) {
43 t.Errorf("want %s, have %s", want, have)
44 }
45 }
46
47 func testLimiter(t *testing.T, e endpoint.Endpoint, rate int) {
48 // First <rate> requests should succeed.
2049 for i := 0; i < rate; i++ {
21 if _, err := e(ctx, struct{}{}); err != nil {
50 if _, err := e(context.Background(), struct{}{}); err != nil {
2251 t.Fatalf("rate=%d: request %d/%d failed: %v", rate, i+1, rate, err)
2352 }
2453 }
25 if _, err := e(ctx, struct{}{}); err != ratelimit.ErrThrottled {
26 t.Errorf("rate=%d: want %v, have %v", rate, ratelimit.ErrThrottled, err)
54
55 // Next request should fail.
56 if _, err := e(context.Background(), struct{}{}); err != ratelimit.ErrLimited {
57 t.Errorf("rate=%d: want %v, have %v", rate, ratelimit.ErrLimited, err)
2758 }
2859 }