Codebase list golang-github-go-kit-kit / 8df4980 loadbalancer / endpoint_cache.go
8df4980

Tree @8df4980 (Download .tar.gz)

endpoint_cache.go @8df4980raw · history · blame

package loadbalancer

import (
	"io"
	"sort"
	"sync"
	"sync/atomic"

	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
)

// EndpointCache caches endpoints that need to be deallocated when they're no
// longer useful. Clients update the cache by providing a current set of
// instance strings. The cache converts each instance string to an endpoint
// and a closer via the factory function.
//
// Instance strings are assumed to be unique and are used as keys. Endpoints
// that were in the previous set of instances and are not in the current set
// are considered invalid and closed.
//
// EndpointCache is designed to be used in your publisher implementation.
type EndpointCache struct {
	mtx    sync.Mutex
	f      Factory
	m      map[string]endpointCloser
	cache  atomic.Value //[]endpoint.Endpoint
	logger log.Logger
}

// NewEndpointCache produces a new EndpointCache, ready for use. Instance
// strings will be converted to endpoints via the provided factory function.
// The logger is used to log errors.
func NewEndpointCache(f Factory, logger log.Logger) *EndpointCache {
	endpointCache := &EndpointCache{
		f:      f,
		m:      map[string]endpointCloser{},
		logger: log.NewContext(logger).With("component", "Endpoint Cache"),
	}

	endpointCache.cache.Store(make([]endpoint.Endpoint, 0))

	return endpointCache
}

type endpointCloser struct {
	endpoint.Endpoint
	io.Closer
}

// Replace replaces the current set of endpoints with endpoints manufactured
// by the passed instances. If the same instance exists in both the existing
// and new sets, it's left untouched.
func (t *EndpointCache) Replace(instances []string) {
	t.mtx.Lock()
	defer t.mtx.Unlock()

	// Produce the current set of endpoints.
	oldMap := t.m
	t.m = make(map[string]endpointCloser, len(instances))
	for _, instance := range instances {
		// If it already exists, just copy it over.
		if ec, ok := oldMap[instance]; ok {
			t.m[instance] = ec
			delete(oldMap, instance)
			continue
		}

		// If it doesn't exist, create it.
		endpoint, closer, err := t.f(instance)
		if err != nil {
			t.logger.Log("instance", instance, "err", err)
			continue
		}
		t.m[instance] = endpointCloser{endpoint, closer}
	}

	t.refreshCache()

	// Close any leftover endpoints.
	for _, ec := range oldMap {
		if ec.Closer != nil {
			ec.Closer.Close()
		}
	}
}

func (t *EndpointCache) refreshCache() {
	var (
		length    = len(t.m)
		instances = make([]string, 0, length)
		newCache  = make([]endpoint.Endpoint, 0, length)
	)

	for instance, _ := range t.m {
		instances = append(instances, instance)
	}
	// Sort the instances for ensuring that Endpoints are returned into the same order if no modified.
	sort.Strings(instances)

	for _, instance := range instances {
		newCache = append(newCache, t.m[instance].Endpoint)
	}

	t.cache.Store(newCache)
}

// Endpoints returns the current set of endpoints in undefined order. Satisfies
// Publisher interface.
func (t *EndpointCache) Endpoints() ([]endpoint.Endpoint, error) {
	return t.cache.Load().([]endpoint.Endpoint), nil
}