Package list golang-github-go-kit-kit / 9715034
More generic API to work with x/time/rate Nelz 4 years ago
2 changed file(s) with 110 addition(s) and 7 deletion(s). Raw diff Collapse all Expand all
1717 // limiter based on a token-bucket algorithm. Requests that would exceed the
1818 // maximum request rate are simply rejected with an error.
1919 func NewTokenBucketLimiter(tb *ratelimit.Bucket) endpoint.Middleware {
20 return NewErroringLimiter(NewAllower(tb))
21 }
22
23 // NewTokenBucketThrottler returns an endpoint.Middleware that acts as a
24 // request throttler based on a token-bucket algorithm. Requests that would
25 // exceed the maximum request rate are delayed via the parameterized sleep
26 // function. By default you may pass time.Sleep.
27 func NewTokenBucketThrottler(tb *ratelimit.Bucket, sleep func(time.Duration)) endpoint.Middleware {
28 // return NewDelayingLimiter(NewWaiter(tb))
2029 return func(next endpoint.Endpoint) endpoint.Endpoint {
2130 return func(ctx context.Context, request interface{}) (interface{}, error) {
22 if tb.TakeAvailable(1) == 0 {
31 sleep(tb.Take(1))
32 return next(ctx, request)
33 }
34 }
35 }
36
37 // Allower dictates whether or not a request is acceptable to run.
38 // The Limiter from "golang.org/x/time/rate" already implements this interface,
39 // one is able to use that in NewErroringLimiter without any modifications.
40 type Allower interface {
41 Allow() bool
42 }
43
44 // NewErroringLimiter returns an endpoint.Middleware that acts as a rate
45 // limiter. Requests that would exceed the
46 // maximum request rate are simply rejected with an error.
47 func NewErroringLimiter(limit Allower) endpoint.Middleware {
48 return func(next endpoint.Endpoint) endpoint.Endpoint {
49 return func(ctx context.Context, request interface{}) (interface{}, error) {
50 if !limit.Allow() {
2351 return nil, ErrLimited
2452 }
2553 return next(ctx, request)
2755 }
2856 }
2957
30 // NewTokenBucketThrottler returns an endpoint.Middleware that acts as a
31 // request throttler based on a token-bucket algorithm. Requests that would
32 // exceed the maximum request rate are delayed via the parameterized sleep
33 // function. By default you may pass time.Sleep.
34 func NewTokenBucketThrottler(tb *ratelimit.Bucket, sleep func(time.Duration)) endpoint.Middleware {
58 // Waiter dictates how long a request must be delayed.
59 // The Limiter from "golang.org/x/time/rate" already implements this interface,
60 // one is able to use that in NewDelayingLimiter without any modifications.
61 type Waiter interface {
62 Wait(ctx context.Context) error
63 }
64
65 // NewDelayingLimiter returns an endpoint.Middleware that acts as a
66 // request throttler. Requests that would
67 // exceed the maximum request rate are delayed via the Waiter function
68 func NewDelayingLimiter(limit Waiter) endpoint.Middleware {
3569 return func(next endpoint.Endpoint) endpoint.Endpoint {
3670 return func(ctx context.Context, request interface{}) (interface{}, error) {
37 sleep(tb.Take(1))
71 if err := limit.Wait(ctx); err != nil {
72 return nil, err
73 }
3874 return next(ctx, request)
3975 }
4076 }
4177 }
78
79 // AllowerFunc is an adapter that lets a function operate as if
80 // it implements Allower
81 type AllowerFunc func() bool
82
83 // Allow makes the adapter implement Allower
84 func (f AllowerFunc) Allow() bool {
85 return f()
86 }
87
88 // NewAllower turns an existing ratelimit.Bucket into an API-compatible form
89 func NewAllower(tb *ratelimit.Bucket) Allower {
90 return AllowerFunc(func() bool {
91 return (tb.TakeAvailable(1) != 0)
92 })
93 }
94
95 // WaiterFunc is an adapter that lets a function operate as if
96 // it implements Waiter
97 type WaiterFunc func(ctx context.Context) error
98
99 // Wait makes the adapter implement Waiter
100 func (f WaiterFunc) Wait(ctx context.Context) error {
101 return f(ctx)
102 }
103
104 // NewWaiter turns an existing ratelimit.Bucket into an API-compatible form
105 func NewWaiter(tb *ratelimit.Bucket) Waiter {
106 return WaiterFunc(func(ctx context.Context) error {
107 dur := tb.Take(1)
108 select {
109 case <-ctx.Done():
110 return ctx.Err()
111 case <-time.After(dur):
112 // happy path
113 }
114 return nil
115 })
116 }
22 import (
33 "context"
44 "math"
5 "strings"
56 "testing"
67 "time"
78
910
1011 "github.com/go-kit/kit/endpoint"
1112 "github.com/go-kit/kit/ratelimit"
13 "golang.org/x/time/rate"
1214 )
1315
1416 func TestTokenBucketLimiter(t *testing.T) {
5254 t.Errorf("rate=%d: want %v, have %v", rate, ratelimit.ErrLimited, err)
5355 }
5456 }
57
58 func TestXRateErroring(t *testing.T) {
59 limit := rate.NewLimiter(rate.Every(time.Minute), 1)
60 e := func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
61 testLimiter(t, ratelimit.NewErroringLimiter(limit)(e), 1)
62 }
63
64 func TestXRateDelaying(t *testing.T) {
65 limit := rate.NewLimiter(rate.Every(time.Minute), 1)
66 e := func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
67 e = ratelimit.NewDelayingLimiter(limit)(e)
68
69 _, err := e(context.Background(), struct{}{})
70 if err != nil {
71 t.Errorf("unexpected: %v\n", err)
72 }
73
74 dur := 500 * time.Millisecond
75 ctx, cxl := context.WithTimeout(context.Background(), dur)
76 defer cxl()
77
78 _, err = e(ctx, struct{}{})
79 if !strings.Contains(err.Error(), "exceed context deadline") {
80 t.Errorf("expected timeout: %v\n", err)
81 }
82 }