Codebase list golang-github-go-kit-kit / cde6434
ratelimit: pass TokenBucket directly Peter Bourgon 8 years ago
2 changed file(s) with 11 addition(s) and 100 deletion(s). Raw diff Collapse all Expand all
33 "errors"
44 "time"
55
6 juju "github.com/juju/ratelimit"
6 "github.com/juju/ratelimit"
77 "golang.org/x/net/context"
88
99 "github.com/go-kit/kit/endpoint"
1616 // NewTokenBucketLimiter returns an endpoint.Middleware that acts as a rate
1717 // limiter based on a token-bucket algorithm. Requests that would exceed the
1818 // maximum request rate are simply rejected with an error.
19 func NewTokenBucketLimiter(options ...TokenBucketLimiterOption) endpoint.Middleware {
20 limiter := tokenBucketLimiter{
21 rate: 100,
22 capacity: 100,
23 take: 1,
24 }
25 for _, option := range options {
26 option(&limiter)
27 }
28 tb := juju.NewBucketWithRate(limiter.rate, limiter.capacity)
19 func NewTokenBucketLimiter(tb *ratelimit.Bucket) endpoint.Middleware {
2920 return func(next endpoint.Endpoint) endpoint.Endpoint {
3021 return func(ctx context.Context, request interface{}) (interface{}, error) {
31 if tb.TakeAvailable(limiter.take) == 0 {
22 if tb.TakeAvailable(1) == 0 {
3223 return nil, ErrLimited
3324 }
3425 return next(ctx, request)
3627 }
3728 }
3829
39 type tokenBucketLimiter struct {
40 rate float64
41 capacity int64
42 take int64
43 }
44
45 // TokenBucketLimiterOption sets a parameter on the TokenBucketLimiter.
46 type TokenBucketLimiterOption func(*tokenBucketLimiter)
47
48 // TokenBucketLimiterRate sets the rate (per second) at which tokens are
49 // replenished into the bucket. For most use cases, this should be the same as
50 // the capacity. By default, the rate is 100.
51 func TokenBucketLimiterRate(rate float64) TokenBucketLimiterOption {
52 return func(tb *tokenBucketLimiter) { tb.rate = rate }
53 }
54
55 // TokenBucketLimiterCapacity sets the maximum number of tokens that the
56 // bucket will hold. For most use cases, this should be the same as the rate.
57 // By default, the capacity is 100.
58 func TokenBucketLimiterCapacity(capacity int64) TokenBucketLimiterOption {
59 return func(tb *tokenBucketLimiter) { tb.capacity = capacity }
60 }
61
62 // TokenBucketLimiterTake sets the number of tokens that will be taken from
63 // the bucket with each request. By default, this is 1.
64 func TokenBucketLimiterTake(take int64) TokenBucketLimiterOption {
65 return func(tb *tokenBucketLimiter) { tb.take = take }
66 }
67
6830 // NewTokenBucketThrottler returns an endpoint.Middleware that acts as a
6931 // request throttler based on a token-bucket algorithm. Requests that would
70 // exceed the maximum request rate are delayed via a parameterized sleep
71 // function.
72 func NewTokenBucketThrottler(options ...TokenBucketThrottlerOption) endpoint.Middleware {
73 throttler := tokenBucketThrottler{
74 tokenBucketLimiter: tokenBucketLimiter{
75 rate: 100,
76 capacity: 100,
77 take: 1,
78 },
79 sleep: time.Sleep,
80 }
81 for _, option := range options {
82 option(&throttler)
83 }
84 tb := juju.NewBucketWithRate(throttler.rate, throttler.capacity)
32 // exceed the maximum request rate are delayed via the parameterized sleep
33 // function. By default you may pass time.Sleep.
34 func NewTokenBucketThrottler(tb *ratelimit.Bucket, sleep func(time.Duration)) endpoint.Middleware {
8535 return func(next endpoint.Endpoint) endpoint.Endpoint {
8636 return func(ctx context.Context, request interface{}) (interface{}, error) {
87 throttler.sleep(tb.Take(throttler.take))
37 sleep(tb.Take(1))
8838 return next(ctx, request)
8939 }
9040 }
9141 }
92
93 type tokenBucketThrottler struct {
94 tokenBucketLimiter
95 sleep func(time.Duration)
96 }
97
98 // TokenBucketThrottlerOption sets a parameter on the TokenBucketThrottler.
99 type TokenBucketThrottlerOption func(*tokenBucketThrottler)
100
101 // TokenBucketThrottlerRate sets the rate (per second) at which tokens are
102 // replenished into the bucket. For most use cases, this should be the same as
103 // the capacity. By default, the rate is 100.
104 func TokenBucketThrottlerRate(rate float64) TokenBucketThrottlerOption {
105 return func(tb *tokenBucketThrottler) { tb.rate = rate }
106 }
107
108 // TokenBucketThrottlerCapacity sets the maximum number of tokens that the
109 // bucket will hold. For most use cases, this should be the same as the rate.
110 // By default, the capacity is 100.
111 func TokenBucketThrottlerCapacity(capacity int64) TokenBucketThrottlerOption {
112 return func(tb *tokenBucketThrottler) { tb.capacity = capacity }
113 }
114
115 // TokenBucketThrottlerTake sets the number of tokens that will be taken from
116 // the bucket with each request. By default, this is 1.
117 func TokenBucketThrottlerTake(take int64) TokenBucketThrottlerOption {
118 return func(tb *tokenBucketThrottler) { tb.take = take }
119 }
120
121 // TokenBucketThrottlerSleep sets the sleep function that's invoked to
122 // throttle requests. By default, it's time.Sleep.
123 func TokenBucketThrottlerSleep(sleep func(time.Duration)) TokenBucketThrottlerOption {
124 return func(tb *tokenBucketThrottler) { tb.sleep = sleep }
125 }
44 "testing"
55 "time"
66
7 jujuratelimit "github.com/juju/ratelimit"
78 "golang.org/x/net/context"
89
910 "github.com/go-kit/kit/endpoint"
1314 func TestTokenBucketLimiter(t *testing.T) {
1415 e := func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
1516 for _, n := range []int{1, 2, 100} {
16 testLimiter(t, ratelimit.NewTokenBucketLimiter(
17 ratelimit.TokenBucketLimiterRate(float64(n)),
18 ratelimit.TokenBucketLimiterCapacity(int64(n)),
19 )(e), int(n))
17 tb := jujuratelimit.NewBucketWithRate(float64(n), int64(n))
18 testLimiter(t, ratelimit.NewTokenBucketLimiter(tb)(e), n)
2019 }
2120 }
2221
2524 s := func(d0 time.Duration) { d = d0 }
2625
2726 e := func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
28 e = ratelimit.NewTokenBucketThrottler(
29 ratelimit.TokenBucketThrottlerRate(1),
30 ratelimit.TokenBucketThrottlerCapacity(1),
31 ratelimit.TokenBucketThrottlerSleep(s),
32 )(e)
27 e = ratelimit.NewTokenBucketThrottler(jujuratelimit.NewBucketWithRate(1, 1), s)(e)
3328
3429 // First request should go through with no delay.
3530 e(context.Background(), struct{}{})