Codebase list golang-github-go-kit-kit / 4509b84c-5d8f-4656-8a70-242053b8059a/main sd / endpoint_cache_test.go
4509b84c-5d8f-4656-8a70-242053b8059a/main

Tree @4509b84c-5d8f-4656-8a70-242053b8059a/main (Download .tar.gz)

endpoint_cache_test.go @4509b84c-5d8f-4656-8a70-242053b8059a/mainraw · history · blame

package sd

import (
	"errors"
	"io"
	"testing"
	"time"

	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
)

func TestEndpointCache(t *testing.T) {
	var (
		ca    = make(closer)
		cb    = make(closer)
		c     = map[string]io.Closer{"a": ca, "b": cb}
		f     = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
		cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{})
	)

	// Populate
	cache.Update(Event{Instances: []string{"a", "b"}})
	select {
	case <-ca:
		t.Errorf("endpoint a closed, not good")
	case <-cb:
		t.Errorf("endpoint b closed, not good")
	case <-time.After(time.Millisecond):
		t.Logf("no closures yet, good")
	}
	assertEndpointsLen(t, cache, 2)

	// Duplicate, should be no-op
	cache.Update(Event{Instances: []string{"a", "b"}})
	select {
	case <-ca:
		t.Errorf("endpoint a closed, not good")
	case <-cb:
		t.Errorf("endpoint b closed, not good")
	case <-time.After(time.Millisecond):
		t.Logf("no closures yet, good")
	}
	assertEndpointsLen(t, cache, 2)

	// Error, should continue returning old endpoints
	cache.Update(Event{Err: errors.New("sd error")})
	select {
	case <-ca:
		t.Errorf("endpoint a closed, not good")
	case <-cb:
		t.Errorf("endpoint b closed, not good")
	case <-time.After(time.Millisecond):
		t.Logf("no closures yet, good")
	}
	assertEndpointsLen(t, cache, 2)

	// Delete b
	go cache.Update(Event{Instances: []string{"a"}})
	select {
	case <-ca:
		t.Errorf("endpoint a closed, not good")
	case <-cb:
		t.Logf("endpoint b closed, good")
	case <-time.After(time.Second):
		t.Errorf("didn't close the deleted instance in time")
	}
	assertEndpointsLen(t, cache, 1)

	// Delete a
	go cache.Update(Event{Instances: []string{}})
	select {
	// case <-cb: will succeed, as it's closed
	case <-ca:
		t.Logf("endpoint a closed, good")
	case <-time.After(time.Second):
		t.Errorf("didn't close the deleted instance in time")
	}
	assertEndpointsLen(t, cache, 0)
}

func TestEndpointCacheErrorAndTimeout(t *testing.T) {
	var (
		ca      = make(closer)
		cb      = make(closer)
		c       = map[string]io.Closer{"a": ca, "b": cb}
		f       = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
		timeOut = 100 * time.Millisecond
		cache   = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{
			invalidateOnError: true,
			invalidateTimeout: timeOut,
		})
	)

	timeNow := time.Now()
	cache.timeNow = func() time.Time { return timeNow }

	// Populate
	cache.Update(Event{Instances: []string{"a"}})
	select {
	case <-ca:
		t.Errorf("endpoint a closed, not good")
	case <-time.After(time.Millisecond):
		t.Logf("no closures yet, good")
	}
	assertEndpointsLen(t, cache, 1)

	// Send error, keep time still.
	cache.Update(Event{Err: errors.New("sd error")})
	select {
	case <-ca:
		t.Errorf("endpoint a closed, not good")
	case <-time.After(time.Millisecond):
		t.Logf("no closures yet, good")
	}
	assertEndpointsLen(t, cache, 1)

	// Move the time, but less than the timeout
	timeNow = timeNow.Add(timeOut / 2)
	assertEndpointsLen(t, cache, 1)
	select {
	case <-ca:
		t.Errorf("endpoint a closed, not good")
	case <-time.After(time.Millisecond):
		t.Logf("no closures yet, good")
	}

	// Move the time past the timeout
	timeNow = timeNow.Add(timeOut)
	assertEndpointsError(t, cache, "sd error")
	select {
	case <-ca:
		t.Logf("endpoint a closed, good")
	case <-time.After(time.Millisecond):
		t.Errorf("didn't close the deleted instance in time")
	}

	// Send another error
	cache.Update(Event{Err: errors.New("another sd error")})
	assertEndpointsError(t, cache, "sd error") // expect original error
}

func TestBadFactory(t *testing.T) {
	cache := newEndpointCache(func(string) (endpoint.Endpoint, io.Closer, error) {
		return nil, nil, errors.New("bad factory")
	}, log.NewNopLogger(), endpointerOptions{})

	cache.Update(Event{Instances: []string{"foo:1234", "bar:5678"}})
	assertEndpointsLen(t, cache, 0)
}

func assertEndpointsLen(t *testing.T, cache *endpointCache, l int) {
	endpoints, err := cache.Endpoints()
	if err != nil {
		t.Errorf("unexpected error %v", err)
		return
	}
	if want, have := l, len(endpoints); want != have {
		t.Errorf("want %d, have %d", want, have)
	}
}

func assertEndpointsError(t *testing.T, cache *endpointCache, wantErr string) {
	endpoints, err := cache.Endpoints()
	if err == nil {
		t.Errorf("expecting error, not good")
		return
	}
	if want, have := wantErr, err.Error(); want != have {
		t.Errorf("want %s, have %s", want, have)
		return
	}
	if want, have := 0, len(endpoints); want != have {
		t.Errorf("want %d, have %d", want, have)
	}
}

type closer chan struct{}

func (c closer) Close() error { close(c); return nil }