package ttlcache
import (
"sync"
"time"
"golang.org/x/sync/singleflight"
)
// CheckExpireCallback is used as a callback for an external check on item expiration.
// Returning false keeps the item alive for another TTL cycle.
type CheckExpireCallback func(key string, value interface{}) bool

// ExpireCallback is used as a callback on item expiration or when notifying of an item new to the cache
// Note that ExpireReasonCallback will be the successor of this function in the next major release.
type ExpireCallback func(key string, value interface{})

// ExpireReasonCallback is used as a callback on item expiration with extra information why the item expired.
type ExpireReasonCallback func(key string, reason EvictionReason, value interface{})

// LoaderFunction can be supplied to retrieve an item where a cache miss occurs. Supply an item specific ttl or Duration.Zero
type LoaderFunction func(key string) (data interface{}, ttl time.Duration, err error)
// SimpleCache interface enables a quick-start. Interface for basic usage.
type SimpleCache interface {
	// Get retrieves the value stored under key.
	Get(key string) (interface{}, error)
	// GetWithTTL retrieves the value stored under key together with its remaining TTL.
	GetWithTTL(key string) (interface{}, time.Duration, error)
	// Set stores data under key using the cache-wide TTL.
	Set(key string, data interface{}) error
	// SetTTL changes the cache-wide TTL.
	SetTTL(ttl time.Duration) error
	// SetWithTTL stores data under key with an individual TTL.
	SetWithTTL(key string, data interface{}, ttl time.Duration) error
	// Remove deletes the entry for key.
	Remove(key string) error
	// Close shuts the cache down; further calls return an error.
	Close() error
	// Purge removes all entries.
	Purge() error
}
// Cache is a synchronized map of items that can auto-expire once stale
type Cache struct {
	mutex sync.Mutex // guards all mutable state below
	// ttl is the cache-wide default time to live; items set with a zero
	// ttl inherit it, and 0 disables the cache-wide default.
	ttl   time.Duration
	items map[string]*item
	// loaderLock deduplicates concurrent loader invocations per key.
	loaderLock           *singleflight.Group
	expireCallback       ExpireCallback
	expireReasonCallback ExpireReasonCallback
	checkExpireCallback  CheckExpireCallback
	newItemCallback      ExpireCallback
	// priorityQueue orders items by expiration time; index 0 is treated as
	// the soonest to expire throughout this file.
	priorityQueue *priorityQueue
	// expirationNotification wakes the expiration goroutine early when an
	// item's expiration schedule changes.
	expirationNotification chan bool
	// expirationTime is when the expiration goroutine next plans to wake.
	expirationTime time.Time
	// skipTTLExtension, when true, prevents lookups from extending an
	// item's life.
	skipTTLExtension bool
	// shutdownSignal carries a feedback channel used by Close to stop the
	// expiration goroutine and wait for it to finish.
	shutdownSignal chan (chan struct{})
	isShutDown     bool
	loaderFunction LoaderFunction
	// sizeLimit caps the number of items; 0 means unlimited.
	sizeLimit int
	metrics   Metrics
}
// EvictionReason is an enum that explains why an item was evicted
type EvictionReason int

const (
	// Removed : explicitly removed from cache via API call
	Removed EvictionReason = iota
	// EvictedSize : evicted due to exceeding the cache size
	EvictedSize
	// Expired : the time to live is zero and therefore the item is removed
	Expired
	// Closed : the cache was closed
	Closed
)
// Sentinel errors returned by the public API; compare with errors.Is or ==.
const (
	// ErrClosed is raised when operating on a cache where Close() has already been called.
	ErrClosed = constError("cache already closed")
	// ErrNotFound indicates that the requested key is not present in the cache
	ErrNotFound = constError("key not found")
)
// constError is a string-backed error type, which allows the sentinel
// errors above to be declared as untyped constants.
type constError string

