package instance
import (
"reflect"
"sort"
"sync"
"github.com/go-kit/kit/sd"
)
// Cache keeps track of resource instances provided to it via the Update method
// and implements the Instancer interface. It holds the most recently accepted
// sd.Event together with the set of channels registered to receive events.
type Cache struct {
// mtx guards state and reg.
mtx sync.RWMutex
// state is the last event stored by Update; it is the zero sd.Event until
// the first call to Update.
state sd.Event
// reg is the set of channels added via Register and removed via Deregister.
reg registry
}
// NewCache returns an empty Cache: no listeners are registered and the
// state is the zero sd.Event.
func NewCache() *Cache {
	c := new(Cache)
	c.reg = make(registry)
	return c
}
// Update receives new instances from service discovery, stores them internally,
// and notifies all registered listeners.
//
// The instances are sorted before being compared with the current state, so
// two events listing the same instances in a different order count as equal.
// An event equal to the current state is dropped without notifying anyone.
//
// Update sorts a private copy of event.Instances: the caller's slice is
// neither reordered nor retained by the cache. Listeners are notified while
// the cache lock is held, so Update does not return until every registered
// channel has accepted the event.
func (c *Cache) Update(event sd.Event) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	// sort.Strings sorts in place, so detach from the caller's backing array
	// first. A nil slice stays nil because reflect.DeepEqual distinguishes a
	// nil slice from an empty one.
	if event.Instances != nil {
		sorted := make([]string, len(event.Instances))
		copy(sorted, event.Instances)
		sort.Strings(sorted)
		event.Instances = sorted
	}

	if reflect.DeepEqual(c.state, event) {
		return // no need to broadcast the same instances
	}

	c.state = event
	c.reg.broadcast(event)
}
// State returns the current state of discovery (instances or error) as sd.Event.
//
// The returned event carries its own copy of the instance list, so the caller
// may reorder or modify it without affecting the cached state.
func (c *Cache) State() sd.Event {
	c.mtx.RLock()
	defer c.mtx.RUnlock()

	snapshot := c.state
	// Copy under the read lock; a nil list is returned as nil.
	if c.state.Instances != nil {
		snapshot.Instances = make([]string, len(c.state.Instances))
		copy(snapshot.Instances, c.state.Instances)
	}
	return snapshot
}
// Register implements Instancer. It adds ch to the set of listeners and
// immediately sends it the current state.
//
// The event sent carries its own copy of the instance list, so the receiver
// may modify it without affecting the cache. The send happens while the cache
// lock is held, so Register does not return until ch has accepted the event.
func (c *Cache) Register(ch chan<- sd.Event) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	c.reg.register(ch)

	// always push the current state to new channels
	current := c.state
	if c.state.Instances != nil {
		current.Instances = make([]string, len(c.state.Instances))
		copy(current.Instances, c.state.Instances)
	}
	ch <- current
}
// Deregister implements Instancer. It removes ch from the set of listeners;
// a channel that is not currently registered is ignored.
func (c *Cache) Deregister(ch chan<- sd.Event) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	delete(c.reg, ch)
}
// registry is the set of channels that receive broadcast events, keyed by
// the channel itself. It is not goroutine-safe; the Cache methods in this
// file hold the Cache mutex around every call into it.
type registry map[chan<- sd.Event]struct{}
// broadcast sends event to every channel in the registry, in map iteration
// order. Each send blocks until that channel accepts the value.
//
// Every listener receives its own copy of the instance list, so one receiver
// reordering or modifying it cannot affect the other receivers or the sender.
func (r registry) broadcast(event sd.Event) {
	for c := range r {
		out := event
		// A nil list is forwarded as nil rather than as an empty slice.
		if event.Instances != nil {
			out.Instances = make([]string, len(event.Instances))
			copy(out.Instances, event.Instances)
		}
		c <- out
	}
}
// register adds c to the set. Adding a channel that is already present
// leaves the set unchanged.
func (r registry) register(c chan<- sd.Event) {
	var member struct{}
	r[c] = member
}
// deregister removes c from the set. Removing a channel that is not present
// is a no-op.
func (r registry) deregister(c chan<- sd.Event) {
delete(r, c)
}