Codebase list golang-github-go-kit-kit / 3acee0cf-8dfd-409b-92f9-f5db1260ed49/main sd / eureka / registrar.go
3acee0cf-8dfd-409b-92f9-f5db1260ed49/main

Tree @3acee0cf-8dfd-409b-92f9-f5db1260ed49/main (Download .tar.gz)

registrar.go @3acee0cf-8dfd-409b-92f9-f5db1260ed49/main · raw · history · blame

package eureka

import (
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/hudl/fargo"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
)

// defaultRenewalInterval is the heartbeat period used by Registrar.loop when
// the instance's LeaseInfo.RenewalIntervalInSecs is not positive.
// Matches official Netflix Java client default.
const defaultRenewalInterval = 30 * time.Second

// fargoConnection declares the methods of fargo.Connection used in this
// package. NewRegistrar accepts this interface rather than a concrete
// connection, so any implementation of these methods can be supplied.
type fargoConnection interface {
	// Called by Registrar.Register.
	RegisterInstance(instance *fargo.Instance) error
	// Called by Registrar.loop when a quit request is received.
	DeregisterInstance(instance *fargo.Instance) error
	// Called by Registrar.heartbeat after a heartbeat reports "not found".
	ReregisterInstance(instance *fargo.Instance) error
	// Called by Registrar.heartbeat on every renewal tick.
	HeartBeatInstance(instance *fargo.Instance) error
	// Not called in this file; presumably used elsewhere in the package
	// (TODO confirm).
	ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate
	// Not called in this file; presumably used elsewhere in the package
	// (TODO confirm).
	GetApp(name string) (*fargo.Application, error)
}

// fargoUnsuccessfulHTTPResponse is an error value carrying an HTTP status
// code. httpResponseStatusCode recognises it so that errors with a chosen
// status code can be injected.
type fargoUnsuccessfulHTTPResponse struct {
	// statusCode is the HTTP status code reported by the error.
	statusCode int
	// messagePrefix is the text rendered after "err=" by Error.
	messagePrefix string
}

// Error implements the error interface. The result has the form
// "err=<messagePrefix> code=<statusCode>".
func (u *fargoUnsuccessfulHTTPResponse) Error() string {
	// fmt.Sprint only inserts a space between two adjacent operands when
	// neither of them is a string, so nothing is added here.
	return fmt.Sprint("err=", u.messagePrefix, " code=", u.statusCode)
}

// Registrar maintains service instance liveness information in Eureka.
// Register registers the instance and starts a background goroutine that
// sends a heartbeat on every renewal interval; Deregister stops that
// goroutine, which deregisters the instance before exiting.
type Registrar struct {
	// conn is the connection on which all instance calls are made.
	conn     fargoConnection
	// instance is the service instance being registered and renewed.
	instance *fargo.Instance
	// logger is the caller's logger with "service" and "address" fields
	// attached by NewRegistrar.
	logger   log.Logger
	// quitc is non-nil while the heartbeat loop is running. Deregister sends
	// a channel on it, and the loop closes that channel once it has
	// deregistered the instance.
	quitc    chan chan struct{}
	// The embedded mutex is held by Register and Deregister while they read
	// and update quitc. Because it is embedded, Lock and Unlock are promoted
	// to Registrar's method set.
	sync.Mutex
}

// Compile-time check that *Registrar satisfies the sd.Registrar interface.
var _ sd.Registrar = (*Registrar)(nil)

// NewRegistrar builds an Eureka Registrar that manages the given instance
// over the given Fargo connection. The supplied logger is decorated with the
// instance's application name ("service") and its "IPAddr:Port" pair
// ("address"). instance must be non-nil because its fields are read here.
// See the integration test for usage examples.
func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar {
	address := fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)

	registrar := &Registrar{conn: conn, instance: instance}
	registrar.logger = log.With(logger, "service", instance.App, "address", address)
	return registrar
}

// Register implements sd.Registrar. It registers the instance on the
// connection and starts the heartbeat goroutine. Calling it while that
// goroutine is already running is a no-op. A failed registration is only
// logged; the heartbeat goroutine is started regardless.
func (r *Registrar) Register() {
	r.Lock()
	defer r.Unlock()

	// A non-nil quitc means the registration loop is already running.
	if running := r.quitc != nil; running {
		return
	}

	err := r.conn.RegisterInstance(r.instance)
	if err != nil {
		r.logger.Log("during", "Register", "err", err)
	}

	// quitc must be set before the goroutine starts, since loop receives
	// from it.
	r.quitc = make(chan chan struct{})
	go r.loop()
}

// Deregister implements sd.Registrar. It asks the heartbeat goroutine to
// stop and blocks until that goroutine has attempted to deregister the
// instance. Calling it when no goroutine is running is a no-op.
func (r *Registrar) Deregister() {
	r.Lock()
	defer r.Unlock()

	// A nil quitc means there is no running loop, i.e. already deregistered.
	if r.quitc != nil {
		ack := make(chan struct{})
		r.quitc <- ack // hand the loop a channel to close when it is done
		<-ack          // wait for the loop to finish deregistering
		r.quitc = nil
	}
}

// loop is the body of the goroutine started by Register. It sends a
// heartbeat on every renewal interval, logging any failure, until a channel
// arrives on r.quitc. It then deregisters the instance (logging any failure),
// closes the received channel to signal completion, and returns.
//
// The interval is the instance's LeaseInfo.RenewalIntervalInSecs when that is
// positive, and defaultRenewalInterval otherwise.
func (r *Registrar) loop() {
	interval := defaultRenewalInterval
	if secs := r.instance.LeaseInfo.RenewalIntervalInSecs; secs > 0 {
		interval = time.Duration(secs) * time.Second
	}

	heartbeats := time.NewTicker(interval)
	defer heartbeats.Stop()

	for {
		select {
		case done := <-r.quitc:
			err := r.conn.DeregisterInstance(r.instance)
			if err != nil {
				r.logger.Log("during", "Deregister", "err", err)
			}
			close(done)
			return

		case <-heartbeats.C:
			err := r.heartbeat()
			if err != nil {
				r.logger.Log("during", "heartbeat", "err", err)
			}
		}
	}
}

// httpResponseStatusCode extracts an HTTP status code from err. It first
// asks fargo.HTTPResponseStatusCode, then falls back to checking whether err
// is a *fargoUnsuccessfulHTTPResponse. present is false, and code is 0, when
// neither source yields a status code.
func httpResponseStatusCode(err error) (code int, present bool) {
	if status, found := fargo.HTTPResponseStatusCode(err); found {
		return status, true
	}

	// Allow injection of errors for testing.
	switch injected := err.(type) {
	case *fargoUnsuccessfulHTTPResponse:
		return injected.statusCode, true
	default:
		return 0, false
	}
}

// isNotFound reports whether err carries an HTTP status code, as determined
// by httpResponseStatusCode, and that code is 404 (http.StatusNotFound).
func isNotFound(err error) bool {
	if status, hasStatus := httpResponseStatusCode(err); hasStatus {
		return status == http.StatusNotFound
	}
	return false
}

// heartbeat sends one heartbeat for the instance. When the heartbeat fails
// with a "not found" status, the instance is re-registered and the result of
// that call is returned instead. Any other heartbeat error is returned
// unchanged.
func (r *Registrar) heartbeat() error {
	err := r.conn.HeartBeatInstance(r.instance)
	switch {
	case err == nil:
		return nil
	case isNotFound(err):
		// Instance expired (e.g. network partition). Re-register.
		return r.conn.ReregisterInstance(r.instance)
	default:
		return err
	}
}