Codebase list golang-github-go-kit-kit / cme/main sd / eureka / registrar.go
cme/main

Tree @cme/main (Download .tar.gz)

registrar.go @cme/main

cb03da6
 
 
 
443f6ea
f5b8fe6
cb03da6
 
f5b8fe6
cb03da6
 
443f6ea
cb03da6
 
443f6ea
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a843a9e
 
 
 
cb03da6
 
443f6ea
72a93ad
cb03da6
443f6ea
 
cb03da6
 
443f6ea
 
cb03da6
443f6ea
 
cb03da6
443f6ea
 
 
cb03da6
 
 
443f6ea
cb03da6
443f6ea
 
 
 
 
cb03da6
 
443f6ea
 
cb03da6
443f6ea
 
 
cb03da6
 
443f6ea
cb03da6
443f6ea
 
cb03da6
443f6ea
 
cb03da6
443f6ea
 
 
 
 
cb03da6
 
 
443f6ea
 
 
 
 
 
 
 
2a823da
cb03da6
 
443f6ea
 
 
 
 
 
 
 
cb03da6
443f6ea
cb03da6
 
 
 
443f6ea
a843a9e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
443f6ea
 
a843a9e
 
 
 
 
 
443f6ea
 
 
package eureka

import (
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/hudl/fargo"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
)

// Matches official Netflix Java client default.
const defaultRenewalInterval = 30 * time.Second

// The methods of fargo.Connection used in this package.
type fargoConnection interface {
	RegisterInstance(instance *fargo.Instance) error
	DeregisterInstance(instance *fargo.Instance) error
	ReregisterInstance(instance *fargo.Instance) error
	HeartBeatInstance(instance *fargo.Instance) error
	ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate
	GetApp(name string) (*fargo.Application, error)
}

type fargoUnsuccessfulHTTPResponse struct {
	statusCode    int
	messagePrefix string
}

func (u *fargoUnsuccessfulHTTPResponse) Error() string {
	return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode)
}

// Registrar maintains service instance liveness information in Eureka.
type Registrar struct {
	conn     fargoConnection
	instance *fargo.Instance
	logger   log.Logger
	quitc    chan chan struct{}
	sync.Mutex
}

var _ sd.Registrar = (*Registrar)(nil)

// NewRegistrar returns an Eureka Registrar acting on behalf of the provided
// Fargo connection and instance. See the integration test for usage examples.
func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar {
	return &Registrar{
		conn:     conn,
		instance: instance,
		logger:   log.With(logger, "service", instance.App, "address", fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)),
	}
}

// Register implements sd.Registrar.
func (r *Registrar) Register() {
	r.Lock()
	defer r.Unlock()

	if r.quitc != nil {
		return // Already in the registration loop.
	}

	if err := r.conn.RegisterInstance(r.instance); err != nil {
		r.logger.Log("during", "Register", "err", err)
	}

	r.quitc = make(chan chan struct{})
	go r.loop()
}

// Deregister implements sd.Registrar.
func (r *Registrar) Deregister() {
	r.Lock()
	defer r.Unlock()

	if r.quitc == nil {
		return // Already deregistered.
	}

	q := make(chan struct{})
	r.quitc <- q
	<-q
	r.quitc = nil
}

func (r *Registrar) loop() {
	var renewalInterval time.Duration
	if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 {
		renewalInterval = time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second
	} else {
		renewalInterval = defaultRenewalInterval
	}
	ticker := time.NewTicker(renewalInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := r.heartbeat(); err != nil {
				r.logger.Log("during", "heartbeat", "err", err)
			}

		case q := <-r.quitc:
			if err := r.conn.DeregisterInstance(r.instance); err != nil {
				r.logger.Log("during", "Deregister", "err", err)
			}
			close(q)
			return
		}
	}
}

func httpResponseStatusCode(err error) (code int, present bool) {
	if code, ok := fargo.HTTPResponseStatusCode(err); ok {
		return code, true
	}
	// Allow injection of errors for testing.
	if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok {
		return u.statusCode, true
	}
	return 0, false
}

func isNotFound(err error) bool {
	code, ok := httpResponseStatusCode(err)
	return ok && code == http.StatusNotFound
}

func (r *Registrar) heartbeat() error {
	err := r.conn.HeartBeatInstance(r.instance)
	if err == nil {
		return nil
	}
	if isNotFound(err) {
		// Instance expired (e.g. network partition). Re-register.
		return r.conn.ReregisterInstance(r.instance)
	}
	return err
}