Codebase list golang-github-go-kit-kit / 9a19822 sd / cache / cache.go
9a19822

Tree @9a19822 (Download .tar.gz)

cache.go @9a19822raw · history · blame

package cache

import (
	"io"
	"sort"
	"sync"

	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
)

// Cache collects the most recent set of endpoints from a service discovery
// system via a subscriber, and makes them available to consumers. Cache is
// meant to be embedded inside of a concrete subscriber, and can serve Service
// invocations directly.
//
// Update replaces the stored set under the write lock; Endpoints reads it
// under the read lock.
type Cache struct {
	mtx     sync.RWMutex              // guards cache and slice
	factory sd.Factory                // called by Update for instances not yet cached
	cache   map[string]endpointCloser // current entries, keyed by instance string
	slice   []endpoint.Endpoint       // endpoints of cache, in sorted instance order
	logger  log.Logger                // receives errors encountered by Update
}

// endpointCloser pairs an endpoint with the io.Closer that the factory
// returned alongside it. The Closer may be nil; Update checks for that before
// calling Close on an entry that has dropped out of the instance set.
type endpointCloser struct {
	endpoint.Endpoint
	io.Closer
}

// New builds a Cache that holds no endpoints yet. The factory is used by
// Update to manufacture an endpoint for each instance string it has not seen
// before, and the logger receives the errors Update runs into.
func New(factory sd.Factory, logger log.Logger) *Cache {
	c := new(Cache)
	c.factory = factory
	c.logger = logger
	// Start from an empty, non-nil map of cached entries.
	c.cache = make(map[string]endpointCloser)
	return c
}

// Update should be invoked by clients with a complete set of current instance
// strings whenever that set changes. The cache manufactures new endpoints via
// the factory, closes old endpoints when they disappear, and persists existing
// endpoints if they survive through an update.
//
// The instances slice itself is not modified; Update works on a sorted copy.
// An instance string that appears more than once is manufactured only once.
// An instance whose factory call fails is logged and left out of the cache.
// An error returned while closing a departed endpoint is logged as well.
func (c *Cache) Update(instances []string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	// Sort a private copy so the caller's slice keeps its order. The sorted
	// order determines the order of the endpoint slice built below.
	sorted := make([]string, len(instances))
	copy(sorted, instances)
	sort.Strings(sorted)

	// Produce the current set of services.
	cache := make(map[string]endpointCloser, len(sorted))
	for _, instance := range sorted {
		// A repeated instance string was already handled in this update.
		// Running the factory again would overwrite the entry stored a
		// moment ago without its Closer ever being called.
		if _, ok := cache[instance]; ok {
			continue
		}

		// If it already exists, just move it over. Whatever is still in
		// c.cache after this loop has disappeared from the instance set.
		if sc, ok := c.cache[instance]; ok {
			cache[instance] = sc
			delete(c.cache, instance)
			continue
		}

		// If it doesn't exist, create it.
		service, closer, err := c.factory(instance)
		if err != nil {
			c.logger.Log("instance", instance, "err", err)
			continue
		}
		cache[instance] = endpointCloser{service, closer}
	}

	// Close any leftover endpoints. A failed Close is reported but does not
	// stop the remaining endpoints from being closed.
	for instance, sc := range c.cache {
		if sc.Closer == nil {
			continue
		}
		if err := sc.Closer.Close(); err != nil {
			c.logger.Log("instance", instance, "err", err)
		}
	}

	// Populate the slice of endpoints. Each occurrence of an instance string
	// yields one entry, so repeats refer to the same endpoint.
	slice := make([]endpoint.Endpoint, 0, len(cache))
	for _, instance := range sorted {
		// A bad factory may mean an instance is not present.
		if sc, ok := cache[instance]; ok {
			slice = append(slice, sc.Endpoint)
		}
	}

	// Swap and trigger GC for old copies.
	c.slice = slice
	c.cache = cache
}

// Endpoints returns the endpoints stored by the most recent Update, ordered
// lexicographically by the instance string each one was created for. The
// returned slice is the cache's own rather than a copy, so callers should
// treat it as read-only.
func (c *Cache) Endpoints() []endpoint.Endpoint {
	c.mtx.RLock()
	current := c.slice
	c.mtx.RUnlock()
	return current
}