Codebase list golang-github-go-kit-kit / 203e4782-15ed-484f-9517-a1c12654c5bc/v0.8.0 sd / endpointer.go
203e4782-15ed-484f-9517-a1c12654c5bc/v0.8.0

Tree @203e4782-15ed-484f-9517-a1c12654c5bc/v0.8.0 (Download .tar.gz)

endpointer.go @203e4782-15ed-484f-9517-a1c12654c5bc/v0.8.0raw · history · blame

package sd

import (
	"time"

	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
)

// Endpointer listens to a service discovery system and yields a set of
// identical endpoints on demand. An error indicates a problem with connectivity
// to the service discovery system, or within the system itself; an Endpointer
// may yield no endpoints without error.
type Endpointer interface {
	Endpoints() ([]endpoint.Endpoint, error)
}

// FixedEndpointer yields a fixed set of endpoints.
type FixedEndpointer []endpoint.Endpoint

// Endpoints implements Endpointer.
func (s FixedEndpointer) Endpoints() ([]endpoint.Endpoint, error) { return s, nil }

// NewEndpointer creates an Endpointer that subscribes to updates from Instancer src
// and uses factory f to create Endpoints. If src notifies of an error, the Endpointer
// keeps returning previously created Endpoints assuming they are still good, unless
// this behavior is disabled via InvalidateOnError option.
func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) *DefaultEndpointer {
	opts := endpointerOptions{}
	for _, opt := range options {
		opt(&opts)
	}
	se := &DefaultEndpointer{
		cache:     newEndpointCache(f, logger, opts),
		instancer: src,
		ch:        make(chan Event),
	}
	go se.receive()
	src.Register(se.ch)
	return se
}

// EndpointerOption allows control of endpointCache behavior.
type EndpointerOption func(*endpointerOptions)

// InvalidateOnError returns EndpointerOption that controls how the Endpointer
// behaves when then Instancer publishes an Event containing an error.
// Without this option the Endpointer continues returning the last known
// endpoints. With this option, the Endpointer continues returning the last
// known endpoints until the timeout elapses, then closes all active endpoints
// and starts returning an error. Once the Instancer sends a new update with
// valid resource instances, the normal operation is resumed.
func InvalidateOnError(timeout time.Duration) EndpointerOption {
	return func(opts *endpointerOptions) {
		opts.invalidateOnError = true
		opts.invalidateTimeout = timeout
	}
}

type endpointerOptions struct {
	invalidateOnError bool
	invalidateTimeout time.Duration
}

// DefaultEndpointer implements an Endpointer interface.
// When created with NewEndpointer function, it automatically registers
// as a subscriber to events from the Instances and maintains a list
// of active Endpoints.
type DefaultEndpointer struct {
	cache     *endpointCache
	instancer Instancer
	ch        chan Event
}

func (de *DefaultEndpointer) receive() {
	for event := range de.ch {
		de.cache.Update(event)
	}
}

// Close deregisters DefaultEndpointer from the Instancer and stops the internal go-routine.
func (de *DefaultEndpointer) Close() {
	de.instancer.Deregister(de.ch)
	close(de.ch)
}

// Endpoints implements Endpointer.
func (de *DefaultEndpointer) Endpoints() ([]endpoint.Endpoint, error) {
	return de.cache.Endpoints()
}