diff --git a/CHANGELOG.md b/CHANGELOG.md index 88a984d..ccbf9cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,15 @@ +# 2.9.0 (October 2021) + +#55,#56,#57 : @chenyahui was on fire and greatly improved the peformance of the library. He also got rid of the blocking call to expirationNotification, making the code run twice as fast in the benchmarks! + +# 2.8.1 (September 2021) + +#53 : Avoids recalculation of TTL value returned in API when TTL is extended. by @iczc + +# 2.8.0 (August 2021) + +#51 : The call GetWithTTL(key string) (interface{}, time.Duration, error) is added so that you can retrieve an item, and also know the remaining TTL. Thanks to @asgarciap for contributing. + # 2.7.0 (June 2021) #46 : got panic diff --git a/cache.go b/cache.go index fce01e1..778aaa9 100644 --- a/cache.go +++ b/cache.go @@ -11,7 +11,7 @@ type CheckExpireCallback func(key string, value interface{}) bool // ExpireCallback is used as a callback on item expiration or when notifying of an item new to the cache -// Note that ExpireReasonCallback will be the succesor of this function in the next major release. +// Note that ExpireReasonCallback will be the successor of this function in the next major release. type ExpireCallback func(key string, value interface{}) // ExpireReasonCallback is used as a callback on item expiration with extra information why the item expired. @@ -34,23 +34,30 @@ // Cache is a synchronized map of items that can auto-expire once stale type Cache struct { - mutex sync.Mutex - ttl time.Duration - items map[string]*item - loaderLock *singleflight.Group - expireCallback ExpireCallback - expireReasonCallback ExpireReasonCallback - checkExpireCallback CheckExpireCallback - newItemCallback ExpireCallback + // mutex is shared for all operations that need to be safe + mutex sync.Mutex + // ttl is the global ttl for the cache, can be zero (is infinite) + ttl time.Duration + // actual item storage + items map[string]*item + // lock used to avoid fetching a remote item multiple times + loaderLock *singleflight.Group + expireCallback ExpireCallback + expireReasonCallback ExpireReasonCallback + checkExpireCallback CheckExpireCallback + newItemCallback ExpireCallback + // the queue is used to have an ordered structure to use for expiration and cleanup. priorityQueue *priorityQueue expirationNotification chan bool - expirationTime time.Time - skipTTLExtension bool - shutdownSignal chan (chan struct{}) - isShutDown bool - loaderFunction LoaderFunction - sizeLimit int - metrics Metrics + // hasNotified is used to not schedule new expiration processing when an request is already pending. + hasNotified bool + expirationTime time.Time + skipTTLExtension bool + shutdownSignal chan (chan struct{}) + isShutDown bool + loaderFunction LoaderFunction + sizeLimit int + metrics Metrics } // EvictionReason is an enum that explains why an item was evicted @@ -86,19 +93,25 @@ return nil, false, false } - if item.ttl >= 0 && (item.ttl > 0 || cache.ttl > 0) { - if cache.ttl > 0 && item.ttl == 0 { - item.ttl = cache.ttl - } - - if !cache.skipTTLExtension { - item.touch() - } - cache.priorityQueue.update(item) - } + // no need to change priority queue when skipTTLExtension is true or the item will not expire + if cache.skipTTLExtension || (item.ttl == 0 && cache.ttl == 0) { + return item, true, false + } + + if item.ttl == 0 { + item.ttl = cache.ttl + } + + item.touch() + + oldExpireTime := cache.priorityQueue.root().expireAt + cache.priorityQueue.update(item) + nowExpireTime := cache.priorityQueue.root().expireAt expirationNotification := false - if cache.expirationTime.After(time.Now().Add(item.ttl)) { + + // notify expiration only if the latest expire time is changed + if (oldExpireTime.IsZero() && !nowExpireTime.IsZero()) || oldExpireTime.After(nowExpireTime) { expirationNotification = true } return item, exists, expirationNotification @@ -109,9 +122,10 @@ for { var sleepTime time.Duration cache.mutex.Lock() + cache.hasNotified = false if cache.priorityQueue.Len() > 0 { - sleepTime = time.Until(cache.priorityQueue.items[0].expireAt) - if sleepTime < 0 && cache.priorityQueue.items[0].expireAt.IsZero() { + sleepTime = time.Until(cache.priorityQueue.root().expireAt) + if sleepTime < 0 && cache.priorityQueue.root().expireAt.IsZero() { sleepTime = time.Hour } else if sleepTime < 0 { sleepTime = time.Microsecond @@ -172,7 +186,6 @@ cache.checkExpirationCallback(item, reason) cache.priorityQueue.remove(item) delete(cache.items, item.key) - } func (cache *Cache) evictjob(reason EvictionReason) { @@ -244,6 +257,11 @@ } item, exists, _ := cache.getItem(key) + oldExpireTime := time.Time{} + if !cache.priorityQueue.isEmpty() { + oldExpireTime = cache.priorityQueue.root().expireAt + } + if exists { item.data = data item.ttl = ttl @@ -256,12 +274,11 @@ } cache.metrics.Inserted++ - if item.ttl >= 0 && (item.ttl > 0 || cache.ttl > 0) { - if cache.ttl > 0 && item.ttl == 0 { - item.ttl = cache.ttl - } - item.touch() - } + if item.ttl == 0 { + item.ttl = cache.ttl + } + + item.touch() if exists { cache.priorityQueue.update(item) @@ -269,34 +286,40 @@ cache.priorityQueue.push(item) } + nowExpireTime := cache.priorityQueue.root().expireAt + cache.mutex.Unlock() if !exists && cache.newItemCallback != nil { cache.newItemCallback(key, data) } - cache.expirationNotification <- true + + // notify expiration only if the latest expire time is changed + if (oldExpireTime.IsZero() && !nowExpireTime.IsZero()) || oldExpireTime.After(nowExpireTime) { + cache.notifyExpiration() + } return nil } // Get is a thread-safe way to lookup items -// Every lookup, also touches the item, hence extending it's life +// Every lookup, also touches the item, hence extending its life func (cache *Cache) Get(key string) (interface{}, error) { return cache.GetByLoader(key, nil) } // GetWithTTL has exactly the same behaviour as Get but also returns -// the remaining TTL for an specific item at the moment it its retrieved +// the remaining TTL for a specific item at the moment its retrieved func (cache *Cache) GetWithTTL(key string) (interface{}, time.Duration, error) { return cache.GetByLoaderWithTtl(key, nil) } -// GetByLoader can take a per key loader function (ie. to propagate context) +// GetByLoader can take a per key loader function (i.e. to propagate context) func (cache *Cache) GetByLoader(key string, customLoaderFunction LoaderFunction) (interface{}, error) { dataToReturn, _, err := cache.GetByLoaderWithTtl(key, customLoaderFunction) return dataToReturn, err } -// GetByLoaderWithTtl can take a per key loader function (ie. to propagate context) +// GetByLoaderWithTtl can take a per key loader function (i.e. to propagate context) func (cache *Cache) GetByLoaderWithTtl(key string, customLoaderFunction LoaderFunction) (interface{}, time.Duration, error) { cache.mutex.Lock() if cache.isShutDown { @@ -359,10 +382,22 @@ } if triggerExpirationNotification { - cache.expirationNotification <- true + cache.notifyExpiration() } return dataToReturn, ttlToReturn, err +} + +func (cache *Cache) notifyExpiration() { + cache.mutex.Lock() + if cache.hasNotified { + cache.mutex.Unlock() + return + } + cache.hasNotified = true + cache.mutex.Unlock() + + cache.expirationNotification <- true } func (cache *Cache) invokeLoader(key string, loaderFunction LoaderFunction) (dataToReturn interface{}, ttl time.Duration, err error) { @@ -432,7 +467,7 @@ } cache.ttl = ttl cache.mutex.Unlock() - cache.expirationNotification <- true + cache.notifyExpiration() return nil } @@ -513,7 +548,7 @@ items: make(map[string]*item), loaderLock: &singleflight.Group{}, priorityQueue: newPriorityQueue(), - expirationNotification: make(chan bool), + expirationNotification: make(chan bool, 1), expirationTime: time.Now(), shutdownSignal: shutdownChan, isShutDown: false, diff --git a/priority_queue.go b/priority_queue.go index 11b9c31..eddd76b 100644 --- a/priority_queue.go +++ b/priority_queue.go @@ -12,6 +12,18 @@ type priorityQueue struct { items []*item +} + +func (pq *priorityQueue) isEmpty() bool { + return len(pq.items) == 0 +} + +func (pq *priorityQueue) root() *item { + if len(pq.items) == 0 { + return nil + } + + return pq.items[0] } func (pq *priorityQueue) update(item *item) {