Codebase list golang-github-go-kit-kit / a83f2ba5-b16e-4ce4-8519-5a89894e4275/main sd / eureka / instancer.go
a83f2ba5-b16e-4ce4-8519-5a89894e4275/main

Tree @a83f2ba5-b16e-4ce4-8519-5a89894e4275/main (Download .tar.gz)

instancer.go @a83f2ba5-b16e-4ce4-8519-5a89894e4275/mainraw · history · blame

package eureka

import (
	"fmt"

	"github.com/hudl/fargo"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/internal/instance"
)

// Instancer yields instances stored in the Eureka registry for the given app.
// Changes in that app are watched and will update the subscribers.
type Instancer struct {
	cache  *instance.Cache
	conn   fargoConnection
	app    string
	logger log.Logger
	quitc  chan chan struct{}
}

// NewInstancer returns a Eureka Instancer. It will start watching the given
// app string for changes, and update the subscribers accordingly.
func NewInstancer(conn fargoConnection, app string, logger log.Logger) *Instancer {
	logger = log.With(logger, "app", app)

	s := &Instancer{
		cache:  instance.NewCache(),
		conn:   conn,
		app:    app,
		logger: logger,
		quitc:  make(chan chan struct{}),
	}

	done := make(chan struct{})
	updates := conn.ScheduleAppUpdates(app, true, done)
	s.consume(<-updates)
	go s.loop(updates, done)
	return s
}

// Stop terminates the Instancer.
func (s *Instancer) Stop() {
	q := make(chan struct{})
	s.quitc <- q
	<-q
	s.quitc = nil
}

func (s *Instancer) consume(update fargo.AppUpdate) {
	if update.Err != nil {
		s.logger.Log("during", "Update", "err", update.Err)
		s.cache.Update(sd.Event{Err: update.Err})
		return
	}
	instances := convertFargoAppToInstances(update.App)
	s.logger.Log("instances", len(instances))
	s.cache.Update(sd.Event{Instances: instances})
}

func (s *Instancer) loop(updates <-chan fargo.AppUpdate, done chan<- struct{}) {
	defer close(done)

	for {
		select {
		case update := <-updates:
			s.consume(update)
		case q := <-s.quitc:
			close(q)
			return
		}
	}
}

func (s *Instancer) getInstances() ([]string, error) {
	app, err := s.conn.GetApp(s.app)
	if err != nil {
		return nil, err
	}
	return convertFargoAppToInstances(app), nil
}

func convertFargoAppToInstances(app *fargo.Application) []string {
	instances := make([]string, len(app.Instances))
	for i, inst := range app.Instances {
		instances[i] = fmt.Sprintf("%s:%d", inst.IPAddr, inst.Port)
	}
	return instances
}

// Register implements Instancer.
func (s *Instancer) Register(ch chan<- sd.Event) {
	s.cache.Register(ch)
}

// Deregister implements Instancer.
func (s *Instancer) Deregister(ch chan<- sd.Event) {
	s.cache.Deregister(ch)
}

// state returns the current state of instance.Cache, only for testing
func (s *Instancer) state() sd.Event {
	return s.cache.State()
}