Codebase list golang-github-throttled-throttled / 3a0d7fc8-94b5-4d0d-978b-fb8511a58da7/main rate.go
3a0d7fc8-94b5-4d0d-978b-fb8511a58da7/main

Tree @3a0d7fc8-94b5-4d0d-978b-fb8511a58da7/main (Download .tar.gz)

rate.go @3a0d7fc8-94b5-4d0d-978b-fb8511a58da7/mainraw · history · blame

package throttled

import (
	"fmt"
	"time"
)

const (
	// Maximum number of times to retry SetIfNotExists/CompareAndSwap operations
	// before returning an error.
	maxCASAttempts = 10
)

// A RateLimiter manages limiting the rate of actions by key.
type RateLimiter interface {
	// RateLimit checks whether a particular key has exceeded a rate
	// limit. It also returns a RateLimitResult to provide additional
	// information about the state of the RateLimiter.
	//
	// If the rate limit has not been exceeded, the underlying storage
	// is updated by the supplied quantity. For example, a quantity of
	// 1 might be used to rate limit a single request while a greater
	// quantity could rate limit based on the size of a file upload in
	// megabytes. If quantity is 0, no update is performed allowing
	// you to "peek" at the state of the RateLimiter for a given key.
	RateLimit(key string, quantity int) (bool, RateLimitResult, error)
}

// RateLimitResult represents the state of the RateLimiter for a
// given key at the time of the query. This state can be used, for
// example, to communicate information to the client via HTTP
// headers. Negative values indicate that the attribute is not
// relevant to the implementation or state.
type RateLimitResult struct {
	// Limit is the maximum number of requests that could be permitted
	// instantaneously for this key starting from an empty state. For
	// example, if a rate limiter allows 10 requests per second per
	// key, Limit would always be 10.
	Limit int

	// Remaining is the maximum number of requests that could be
	// permitted instantaneously for this key given the current
	// state. For example, if a rate limiter allows 10 requests per
	// second and has already received 6 requests for this key this
	// second, Remaining would be 4.
	Remaining int

	// ResetAfter is the time until the RateLimiter returns to its
	// initial state for a given key. For example, if a rate limiter
	// manages requests per second and received one request 200ms ago,
	// Reset would return 800ms. You can also think of this as the time
	// until Limit and Remaining will be equal.
	ResetAfter time.Duration

	// RetryAfter is the time until the next request will be permitted.
	// It should be -1 unless the rate limit has been exceeded.
	RetryAfter time.Duration
}

type limitResult struct {
	limited bool
}

func (r *limitResult) Limited() bool { return r.limited }

type rateLimitResult struct {
	limitResult

	limit, remaining  int
	reset, retryAfter time.Duration
}

func (r *rateLimitResult) Limit() int                { return r.limit }
func (r *rateLimitResult) Remaining() int            { return r.remaining }
func (r *rateLimitResult) Reset() time.Duration      { return r.reset }
func (r *rateLimitResult) RetryAfter() time.Duration { return r.retryAfter }

// Rate describes a frequency of an activity such as the number of requests
// allowed per minute.
type Rate struct {
	period time.Duration // Time between equally spaced requests at the rate
	count  int           // Used internally for deprecated `RateLimit` interface only
}

// RateQuota describes the number of requests allowed per time period.
// MaxRate specified the maximum sustained rate of requests and must
// be greater than zero. MaxBurst defines the number of requests that
// will be allowed to exceed the rate in a single burst and must be
// greater than or equal to zero.
//
// Rate{PerSec(1), 0} would mean that after each request, no more
// requests will be permitted for that client for one second.
// Rate{PerSec(2), 0} permits one request per 0.5 seconds rather than
// two requests in one second. In practice, you probably want to set
// MaxBurst >0 to provide some flexibility to clients that only need
// to make a handful of requests. In fact a MaxBurst of zero will
// *never* permit a request with a quantity greater than one because
// it will immediately exceed the limit.
type RateQuota struct {
	MaxRate  Rate
	MaxBurst int
}

// PerSec represents a number of requests per second.
func PerSec(n int) Rate { return Rate{time.Second / time.Duration(n), n} }

// PerMin represents a number of requests per minute.
func PerMin(n int) Rate { return Rate{time.Minute / time.Duration(n), n} }

// PerHour represents a number of requests per hour.
func PerHour(n int) Rate { return Rate{time.Hour / time.Duration(n), n} }

