Codebase list golang-github-go-kit-kit / c6bdff6
Merge pull request #57 from go-kit/ratelimit package ratelimit, with token-bucket throttler Peter Bourgon 8 years ago
3 changed file(s) with 186 addition(s) and 13 deletion(s). Raw diff Collapse all Expand all
0 package ratelimit
1
2 import (
3 "errors"
4 "time"
5
6 juju "github.com/juju/ratelimit"
7 "golang.org/x/net/context"
8
9 "github.com/go-kit/kit/endpoint"
10 )
11
12 // ErrLimited is returned in the request path when the rate limiter is
13 // triggered and the request is rejected.
14 var ErrLimited = errors.New("rate limit exceeded")
15
16 // NewTokenBucketLimiter returns an endpoint.Middleware that acts as a rate
17 // limiter based on a token-bucket algorithm. Requests that would exceed the
18 // maximum request rate are simply rejected with an error.
19 func NewTokenBucketLimiter(options ...TokenBucketLimiterOption) endpoint.Middleware {
20 limiter := tokenBucketLimiter{
21 rate: 100,
22 capacity: 100,
23 take: 1,
24 }
25 for _, option := range options {
26 option(&limiter)
27 }
28 tb := juju.NewBucketWithRate(limiter.rate, limiter.capacity)
29 return func(next endpoint.Endpoint) endpoint.Endpoint {
30 return func(ctx context.Context, request interface{}) (interface{}, error) {
31 if tb.TakeAvailable(limiter.take) == 0 {
32 return nil, ErrLimited
33 }
34 return next(ctx, request)
35 }
36 }
37 }
38
39 type tokenBucketLimiter struct {
40 rate float64
41 capacity int64
42 take int64
43 }
44
45 // TokenBucketLimiterOption sets a parameter on the TokenBucketLimiter.
46 type TokenBucketLimiterOption func(*tokenBucketLimiter)
47
48 // TokenBucketLimiterRate sets the rate (per second) at which tokens are
49 // replenished into the bucket. For most use cases, this should be the same as
50 // the capacity. By default, the rate is 100.
51 func TokenBucketLimiterRate(rate float64) TokenBucketLimiterOption {
52 return func(tb *tokenBucketLimiter) { tb.rate = rate }
53 }
54
55 // TokenBucketLimiterCapacity sets the maximum number of tokens that the
56 // bucket will hold. For most use cases, this should be the same as the rate.
57 // By default, the capacity is 100.
58 func TokenBucketLimiterCapacity(capacity int64) TokenBucketLimiterOption {
59 return func(tb *tokenBucketLimiter) { tb.capacity = capacity }
60 }
61
62 // TokenBucketLimiterTake sets the number of tokens that will be taken from
63 // the bucket with each request. By default, this is 1.
64 func TokenBucketLimiterTake(take int64) TokenBucketLimiterOption {
65 return func(tb *tokenBucketLimiter) { tb.take = take }
66 }
67
68 // NewTokenBucketThrottler returns an endpoint.Middleware that acts as a
69 // request throttler based on a token-bucket algorithm. Requests that would
70 // exceed the maximum request rate are delayed via a parameterized sleep
71 // function.
72 func NewTokenBucketThrottler(options ...TokenBucketThrottlerOption) endpoint.Middleware {
73 throttler := tokenBucketThrottler{
74 tokenBucketLimiter: tokenBucketLimiter{
75 rate: 100,
76 capacity: 100,
77 take: 1,
78 },
79 sleep: time.Sleep,
80 }
81 for _, option := range options {
82 option(&throttler)
83 }
84 tb := juju.NewBucketWithRate(throttler.rate, throttler.capacity)
85 return func(next endpoint.Endpoint) endpoint.Endpoint {
86 return func(ctx context.Context, request interface{}) (interface{}, error) {
87 throttler.sleep(tb.Take(throttler.take))
88 return next(ctx, request)
89 }
90 }
91 }
92
93 type tokenBucketThrottler struct {
94 tokenBucketLimiter
95 sleep func(time.Duration)
96 }
97
98 // TokenBucketThrottlerOption sets a parameter on the TokenBucketThrottler.
99 type TokenBucketThrottlerOption func(*tokenBucketThrottler)
100
101 // TokenBucketThrottlerRate sets the rate (per second) at which tokens are
102 // replenished into the bucket. For most use cases, this should be the same as
103 // the capacity. By default, the rate is 100.
104 func TokenBucketThrottlerRate(rate float64) TokenBucketThrottlerOption {
105 return func(tb *tokenBucketThrottler) { tb.rate = rate }
106 }
107
108 // TokenBucketThrottlerCapacity sets the maximum number of tokens that the
109 // bucket will hold. For most use cases, this should be the same as the rate.
110 // By default, the capacity is 100.
111 func TokenBucketThrottlerCapacity(capacity int64) TokenBucketThrottlerOption {
112 return func(tb *tokenBucketThrottler) { tb.capacity = capacity }
113 }
114
115 // TokenBucketThrottlerTake sets the number of tokens that will be taken from
116 // the bucket with each request. By default, this is 1.
117 func TokenBucketThrottlerTake(take int64) TokenBucketThrottlerOption {
118 return func(tb *tokenBucketThrottler) { tb.take = take }
119 }
120
121 // TokenBucketThrottlerSleep sets the sleep function that's invoked to
122 // throttle requests. By default, it's time.Sleep.
123 func TokenBucketThrottlerSleep(sleep func(time.Duration)) TokenBucketThrottlerOption {
124 return func(tb *tokenBucketThrottler) { tb.sleep = sleep }
125 }
0 package ratelimit_test
1
2 import (
3 "math"
4 "testing"
5 "time"
6
7 "golang.org/x/net/context"
8
9 "github.com/go-kit/kit/endpoint"
10 "github.com/go-kit/kit/ratelimit"
11 )
12
13 func TestTokenBucketLimiter(t *testing.T) {
14 e := func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
15 for _, n := range []int{1, 2, 100} {
16 testLimiter(t, ratelimit.NewTokenBucketLimiter(
17 ratelimit.TokenBucketLimiterRate(float64(n)),
18 ratelimit.TokenBucketLimiterCapacity(int64(n)),
19 )(e), int(n))
20 }
21 }
22
23 func TestTokenBucketThrottler(t *testing.T) {
24 d := time.Duration(0)
25 s := func(d0 time.Duration) { d = d0 }
26
27 e := func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
28 e = ratelimit.NewTokenBucketThrottler(
29 ratelimit.TokenBucketThrottlerRate(1),
30 ratelimit.TokenBucketThrottlerCapacity(1),
31 ratelimit.TokenBucketThrottlerSleep(s),
32 )(e)
33
34 // First request should go through with no delay.
35 e(context.Background(), struct{}{})
36 if want, have := time.Duration(0), d; want != have {
37 t.Errorf("want %s, have %s", want, have)
38 }
39
40 // Next request should request a ~1s sleep.
41 e(context.Background(), struct{}{})
42 if want, have, tol := time.Second, d, time.Millisecond; math.Abs(float64(want-have)) > float64(tol) {
43 t.Errorf("want %s, have %s", want, have)
44 }
45 }
46
47 func testLimiter(t *testing.T, e endpoint.Endpoint, rate int) {
48 // First <rate> requests should succeed.
49 for i := 0; i < rate; i++ {
50 if _, err := e(context.Background(), struct{}{}); err != nil {
51 t.Fatalf("rate=%d: request %d/%d failed: %v", rate, i+1, rate, err)
52 }
53 }
54
55 // Next request should fail.
56 if _, err := e(context.Background(), struct{}{}); err != ratelimit.ErrLimited {
57 t.Errorf("rate=%d: want %v, have %v", rate, ratelimit.ErrLimited, err)
58 }
59 }
+0
-13
server/README.md less more
0 # package server
1
2 `package server` is a very small package that collects interfaces used by services.
3 Most server-side functionality is actually implemented by surrounding packages.
4
5 # Rationale
6
7 TODO
8
9 # Usage
10
11 As currently defined, you shouldn't need to use `package server` directly.
12 Other gokit components integrate on `package server` interfaces.