Codebase list golang-github-go-kit-kit / 50036860-5a95-4bf4-b155-667a95eed3d7/main sd / endpoint_cache_test.go
50036860-5a95-4bf4-b155-667a95eed3d7/main

Tree @50036860-5a95-4bf4-b155-667a95eed3d7/main (Download .tar.gz)

endpoint_cache_test.go @50036860-5a95-4bf4-b155-667a95eed3d7/main

c80303e
9a19822
 
 
 
 
 
 
 
 
 
 
f5bc66e
9a19822
 
 
 
 
c80303e
9a19822
 
 
c80303e
9a19822
 
 
 
 
 
 
 
c80303e
9a19822
 
c80303e
9a19822
 
 
 
 
 
 
 
c80303e
9a19822
9225fe9
 
 
 
 
 
 
 
 
 
 
 
9a19822
c80303e
9a19822
 
 
 
 
 
 
 
c80303e
9a19822
 
c80303e
9a19822
 
 
 
 
 
 
c80303e
9a19822
 
f5bc66e
9225fe9
 
 
 
 
 
8cb3665
 
 
 
9225fe9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9a19822
c80303e
9a19822
c80303e
9a19822
c80303e
 
 
 
 
 
 
 
 
 
 
9a19822
 
 
 
9225fe9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9a19822
 
 
package sd

import (
	"errors"
	"io"
	"testing"
	"time"

	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
)

func TestEndpointCache(t *testing.T) {
	var (
		ca    = make(closer)
		cb    = make(closer)
		c     = map[string]io.Closer{"a": ca, "b": cb}
		f     = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
		cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{})
	)

	// Populate
	cache.Update(Event{Instances: []string{"a", "b"}})
	select {
	case <-ca:
		t.Errorf("endpoint a closed, not good")
	case <-cb:
		t.Errorf("endpoint b closed, not good")
	case <-time.After(time.Millisecond):
		t.Logf("no closures yet, good")
	}
	assertEndpointsLen(t, cache, 2)

	// Duplicate, should be no-op
	cache.Update(Event{Instances: []string{"a", "b"}})
	select {
	case <-ca:
		t.Errorf("endpoint a closed, not good")
	case <-cb:
		t.Errorf("endpoint b closed, not good")
	case <-time.After(time.Millisecond):
		t.Logf("no closures yet, good")
	}
	assertEndpointsLen(t, cache, 2)

	// Error, should continue returning old endpoints
	cache.Update(Event{Err: errors.New("sd error")})
	select {
	case <-ca:
		t.Errorf("endpoint a closed, not good")
	case <-cb:
		t.Errorf("endpoint b closed, not good")
	case <-time.After(time.Millisecond):
		t.Logf("no closures yet, good")
	}
	assertEndpointsLen(t, cache, 2)

	// Delete b
	go cache.Update(Event{Instances: []string{"a"}})
	select {
	case <-ca:
		t.Errorf("endpoint a closed, not good")
	case <-cb:
		t.Logf("endpoint b closed, good")
	case <-time.After(time.Second):
		t.Errorf("didn't close the deleted instance in time")
	}
	assertEndpointsLen(t, cache, 1)

	// Delete a
	go cache.Update(Event{Instances: []string{}})
	select {
	// case <-cb: will succeed, as it's closed
	case <-ca:
		t.Logf("endpoint a closed, good")
	case <-time.After(time.Second):
		t.Errorf("didn't close the deleted instance in time")
	}
	assertEndpointsLen(t, cache, 0)
}

func TestEndpointCacheErrorAndTimeout(t *testing.T) {
	var (
		ca      = make(closer)
		cb      = make(closer)
		c       = map[string]io.Closer{"a": ca, "b": cb}
		f       = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
		timeOut = 100 * time.Millisecond
		cache   = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{
			invalidateOnError: true,
			invalidateTimeout: timeOut,
		})
	)

	timeNow := time.Now()
	cache.timeNow = func() time.Time { return timeNow }

	// Populate
	cache.Update(Event{Instances: []string{"a"}})
	select {
	case <-ca:
		t.Errorf("endpoint a closed, not good")
	case <-time.After(time.Millisecond):
		t.Logf("no closures yet, good")
	}
	assertEndpointsLen(t, cache, 1)

	// Send error, keep time still.
	cache.Update(Event{Err: errors.New("sd error")})
	select {
	case <-ca:
		t.Errorf("endpoint a closed, not good")
	case <-time.After(time.Millisecond):
		t.Logf("no closures yet, good")
	}
	assertEndpointsLen(t, cache, 1)

	// Move the time, but less than the timeout
	timeNow = timeNow.Add(timeOut / 2)
	assertEndpointsLen(t, cache, 1)
	select {
	case <-ca:
		t.Errorf("endpoint a closed, not good")
	case <-time.After(time.Millisecond):
		t.Logf("no closures yet, good")
	}

	// Move the time past the timeout
	timeNow = timeNow.Add(timeOut)
	assertEndpointsError(t, cache, "sd error")
	select {
	case <-ca:
		t.Logf("endpoint a closed, good")
	case <-time.After(time.Millisecond):
		t.Errorf("didn't close the deleted instance in time")
	}

	// Send another error
	cache.Update(Event{Err: errors.New("another sd error")})
	assertEndpointsError(t, cache, "sd error") // expect original error
}

func TestBadFactory(t *testing.T) {
	cache := newEndpointCache(func(string) (endpoint.Endpoint, io.Closer, error) {
		return nil, nil, errors.New("bad factory")
	}, log.NewNopLogger(), endpointerOptions{})

	cache.Update(Event{Instances: []string{"foo:1234", "bar:5678"}})
	assertEndpointsLen(t, cache, 0)
}

func assertEndpointsLen(t *testing.T, cache *endpointCache, l int) {
	endpoints, err := cache.Endpoints()
	if err != nil {
		t.Errorf("unexpected error %v", err)
		return
	}
	if want, have := l, len(endpoints); want != have {
		t.Errorf("want %d, have %d", want, have)
	}
}

func assertEndpointsError(t *testing.T, cache *endpointCache, wantErr string) {
	endpoints, err := cache.Endpoints()
	if err == nil {
		t.Errorf("expecting error, not good")
		return
	}
	if want, have := wantErr, err.Error(); want != have {
		t.Errorf("want %s, have %s", want, have)
		return
	}
	if want, have := 0, len(endpoints); want != have {
		t.Errorf("want %d, have %d", want, have)
	}
}

type closer chan struct{}

func (c closer) Close() error { close(c); return nil }