// Error satisfies the error interface by returning the underlying string.
func (e constError) Error() string {
	return string(e)
}
// getItem looks up key, refreshes the item's TTL bookkeeping, and reports
// whether the expiration goroutine should be woken earlier than currently
// scheduled. The caller must hold cache.mutex (all call sites lock first).
//
// Returns the item, whether a live (non-expired) entry exists, and whether
// an expiration notification should be sent.
func (cache *Cache) getItem(key string) (*item, bool, bool) {
	item, exists := cache.items[key]
	if !exists || item.expired() {
		return nil, false, false
	}
	// An item ttl of 0 inherits the cache-wide ttl. Items with ttl < 0 are
	// skipped here entirely — presumably "never expires"; confirm against
	// item.expired().
	if item.ttl >= 0 && (item.ttl > 0 || cache.ttl > 0) {
		if cache.ttl > 0 && item.ttl == 0 {
			item.ttl = cache.ttl
		}
		if !cache.skipTTLExtension {
			// Extend the item's life on access unless disabled.
			item.touch()
		}
		cache.priorityQueue.update(item)
	}
	// Request a wake-up when this item now expires before the expiration
	// goroutine's next scheduled wake time.
	expirationNotification := false
	if cache.expirationTime.After(time.Now().Add(item.ttl)) {
		expirationNotification = true
	}
	return item, exists, expirationNotification
}
// startExpirationProcessing is the background loop that evicts expired
// items. It sleeps until the earliest known expiration (falling back to one
// hour when nothing is scheduled), can be woken early via
// expirationNotification, and exits on shutdownSignal. Started as a
// goroutine by NewCache.
func (cache *Cache) startExpirationProcessing() {
	timer := time.NewTimer(time.Hour)
	for {
		var sleepTime time.Duration
		cache.mutex.Lock()
		if cache.priorityQueue.Len() > 0 {
			// Sleep until the head of the queue (the soonest expiry).
			sleepTime = time.Until(cache.priorityQueue.items[0].expireAt)
			if sleepTime < 0 && cache.priorityQueue.items[0].expireAt.IsZero() {
				// Zero expireAt means no expiry is scheduled; idle.
				sleepTime = time.Hour
			} else if sleepTime < 0 {
				// Already overdue: fire almost immediately.
				sleepTime = time.Microsecond
			}
			if cache.ttl > 0 {
				sleepTime = min(sleepTime, cache.ttl)
			}
		} else if cache.ttl > 0 {
			sleepTime = cache.ttl
		} else {
			sleepTime = time.Hour
		}
		// Record the planned wake time so getItem can decide whether an
		// early wake-up notification is needed.
		cache.expirationTime = time.Now().Add(sleepTime)
		cache.mutex.Unlock()
		timer.Reset(sleepTime)
		select {
		case shutdownFeedback := <-cache.shutdownSignal:
			// Close() requested shutdown: evict everything with reason
			// Closed, acknowledge, and exit the goroutine.
			timer.Stop()
			cache.mutex.Lock()
			if cache.priorityQueue.Len() > 0 {
				cache.evictjob(Closed)
			}
			cache.mutex.Unlock()
			shutdownFeedback <- struct{}{}
			return
		case <-timer.C:
			timer.Stop()
			cache.mutex.Lock()
			if cache.priorityQueue.Len() == 0 {
				cache.mutex.Unlock()
				continue
			}
			cache.cleanjob()
			cache.mutex.Unlock()
		case <-cache.expirationNotification:
			// Cache contents changed; recompute the sleep time.
			timer.Stop()
			continue
		}
	}
}
// checkExpirationCallback fires the configured expiration callbacks (if
// any) for item. Each callback runs on its own goroutine so that slow
// handlers cannot block cache internals.
func (cache *Cache) checkExpirationCallback(item *item, reason EvictionReason) {
	if notify := cache.expireCallback; notify != nil {
		go notify(item.key, item.data)
	}
	if notifyReason := cache.expireReasonCallback; notifyReason != nil {
		go notifyReason(item.key, reason, item.data)
	}
}
// removeItem drops item from both the key map and the priority queue,
// records the eviction in the metrics, and fires the expiration callbacks.
// The caller must hold cache.mutex (all call sites lock first).
func (cache *Cache) removeItem(item *item, reason EvictionReason) {
	cache.metrics.Evicted++
	cache.checkExpirationCallback(item, reason)
	delete(cache.items, item.key)
	cache.priorityQueue.remove(item)
}
// evictjob drains the priority queue, removing every remaining entry with
// the given eviction reason. The caller must hold cache.mutex.
//
// The original loop kept an index variable that was never advanced (every
// iteration removed the current head), so it is rewritten as a plain drain
// of index 0; behavior is unchanged for all callers.
func (cache *Cache) evictjob(reason EvictionReason) {
	for cache.priorityQueue.Len() > 0 {
		cache.removeItem(cache.priorityQueue.items[0], reason)
	}
}
// cleanjob removes expired items from the front of the priority queue,
// consulting checkExpireCallback (when set) so external code can veto an
// eviction; vetoed items get a fresh TTL and are re-queued. The caller must
// hold cache.mutex, and the queue must be non-empty on entry.
func (cache *Cache) cleanjob() {
	// index will only be advanced if the current entry will not be evicted
	i := 0
	for item := cache.priorityQueue.items[i]; item.expired(); item = cache.priorityQueue.items[i] {
		if cache.checkExpireCallback != nil {
			if !cache.checkExpireCallback(item.key, item.data) {
				// Veto: give the item another TTL cycle and inspect the
				// next queue position instead.
				item.touch()
				cache.priorityQueue.update(item)
				i++
				if i == cache.priorityQueue.Len() {
					break
				}
				continue
			}
		}
		cache.removeItem(item, Expired)
		// NOTE(review): only an empty queue terminates here; if items were
		// vetoed above (i > 0) and the queue later shrinks to length i, the
		// items[i] read in the loop header looks out of range — confirm
		// against priorityQueue.remove semantics.
		if cache.priorityQueue.Len() == 0 {
			return
		}
	}
}
// Close calls Purge after stopping the goroutine that does ttl checking, for a clean shutdown.
// The cache is no longer cleaning up after the first call to Close, repeated calls are safe and return ErrClosed.
// Close stops the goroutine that does ttl checking and clears the cache,
// for a clean shutdown. The cache is no longer cleaning up after the first
// call to Close; repeated calls are safe and return ErrClosed.
func (cache *Cache) Close() error {
	cache.mutex.Lock()
	if cache.isShutDown {
		cache.mutex.Unlock()
		return ErrClosed
	}
	cache.isShutDown = true
	cache.mutex.Unlock()

	// Hand a feedback channel to the expiration goroutine and wait for it
	// to evict all queued entries (reason Closed) and exit.
	feedback := make(chan struct{})
	cache.shutdownSignal <- feedback
	<-feedback
	close(cache.shutdownSignal)

	// Reset the containers directly. The previous code called Purge()
	// here, but isShutDown is already true at this point, so Purge's guard
	// returned ErrClosed without clearing anything (and the error was
	// silently discarded).
	cache.mutex.Lock()
	cache.items = make(map[string]*item)
	cache.priorityQueue = newPriorityQueue()
	cache.mutex.Unlock()
	return nil
}
// Set is a thread-safe way to add new items to the map.
func (cache *Cache) Set(key string, data interface{}) error {
return cache.SetWithTTL(key, data, ItemExpireWithGlobalTTL)
}
// SetWithTTL is a thread-safe way to add new items to the map with individual ttl.
func (cache *Cache) SetWithTTL(key string, data interface{}, ttl time.Duration) error {
cache.mutex.Lock()
if cache.isShutDown {
cache.mutex.Unlock()
return ErrClosed
}
item, exists, _ := cache.getItem(key)
if exists {
item.data = data
item.ttl = ttl
} else {
if cache.sizeLimit != 0 && len(cache.items) >= cache.sizeLimit {
cache.removeItem(cache.priorityQueue.items[0], EvictedSize)
}
item = newItem(key, data, ttl)
cache.items[key] = item
}
cache.metrics.Inserted++
if item.ttl >= 0 && (item.ttl > 0 || cache.ttl > 0) {
if cache.ttl > 0 && item.ttl == 0 {
item.ttl = cache.ttl
}
item.touch()
}
if exists {
cache.priorityQueue.update(item)
} else {
cache.priorityQueue.push(item)
}
cache.mutex.Unlock()
if !exists && cache.newItemCallback != nil {
cache.newItemCallback(key, data)
}
cache.expirationNotification <- true
return nil
}
// Get is a thread-safe way to lookup items
// Every lookup, also touches the item, hence extending it's life
// Get is a thread-safe way to lookup items
// Every lookup also touches the item, hence extending its life
// (unless SkipTTLExtensionOnHit was enabled).
func (cache *Cache) Get(key string) (interface{}, error) {
	return cache.GetByLoader(key, nil)
}
// GetWithTTL has exactly the same behaviour as Get but also returns
// the remaining TTL for an specific item at the moment it its retrieved
// GetWithTTL has exactly the same behaviour as Get but also returns
// the remaining TTL for a specific item at the moment it is retrieved
func (cache *Cache) GetWithTTL(key string) (interface{}, time.Duration, error) {
	return cache.GetByLoaderWithTtl(key, nil)
}
// GetByLoader can take a per key loader function (ie. to propagate context)
func (cache *Cache) GetByLoader(key string, customLoaderFunction LoaderFunction) (interface{}, error) {
dataToReturn, _, err := cache.GetByLoaderWithTtl(key, customLoaderFunction)
return dataToReturn, err
}
// GetByLoaderWithTtl can take a per key loader function (ie. to propagate context)
func (cache *Cache) GetByLoaderWithTtl(key string, customLoaderFunction LoaderFunction) (interface{}, time.Duration, error) {
cache.mutex.Lock()
if cache.isShutDown {
cache.mutex.Unlock()
return nil, 0, ErrClosed
}
cache.metrics.Hits++
item, exists, triggerExpirationNotification := cache.getItem(key)
var dataToReturn interface{}
ttlToReturn := time.Duration(0)
if exists {
cache.metrics.Retrievals++
dataToReturn = item.data
ttlToReturn = time.Until(item.expireAt)
if ttlToReturn < 0 {
ttlToReturn = 0
}
}
var err error
if !exists {
cache.metrics.Misses++
err = ErrNotFound
}
loaderFunction := cache.loaderFunction
if customLoaderFunction != nil {
loaderFunction = customLoaderFunction
}
if loaderFunction == nil || exists {
cache.mutex.Unlock()
}
if loaderFunction != nil && !exists {
type loaderResult struct {
data interface{}
ttl time.Duration
}
ch := cache.loaderLock.DoChan(key, func() (interface{}, error) {
// cache is not blocked during io
invokeData, ttl, err := cache.invokeLoader(key, loaderFunction)
lr := &loaderResult{
data: invokeData,
ttl: ttl,
}
return lr, err
})
cache.mutex.Unlock()
res := <-ch
dataToReturn = res.Val.(*loaderResult).data
ttlToReturn = res.Val.(*loaderResult).ttl
err = res.Err
}
if triggerExpirationNotification {
cache.expirationNotification <- true
}
return dataToReturn, ttlToReturn, err
}
// invokeLoader runs loaderFunction for key and, on success, stores the
// result in the cache under the returned ttl. If storing fails, the loaded
// data is discarded so callers never see a value the cache rejected.
func (cache *Cache) invokeLoader(key string, loaderFunction LoaderFunction) (interface{}, time.Duration, error) {
	data, ttl, err := loaderFunction(key)
	if err != nil {
		return data, ttl, err
	}
	if storeErr := cache.SetWithTTL(key, data, ttl); storeErr != nil {
		return nil, ttl, storeErr
	}
	return data, ttl, nil
}
// Remove removes an item from the cache if it exists, triggers expiration callback when set. Can return ErrNotFound if the entry was not present.
func (cache *Cache) Remove(key string) error {
cache.mutex.Lock()
defer cache.mutex.Unlock()
if cache.isShutDown {
return ErrClosed
}
object, exists := cache.items[key]
if !exists {
return ErrNotFound
}
cache.removeItem(object, Removed)
return nil
}
// Count returns the number of items in the cache. Returns zero when the cache has been closed.
func (cache *Cache) Count() int {
cache.mutex.Lock()
defer cache.mutex.Unlock()
if cache.isShutDown {
return 0
}
length := len(cache.items)
return length
}
// GetKeys returns all keys of items in the cache. Returns nil when the cache has been closed.
func (cache *Cache) GetKeys() []string {
cache.mutex.Lock()
defer cache.mutex.Unlock()
if cache.isShutDown {
return nil
}
keys := make([]string, len(cache.items))
i := 0
for k := range cache.items {
keys[i] = k
i++
}
return keys
}
// SetTTL sets the global TTL value for items in the cache, which can be overridden at the item level.
func (cache *Cache) SetTTL(ttl time.Duration) error {
cache.mutex.Lock()
if cache.isShutDown {
cache.mutex.Unlock()
return ErrClosed
}
cache.ttl = ttl
cache.mutex.Unlock()
cache.expirationNotification <- true
return nil
}
// SetExpirationCallback sets a callback that will be called when an item expires
func (cache *Cache) SetExpirationCallback(callback ExpireCallback) {
cache.mutex.Lock()
defer cache.mutex.Unlock()
cache.expireCallback = callback
}
// SetExpirationReasonCallback sets a callback that will be called when an item expires, includes reason of expiry
func (cache *Cache) SetExpirationReasonCallback(callback ExpireReasonCallback) {
cache.mutex.Lock()
defer cache.mutex.Unlock()
cache.expireReasonCallback = callback
}
// SetCheckExpirationCallback sets a callback that will be called when an item is about to expire
// in order to allow external code to decide whether the item expires or remains for another TTL cycle
// SetCheckExpirationCallback sets a callback that will be called when an item is about to expire
// in order to allow external code to decide whether the item expires or remains for another TTL cycle
func (cache *Cache) SetCheckExpirationCallback(callback CheckExpireCallback) {
	cache.mutex.Lock()
	cache.checkExpireCallback = callback
	cache.mutex.Unlock()
}
// SetNewItemCallback sets a callback that will be called when a new item is added to the cache
// SetNewItemCallback sets a callback that will be called when a new item is added to the cache
func (cache *Cache) SetNewItemCallback(callback ExpireCallback) {
	cache.mutex.Lock()
	cache.newItemCallback = callback
	cache.mutex.Unlock()
}
// SkipTTLExtensionOnHit allows the user to change the cache behaviour. When this flag is set to true it will
// no longer extend TTL of items when they are retrieved using Get, or when their expiration condition is evaluated
// using SetCheckExpirationCallback.
// SkipTTLExtensionOnHit allows the user to change the cache behaviour. When this flag is set to true it will
// no longer extend TTL of items when they are retrieved using Get, or when their expiration condition is evaluated
// using SetCheckExpirationCallback.
func (cache *Cache) SkipTTLExtensionOnHit(value bool) {
	cache.mutex.Lock()
	cache.skipTTLExtension = value
	cache.mutex.Unlock()
}
// SetLoaderFunction allows you to set a function to retrieve cache misses. The signature matches that of the Get function.
// Additional Get calls on the same key block while fetching is in progress (groupcache style).
// SetLoaderFunction allows you to set a function to retrieve cache misses. The signature matches that of the Get function.
// Additional Get calls on the same key block while fetching is in progress (groupcache style).
func (cache *Cache) SetLoaderFunction(loader LoaderFunction) {
	cache.mutex.Lock()
	cache.loaderFunction = loader
	cache.mutex.Unlock()
}
// Purge will remove all entries
func (cache *Cache) Purge() error {
cache.mutex.Lock()
defer cache.mutex.Unlock()
if cache.isShutDown {
return ErrClosed
}
cache.metrics.Evicted += int64(len(cache.items))
cache.items = make(map[string]*item)
cache.priorityQueue = newPriorityQueue()
return nil
}
// SetCacheSizeLimit sets a limit to the amount of cached items.
// If a new item is getting cached, the closes item to being timed out will be replaced
// Set to 0 to turn off
// SetCacheSizeLimit sets a limit to the amount of cached items.
// If a new item is getting cached, the closest item to being timed out will be replaced
// Set to 0 to turn off
func (cache *Cache) SetCacheSizeLimit(limit int) {
	cache.mutex.Lock()
	cache.sizeLimit = limit
	cache.mutex.Unlock()
}
// NewCache is a helper to create instance of the Cache struct
func NewCache() *Cache {
shutdownChan := make(chan chan struct{})
cache := &Cache{
items: make(map[string]*item),
loaderLock: &singleflight.Group{},
priorityQueue: newPriorityQueue(),
expirationNotification: make(chan bool),
expirationTime: time.Now(),
shutdownSignal: shutdownChan,
isShutDown: false,
loaderFunction: nil,
sizeLimit: 0,
metrics: Metrics{},
}
go cache.startExpirationProcessing()
return cache
}
// GetMetrics exposes the metrics of the cache. This is a snapshot copy of the metrics.
func (cache *Cache) GetMetrics() Metrics {
cache.mutex.Lock()
defer cache.mutex.Unlock()
return cache.metrics
}
// Touch resets the TTL of the key when it exists, returns ErrNotFound if the key is not present.
func (cache *Cache) Touch(key string) error {
cache.mutex.Lock()
defer cache.mutex.Unlock()
item, exists := cache.items[key]
if !exists {
return ErrNotFound
}
item.touch()
return nil
}
func min(duration time.Duration, second time.Duration) time.Duration {
if duration < second {
return duration
}
return second
}