package sd
import (
"errors"
"io"
"testing"
"time"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
)
func TestEndpointCache(t *testing.T) {
var (
ca = make(closer)
cb = make(closer)
c = map[string]io.Closer{"a": ca, "b": cb}
f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{})
)
// Populate
cache.Update(Event{Instances: []string{"a", "b"}})
select {
case <-ca:
t.Errorf("endpoint a closed, not good")
case <-cb:
t.Errorf("endpoint b closed, not good")
case <-time.After(time.Millisecond):
t.Logf("no closures yet, good")
}
assertEndpointsLen(t, cache, 2)
// Duplicate, should be no-op
cache.Update(Event{Instances: []string{"a", "b"}})
select {
case <-ca:
t.Errorf("endpoint a closed, not good")
case <-cb:
t.Errorf("endpoint b closed, not good")
case <-time.After(time.Millisecond):
t.Logf("no closures yet, good")
}
assertEndpointsLen(t, cache, 2)
// Error, should continue returning old endpoints
cache.Update(Event{Err: errors.New("sd error")})
select {
case <-ca:
t.Errorf("endpoint a closed, not good")
case <-cb:
t.Errorf("endpoint b closed, not good")
case <-time.After(time.Millisecond):
t.Logf("no closures yet, good")
}
assertEndpointsLen(t, cache, 2)
// Delete b
go cache.Update(Event{Instances: []string{"a"}})
select {
case <-ca:
t.Errorf("endpoint a closed, not good")
case <-cb:
t.Logf("endpoint b closed, good")
case <-time.After(time.Second):
t.Errorf("didn't close the deleted instance in time")
}
assertEndpointsLen(t, cache, 1)
// Delete a
go cache.Update(Event{Instances: []string{}})
select {
// case <-cb: will succeed, as it's closed
case <-ca:
t.Logf("endpoint a closed, good")
case <-time.After(time.Second):
t.Errorf("didn't close the deleted instance in time")
}
assertEndpointsLen(t, cache, 0)
}
func TestEndpointCacheErrorAndTimeout(t *testing.T) {
var (
ca = make(closer)
cb = make(closer)
c = map[string]io.Closer{"a": ca, "b": cb}
f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
timeOut = 100 * time.Millisecond
cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{
invalidateOnError: true,
invalidateTimeout: timeOut,
})
)
timeNow := time.Now()
cache.timeNow = func() time.Time { return timeNow }
// Populate
cache.Update(Event{Instances: []string{"a"}})
select {
case <-ca:
t.Errorf("endpoint a closed, not good")
case <-time.After(time.Millisecond):
t.Logf("no closures yet, good")
}
assertEndpointsLen(t, cache, 1)
// Send error, keep time still.
cache.Update(Event{Err: errors.New("sd error")})
select {
case <-ca:
t.Errorf("endpoint a closed, not good")
case <-time.After(time.Millisecond):
t.Logf("no closures yet, good")
}
assertEndpointsLen(t, cache, 1)
// Move the time, but less than the timeout
timeNow = timeNow.Add(timeOut / 2)
assertEndpointsLen(t, cache, 1)
select {
case <-ca:
t.Errorf("endpoint a closed, not good")
case <-time.After(time.Millisecond):
t.Logf("no closures yet, good")
}
// Move the time past the timeout
timeNow = timeNow.Add(timeOut)
assertEndpointsError(t, cache, "sd error")
select {
case <-ca:
t.Logf("endpoint a closed, good")
case <-time.After(time.Millisecond):
t.Errorf("didn't close the deleted instance in time")
}
// Send another error
cache.Update(Event{Err: errors.New("another sd error")})
assertEndpointsError(t, cache, "sd error") // expect original error
}
func TestBadFactory(t *testing.T) {
cache := newEndpointCache(func(string) (endpoint.Endpoint, io.Closer, error) {
return nil, nil, errors.New("bad factory")
}, log.NewNopLogger(), endpointerOptions{})
cache.Update(Event{Instances: []string{"foo:1234", "bar:5678"}})
assertEndpointsLen(t, cache, 0)
}
func assertEndpointsLen(t *testing.T, cache *endpointCache, l int) {
endpoints, err := cache.Endpoints()
if err != nil {
t.Errorf("unexpected error %v", err)
return
}
if want, have := l, len(endpoints); want != have {
t.Errorf("want %d, have %d", want, have)
}
}
func assertEndpointsError(t *testing.T, cache *endpointCache, wantErr string) {
endpoints, err := cache.Endpoints()
if err == nil {
t.Errorf("expecting error, not good")
return
}
if want, have := wantErr, err.Error(); want != have {
t.Errorf("want %s, have %s", want, have)
return
}
if want, have := 0, len(endpoints); want != have {
t.Errorf("want %d, have %d", want, have)
}
}
type closer chan struct{}
func (c closer) Close() error { close(c); return nil }