10 | 10 |
type CheckExpireCallback func(key string, value interface{}) bool
|
11 | 11 |
|
12 | 12 |
// ExpireCallback is used as a callback on item expiration or when notifying of an item new to the cache
|
13 | |
// Note that ExpireReasonCallback will be the succesor of this function in the next major release.
|
|
13 |
// Note that ExpireReasonCallback will be the successor of this function in the next major release.
|
14 | 14 |
type ExpireCallback func(key string, value interface{})
|
15 | 15 |
|
16 | 16 |
// ExpireReasonCallback is used as a callback on item expiration with extra information why the item expired.
|
|
33 | 33 |
|
34 | 34 |
// Cache is a synchronized map of items that can auto-expire once stale
|
35 | 35 |
type Cache struct {
|
36 | |
mutex sync.Mutex
|
37 | |
ttl time.Duration
|
38 | |
items map[string]*item
|
39 | |
loaderLock *singleflight.Group
|
40 | |
expireCallback ExpireCallback
|
41 | |
expireReasonCallback ExpireReasonCallback
|
42 | |
checkExpireCallback CheckExpireCallback
|
43 | |
newItemCallback ExpireCallback
|
|
36 |
// mutex is shared for all operations that need to be safe
|
|
37 |
mutex sync.Mutex
|
|
38 |
// ttl is the global ttl for the cache, can be zero (is infinite)
|
|
39 |
ttl time.Duration
|
|
40 |
// actual item storage
|
|
41 |
items map[string]*item
|
|
42 |
// lock used to avoid fetching a remote item multiple times
|
|
43 |
loaderLock *singleflight.Group
|
|
44 |
expireCallback ExpireCallback
|
|
45 |
expireReasonCallback ExpireReasonCallback
|
|
46 |
checkExpireCallback CheckExpireCallback
|
|
47 |
newItemCallback ExpireCallback
|
|
48 |
// the queue is used to have an ordered structure to use for expiration and cleanup.
|
44 | 49 |
priorityQueue *priorityQueue
|
45 | 50 |
expirationNotification chan bool
|
46 | |
expirationTime time.Time
|
47 | |
skipTTLExtension bool
|
48 | |
shutdownSignal chan (chan struct{})
|
49 | |
isShutDown bool
|
50 | |
loaderFunction LoaderFunction
|
51 | |
sizeLimit int
|
52 | |
metrics Metrics
|
|
51 |
// hasNotified is used to not schedule new expiration processing when an request is already pending.
|
|
52 |
hasNotified bool
|
|
53 |
expirationTime time.Time
|
|
54 |
skipTTLExtension bool
|
|
55 |
shutdownSignal chan (chan struct{})
|
|
56 |
isShutDown bool
|
|
57 |
loaderFunction LoaderFunction
|
|
58 |
sizeLimit int
|
|
59 |
metrics Metrics
|
53 | 60 |
}
|
54 | 61 |
|
55 | 62 |
// EvictionReason is an enum that explains why an item was evicted
|
|
85 | 92 |
return nil, false, false
|
86 | 93 |
}
|
87 | 94 |
|
88 | |
if item.ttl >= 0 && (item.ttl > 0 || cache.ttl > 0) {
|
89 | |
if cache.ttl > 0 && item.ttl == 0 {
|
90 | |
item.ttl = cache.ttl
|
91 | |
}
|
92 | |
|
93 | |
if !cache.skipTTLExtension {
|
94 | |
item.touch()
|
95 | |
}
|
96 | |
cache.priorityQueue.update(item)
|
97 | |
}
|
|
95 |
// no need to change priority queue when skipTTLExtension is true or the item will not expire
|
|
96 |
if cache.skipTTLExtension || (item.ttl == 0 && cache.ttl == 0) {
|
|
97 |
return item, true, false
|
|
98 |
}
|
|
99 |
|
|
100 |
if item.ttl == 0 {
|
|
101 |
item.ttl = cache.ttl
|
|
102 |
}
|
|
103 |
|
|
104 |
item.touch()
|
|
105 |
|
|
106 |
oldExpireTime := cache.priorityQueue.root().expireAt
|
|
107 |
cache.priorityQueue.update(item)
|
|
108 |
nowExpireTime := cache.priorityQueue.root().expireAt
|
98 | 109 |
|
99 | 110 |
expirationNotification := false
|
100 | |
if cache.expirationTime.After(time.Now().Add(item.ttl)) {
|
|
111 |
|
|
112 |
// notify expiration only if the latest expire time is changed
|
|
113 |
if (oldExpireTime.IsZero() && !nowExpireTime.IsZero()) || oldExpireTime.After(nowExpireTime) {
|
101 | 114 |
expirationNotification = true
|
102 | 115 |
}
|
103 | 116 |
return item, exists, expirationNotification
|
|
108 | 121 |
for {
|
109 | 122 |
var sleepTime time.Duration
|
110 | 123 |
cache.mutex.Lock()
|
|
124 |
cache.hasNotified = false
|
111 | 125 |
if cache.priorityQueue.Len() > 0 {
|
112 | |
sleepTime = time.Until(cache.priorityQueue.items[0].expireAt)
|
113 | |
if sleepTime < 0 && cache.priorityQueue.items[0].expireAt.IsZero() {
|
|
126 |
sleepTime = time.Until(cache.priorityQueue.root().expireAt)
|
|
127 |
if sleepTime < 0 && cache.priorityQueue.root().expireAt.IsZero() {
|
114 | 128 |
sleepTime = time.Hour
|
115 | 129 |
} else if sleepTime < 0 {
|
116 | 130 |
sleepTime = time.Microsecond
|
|
171 | 185 |
cache.checkExpirationCallback(item, reason)
|
172 | 186 |
cache.priorityQueue.remove(item)
|
173 | 187 |
delete(cache.items, item.key)
|
174 | |
|
175 | 188 |
}
|
176 | 189 |
|
177 | 190 |
func (cache *Cache) evictjob(reason EvictionReason) {
|
|
243 | 256 |
}
|
244 | 257 |
item, exists, _ := cache.getItem(key)
|
245 | 258 |
|
|
259 |
oldExpireTime := time.Time{}
|
|
260 |
if !cache.priorityQueue.isEmpty() {
|
|
261 |
oldExpireTime = cache.priorityQueue.root().expireAt
|
|
262 |
}
|
|
263 |
|
246 | 264 |
if exists {
|
247 | 265 |
item.data = data
|
248 | 266 |
item.ttl = ttl
|
|
255 | 273 |
}
|
256 | 274 |
cache.metrics.Inserted++
|
257 | 275 |
|
258 | |
if item.ttl >= 0 && (item.ttl > 0 || cache.ttl > 0) {
|
259 | |
if cache.ttl > 0 && item.ttl == 0 {
|
260 | |
item.ttl = cache.ttl
|
261 | |
}
|
262 | |
item.touch()
|
263 | |
}
|
|
276 |
if item.ttl == 0 {
|
|
277 |
item.ttl = cache.ttl
|
|
278 |
}
|
|
279 |
|
|
280 |
item.touch()
|
264 | 281 |
|
265 | 282 |
if exists {
|
266 | 283 |
cache.priorityQueue.update(item)
|
|
268 | 285 |
cache.priorityQueue.push(item)
|
269 | 286 |
}
|
270 | 287 |
|
|
288 |
nowExpireTime := cache.priorityQueue.root().expireAt
|
|
289 |
|
271 | 290 |
cache.mutex.Unlock()
|
272 | 291 |
if !exists && cache.newItemCallback != nil {
|
273 | 292 |
cache.newItemCallback(key, data)
|
274 | 293 |
}
|
275 | |
cache.expirationNotification <- true
|
|
294 |
|
|
295 |
// notify expiration only if the latest expire time is changed
|
|
296 |
if (oldExpireTime.IsZero() && !nowExpireTime.IsZero()) || oldExpireTime.After(nowExpireTime) {
|
|
297 |
cache.notifyExpiration()
|
|
298 |
}
|
276 | 299 |
return nil
|
277 | 300 |
}
|
278 | 301 |
|
279 | 302 |
// Get is a thread-safe way to lookup items
|
280 | |
// Every lookup, also touches the item, hence extending it's life
|
|
303 |
// Every lookup, also touches the item, hence extending its life
|
281 | 304 |
func (cache *Cache) Get(key string) (interface{}, error) {
|
282 | 305 |
return cache.GetByLoader(key, nil)
|
283 | 306 |
}
|
284 | 307 |
|
285 | 308 |
// GetWithTTL has exactly the same behaviour as Get but also returns
|
286 | |
// the remaining TTL for an specific item at the moment it its retrieved
|
|
309 |
// the remaining TTL for a specific item at the moment its retrieved
|
287 | 310 |
func (cache *Cache) GetWithTTL(key string) (interface{}, time.Duration, error) {
|
288 | 311 |
return cache.GetByLoaderWithTtl(key, nil)
|
289 | 312 |
}
|
290 | 313 |
|
291 | |
// GetByLoader can take a per key loader function (ie. to propagate context)
|
|
314 |
// GetByLoader can take a per key loader function (i.e. to propagate context)
|
292 | 315 |
func (cache *Cache) GetByLoader(key string, customLoaderFunction LoaderFunction) (interface{}, error) {
|
293 | 316 |
dataToReturn, _, err := cache.GetByLoaderWithTtl(key, customLoaderFunction)
|
294 | 317 |
|
295 | 318 |
return dataToReturn, err
|
296 | 319 |
}
|
297 | 320 |
|
298 | |
// GetByLoaderWithTtl can take a per key loader function (ie. to propagate context)
|
|
321 |
// GetByLoaderWithTtl can take a per key loader function (i.e. to propagate context)
|
299 | 322 |
func (cache *Cache) GetByLoaderWithTtl(key string, customLoaderFunction LoaderFunction) (interface{}, time.Duration, error) {
|
300 | 323 |
cache.mutex.Lock()
|
301 | 324 |
if cache.isShutDown {
|
|
358 | 381 |
}
|
359 | 382 |
|
360 | 383 |
if triggerExpirationNotification {
|
361 | |
cache.expirationNotification <- true
|
|
384 |
cache.notifyExpiration()
|
362 | 385 |
}
|
363 | 386 |
|
364 | 387 |
return dataToReturn, ttlToReturn, err
|
|
388 |
}
|
|
389 |
|
|
390 |
func (cache *Cache) notifyExpiration() {
|
|
391 |
cache.mutex.Lock()
|
|
392 |
if cache.hasNotified {
|
|
393 |
cache.mutex.Unlock()
|
|
394 |
return
|
|
395 |
}
|
|
396 |
cache.hasNotified = true
|
|
397 |
cache.mutex.Unlock()
|
|
398 |
|
|
399 |
cache.expirationNotification <- true
|
365 | 400 |
}
|
366 | 401 |
|
367 | 402 |
func (cache *Cache) invokeLoader(key string, loaderFunction LoaderFunction) (dataToReturn interface{}, ttl time.Duration, err error) {
|
|
431 | 466 |
}
|
432 | 467 |
cache.ttl = ttl
|
433 | 468 |
cache.mutex.Unlock()
|
434 | |
cache.expirationNotification <- true
|
|
469 |
cache.notifyExpiration()
|
435 | 470 |
return nil
|
436 | 471 |
}
|
437 | 472 |
|
|
512 | 547 |
items: make(map[string]*item),
|
513 | 548 |
loaderLock: &singleflight.Group{},
|
514 | 549 |
priorityQueue: newPriorityQueue(),
|
515 | |
expirationNotification: make(chan bool),
|
|
550 |
expirationNotification: make(chan bool, 1),
|
516 | 551 |
expirationTime: time.Now(),
|
517 | 552 |
shutdownSignal: shutdownChan,
|
518 | 553 |
isShutDown: false,
|