// PerDay represents a number of requests per day.
func PerDay(n int) Rate { return Rate{24 * time.Hour / time.Duration(n), n} }

// GCRARateLimiter is a RateLimiter that users the generic cell-rate
// algorithm. The algorithm has been slightly modified from its usual
// form to support limiting with an additional quantity parameter, such
// as for limiting the number of bytes uploaded.
type GCRARateLimiter struct {
	limit int

	// Think of the DVT as our flexibility:
	// How far can you deviate from the nominal equally spaced schedule?
	// If you like leaky buckets, think about it as the size of your bucket.
	delayVariationTolerance time.Duration

	// Think of the emission interval as the time between events
	// in the nominal equally spaced schedule. If you like leaky buckets,
	// think of it as how frequently the bucket leaks one unit.
	emissionInterval time.Duration

	store GCRAStore
}

// NewGCRARateLimiter creates a GCRARateLimiter. quota.Count defines
// the maximum number of requests permitted in an instantaneous burst
// and quota.Count / quota.Period defines the maximum sustained
// rate. For example, PerMin(60) permits 60 requests instantly per key
// followed by one request per second indefinitely whereas PerSec(1)
// only permits one request per second with no tolerance for bursts.
func NewGCRARateLimiter(st GCRAStore, quota RateQuota) (*GCRARateLimiter, error) {
	if quota.MaxBurst < 0 {
		return nil, fmt.Errorf("Invalid RateQuota %#v. MaxBurst must be greater than zero.", quota)
	}
	if quota.MaxRate.period <= 0 {
		return nil, fmt.Errorf("Invalid RateQuota %#v. MaxRate must be greater than zero.", quota)
	}

	return &GCRARateLimiter{
		delayVariationTolerance: quota.MaxRate.period * (time.Duration(quota.MaxBurst) + 1),
		emissionInterval:        quota.MaxRate.period,
		limit:                   quota.MaxBurst + 1,
		store:                   st,
	}, nil
}

// RateLimit checks whether a particular key has exceeded a rate
// limit. It also returns a RateLimitResult to provide additional
// information about the state of the RateLimiter.
//
// If the rate limit has not been exceeded, the underlying storage is
// updated by the supplied quantity. For example, a quantity of 1
// might be used to rate limit a single request while a greater
// quantity could rate limit based on the size of a file upload in
// megabytes. If quantity is 0, no update is performed allowing you
// to "peek" at the state of the RateLimiter for a given key.
func (g *GCRARateLimiter) RateLimit(key string, quantity int) (bool, RateLimitResult, error) {
	var tat, newTat, now time.Time
	var ttl time.Duration
	rlc := RateLimitResult{Limit: g.limit, RetryAfter: -1}
	limited := false

	i := 0
	for {
		var err error
		var tatVal int64
		var updated bool

		// tat refers to the theoretical arrival time that would be expected
		// from equally spaced requests at exactly the rate limit.
		tatVal, now, err = g.store.GetWithTime(key)
		if err != nil {
			return false, rlc, err
		}

		if tatVal == -1 {
			tat = now
		} else {
			tat = time.Unix(0, tatVal)
		}

		increment := time.Duration(quantity) * g.emissionInterval
		if now.After(tat) {
			newTat = now.Add(increment)
		} else {
			newTat = tat.Add(increment)
		}

		// Block the request if the next permitted time is in the future
		allowAt := newTat.Add(-(g.delayVariationTolerance))
		if diff := now.Sub(allowAt); diff < 0 {
			if increment <= g.delayVariationTolerance {
				rlc.RetryAfter = -diff
			}
			ttl = tat.Sub(now)
			limited = true
			break
		}

		ttl = newTat.Sub(now)

		if tatVal == -1 {
			updated, err = g.store.SetIfNotExistsWithTTL(key, newTat.UnixNano(), ttl)
		} else {
			updated, err = g.store.CompareAndSwapWithTTL(key, tatVal, newTat.UnixNano(), ttl)
		}

		if err != nil {
			return false, rlc, err
		}
		if updated {
			break
		}

		i++
		if i > maxCASAttempts {
			return false, rlc, fmt.Errorf(
				"Failed to store updated rate limit data for key %s after %d attempts",
				key, i,
			)
		}
	}

	next := g.delayVariationTolerance - ttl
	if next > -g.emissionInterval {
		rlc.Remaining = int(next / g.emissionInterval)
	}
	rlc.ResetAfter = ttl

	return limited, rlc, nil
}