2 | 2 |
import (
|
3 | 3 |
"context"
|
4 | 4 |
"errors"
|
5 | |
"time"
|
6 | |
|
7 | |
"github.com/juju/ratelimit"
|
8 | 5 |
|
9 | 6 |
"github.com/go-kit/kit/endpoint"
|
10 | 7 |
)
|
|
12 | 9 |
// ErrLimited is returned in the request path when the rate limiter is
|
13 | 10 |
// triggered and the request is rejected.
|
14 | 11 |
var ErrLimited = errors.New("rate limit exceeded")
|
15 | |
|
16 | |
// NewTokenBucketLimiter returns an endpoint.Middleware that acts as a rate
|
17 | |
// limiter based on a token-bucket algorithm. Requests that would exceed the
|
18 | |
// maximum request rate are simply rejected with an error.
|
19 | |
func NewTokenBucketLimiter(tb *ratelimit.Bucket) endpoint.Middleware {
|
20 | |
return NewErroringLimiter(NewAllower(tb))
|
21 | |
}
|
22 | |
|
23 | |
// NewTokenBucketThrottler returns an endpoint.Middleware that acts as a
|
24 | |
// request throttler based on a token-bucket algorithm. Requests that would
|
25 | |
// exceed the maximum request rate are delayed.
|
26 | |
// The parameterized function "_" is kept for backwards-compatiblity of
|
27 | |
// the API, but it is no longer used for anything. You may pass it nil.
|
28 | |
func NewTokenBucketThrottler(tb *ratelimit.Bucket, _ func(time.Duration)) endpoint.Middleware {
|
29 | |
return NewDelayingLimiter(NewWaiter(tb))
|
30 | |
}
|
31 | 12 |
|
32 | 13 |
// Allower dictates whether or not a request is acceptable to run.
|
33 | 14 |
// The Limiter from "golang.org/x/time/rate" already implements this interface,
|
|
80 | 61 |
return f()
|
81 | 62 |
}
|
82 | 63 |
|
83 | |
// NewAllower turns an existing ratelimit.Bucket into an API-compatible form
|
84 | |
func NewAllower(tb *ratelimit.Bucket) Allower {
|
85 | |
return AllowerFunc(func() bool {
|
86 | |
return (tb.TakeAvailable(1) != 0)
|
87 | |
})
|
88 | |
}
|
89 | |
|
90 | 64 |
// WaiterFunc is an adapter that lets a function operate as if
|
91 | 65 |
// it implements Waiter
|
92 | 66 |
type WaiterFunc func(ctx context.Context) error
|
|
95 | 69 |
func (f WaiterFunc) Wait(ctx context.Context) error {
|
96 | 70 |
return f(ctx)
|
97 | 71 |
}
|
98 | |
|
99 | |
// NewWaiter turns an existing ratelimit.Bucket into an API-compatible form
|
100 | |
func NewWaiter(tb *ratelimit.Bucket) Waiter {
|
101 | |
return WaiterFunc(func(ctx context.Context) error {
|
102 | |
dur := tb.Take(1)
|
103 | |
select {
|
104 | |
case <-ctx.Done():
|
105 | |
return ctx.Err()
|
106 | |
case <-time.After(dur):
|
107 | |
// happy path
|
108 | |
}
|
109 | |
return nil
|
110 | |
})
|
111 | |
}
|