Codebase list golang-github-go-kit-kit / de552e08-1950-4c1b-983e-dd11d2b704d4/main sd / eureka / instancer.go
de552e08-1950-4c1b-983e-dd11d2b704d4/main

Tree @de552e08-1950-4c1b-983e-dd11d2b704d4/main (Download .tar.gz)

instancer.go @de552e08-1950-4c1b-983e-dd11d2b704d4/main

cb03da6
 
 
 
 
f5b8fe6
cb03da6
 
 
c80303e
cb03da6
 
c80303e
 
 
9225fe9
c80303e
 
 
 
cb03da6
 
c80303e
 
 
443f6ea
 
c80303e
9225fe9
c80303e
 
 
 
cb03da6
 
4ede06e
 
 
 
cb03da6
 
 
c80303e
 
443f6ea
 
 
 
cb03da6
 
4ede06e
 
 
 
 
 
 
 
 
 
 
 
443f6ea
 
cb03da6
 
4ede06e
 
443f6ea
 
 
cb03da6
 
 
 
c80303e
443f6ea
 
 
cb03da6
443f6ea
cb03da6
 
443f6ea
 
 
 
cb03da6
 
 
9225fe9
 
 
 
 
 
 
 
 
 
513c60c
 
 
 
 
package eureka

import (
	"fmt"

	"github.com/hudl/fargo"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/internal/instance"
)

// Instancer yields instances stored in the Eureka registry for the given app.
// Changes in that app are watched and will update the subscribers.
//
// Instances are created with NewInstancer, which starts a background
// goroutine; call Stop to terminate it.
type Instancer struct {
	// cache receives an sd.Event for every consumed update and backs
	// Register, Deregister and state.
	cache  *instance.Cache
	// conn is the connection used to schedule app updates and to fetch the app.
	conn   fargoConnection
	// app is the name of the app being watched.
	app    string
	// logger is the logger handed to NewInstancer, annotated with the app name.
	logger log.Logger
	// quitc carries Stop's acknowledgement channel to the loop goroutine.
	// Stop sets it to nil once the loop has acknowledged.
	quitc  chan chan struct{}
}

// NewInstancer builds an Instancer that watches the named Eureka app over
// conn and forwards every change to its subscribers.
//
// The supplied logger is annotated with the app name. NewInstancer blocks
// until the first update has been received and applied to the cache, then
// starts a goroutine that keeps consuming updates until Stop is called.
func NewInstancer(conn fargoConnection, app string, logger log.Logger) *Instancer {
	appLogger := log.With(logger, "app", app)

	inst := &Instancer{
		cache:  instance.NewCache(),
		conn:   conn,
		app:    app,
		logger: appLogger,
		quitc:  make(chan chan struct{}),
	}

	// stop is closed by the loop goroutine when it exits.
	stop := make(chan struct{})
	updatec := conn.ScheduleAppUpdates(app, true, stop)

	// Seed the cache synchronously so callers see an initial state.
	first := <-updatec
	inst.consume(first)

	go inst.loop(updatec, stop)
	return inst
}

// Stop terminates the Instancer. It hands the background loop a fresh
// acknowledgement channel over quitc, blocks until the loop closes it, and
// then clears quitc to mark the Instancer as stopped.
//
// Calling Stop again after it has returned is a no-op. Stop must not be
// called concurrently with itself: quitc is read and written without
// synchronization.
func (s *Instancer) Stop() {
	// quitc is nil once a previous Stop has completed. A send on a nil
	// channel blocks forever, so return instead of deadlocking the caller.
	if s.quitc == nil {
		return
	}

	q := make(chan struct{})
	s.quitc <- q
	<-q
	s.quitc = nil
}

// consume translates a single fargo update into an sd.Event and pushes it
// into the cache. An update carrying an error is logged and forwarded as an
// error event; otherwise the app's instances are converted, their count is
// logged, and they are forwarded as an instances event.
func (s *Instancer) consume(update fargo.AppUpdate) {
	var event sd.Event

	if update.Err != nil {
		s.logger.Log("during", "Update", "err", update.Err)
		event.Err = update.Err
	} else {
		found := convertFargoAppToInstances(update.App)
		s.logger.Log("instances", len(found))
		event.Instances = found
	}

	s.cache.Update(event)
}

// loop is the body of the background goroutine started by NewInstancer. It
// applies each incoming update via consume until a quit request arrives on
// quitc, at which point it closes the request's acknowledgement channel and
// returns. done is closed on the way out.
func (s *Instancer) loop(updates <-chan fargo.AppUpdate, done chan<- struct{}) {
	defer close(done)

	for {
		select {
		case ack := <-s.quitc:
			// Tell the Stop caller that the loop is finished.
			close(ack)
			return
		case next := <-updates:
			s.consume(next)
		}
	}
}

// getInstances fetches the watched app through the connection and returns
// its instances as "ip:port" strings. Any error from the connection is
// returned unchanged with a nil slice.
func (s *Instancer) getInstances() ([]string, error) {
	application, err := s.conn.GetApp(s.app)
	if err == nil {
		return convertFargoAppToInstances(application), nil
	}
	return nil, err
}

// convertFargoAppToInstances renders every instance of app as an
// "IPAddr:Port" string, preserving the order of app.Instances. The result is
// always a non-nil slice with one entry per instance.
func convertFargoAppToInstances(app *fargo.Application) []string {
	addrs := make([]string, 0, len(app.Instances))
	for idx := range app.Instances {
		entry := app.Instances[idx]
		addrs = append(addrs, fmt.Sprintf("%s:%d", entry.IPAddr, entry.Port))
	}
	return addrs
}

// Register implements Instancer. It passes ch to the underlying cache's
// Register method.
func (s *Instancer) Register(ch chan<- sd.Event) {
	s.cache.Register(ch)
}

// Deregister implements Instancer. It passes ch to the underlying cache's
// Deregister method.
func (s *Instancer) Deregister(ch chan<- sd.Event) {
	s.cache.Deregister(ch)
}

// state returns the event reported by the underlying instance.Cache's State
// method. It exists only for testing.
func (s *Instancer) state() sd.Event {
	return s.cache.State()
}