Codebase list golang-github-go-kit-kit / 203e4782-15ed-484f-9517-a1c12654c5bc/v0.12.0 sd / etcdv3 / integration_test.go
203e4782-15ed-484f-9517-a1c12654c5bc/v0.12.0

Tree @203e4782-15ed-484f-9517-a1c12654c5bc/v0.12.0 (Download .tar.gz)

integration_test.go @203e4782-15ed-484f-9517-a1c12654c5bc/v0.12.0raw · history · blame

//go:build flaky_integration
// +build flaky_integration

package etcdv3

import (
	"context"
	"io"
	"os"
	"testing"
	"time"

	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/log"
)

// runIntegration drives one register/discover/deregister cycle against the
// etcd instance behind client and fails t as soon as a step misbehaves.
//
// The steps are:
//  1. check that settings.key has no entries yet;
//  2. register service through a Registrar and read it back with GetEntries,
//     expecting exactly one entry equal to settings.value;
//  3. build an Instancer on settings.prefix plus an Endpointer on top of it
//     and wait up to one second for it to report a single endpoint;
//  4. deregister and wait up to one second for the endpoint to disappear;
//  5. check that settings.key has no entries again.
//
// service is expected to carry settings.key and settings.value; the callers in
// this file construct it that way.
func runIntegration(settings integrationSettings, client Client, service Service, t *testing.T) {
	// Verify test data is initially empty.
	entries, err := client.GetEntries(settings.key)
	if err != nil {
		t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err)
	}
	if len(entries) > 0 {
		t.Fatalf("GetEntries(%q): expected no instance entries, got %d", settings.key, len(entries))
	}
	t.Logf("GetEntries(%q): %v (OK)", settings.key, entries)

	// Instantiate a new Registrar, passing in test data.
	registrar := NewRegistrar(
		client,
		service,
		log.With(log.NewLogfmtLogger(os.Stderr), "component", "registrar"),
	)

	// Register our instance. The call's outcome is not inspected here, so the
	// read-back below is what actually proves the registration happened.
	registrar.Register()
	t.Log("Registered")

	// Retrieve entries from etcd manually.
	entries, err = client.GetEntries(settings.key)
	if err != nil {
		t.Fatalf("client.GetEntries(%q): %v", settings.key, err)
	}
	if want, have := 1, len(entries); want != have {
		t.Fatalf("client.GetEntries(%q): want %d, have %d", settings.key, want, have)
	}
	if want, have := settings.value, entries[0]; want != have {
		t.Fatalf("want %q, have %q", want, have)
	}

	instancer, err := NewInstancer(
		client,
		settings.prefix,
		log.With(log.NewLogfmtLogger(os.Stderr), "component", "instancer"),
	)
	if err != nil {
		t.Fatalf("NewInstancer: %v", err)
	}
	t.Log("Constructed Instancer OK")
	defer instancer.Stop()

	// The factory ignores the instance string and always yields a no-op
	// endpoint: only the number of endpoints matters to this test.
	endpointer := sd.NewEndpointer(
		instancer,
		func(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil },
		// Tagged "endpointer" so its log lines are distinguishable from the
		// Instancer's above.
		log.With(log.NewLogfmtLogger(os.Stderr), "component", "endpointer"),
	)
	t.Log("Constructed Endpointer OK")
	defer endpointer.Close()

	// Poll rather than check once: the Endpointer may need a moment before it
	// reflects the registration.
	if !within(time.Second, func() bool {
		endpoints, err := endpointer.Endpoints()
		return err == nil && len(endpoints) == 1
	}) {
		t.Fatal("Endpointer didn't see Register in time")
	}
	t.Log("Endpointer saw Register OK")

	// Deregister first instance of test data.
	registrar.Deregister()
	t.Log("Deregistered")

	// Check it was deregistered.
	if !within(time.Second, func() bool {
		endpoints, err := endpointer.Endpoints()
		t.Logf("Checking Deregister: len(endpoints) = %d, err = %v", len(endpoints), err)
		return err == nil && len(endpoints) == 0
	}) {
		t.Fatalf("Endpointer didn't see Deregister in time")
	}

	// Verify test data no longer exists in etcd.
	entries, err = client.GetEntries(settings.key)
	if err != nil {
		t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err)
	}
	if len(entries) > 0 {
		t.Fatalf("GetEntries(%q): expected no entries, got %v", settings.key, entries)
	}
	t.Logf("GetEntries(%q): %v (OK)", settings.key, entries)
}

// integrationSettings carries the fixture values shared by the etcd
// integration tests in this file:
//
//   - addr: etcd endpoint handed to NewClient
//   - prefix: key prefix handed to NewInstancer
//   - instance: address of the fake service instance
//   - key: full etcd key, i.e. prefix followed by instance
//   - value: payload stored under key
type integrationSettings struct {
	addr, prefix, instance, key, value string
}

// testIntegrationSettings assembles the fixture used by every integration
// test. The etcd address is read from the ETCD_ADDR environment variable; when
// that variable is empty the calling test is skipped.
func testIntegrationSettings(t *testing.T) integrationSettings {
	etcdAddr := os.Getenv("ETCD_ADDR")
	if etcdAddr == "" {
		t.Skip("ETCD_ADDR not set; skipping integration test")
	}

	const (
		servicePrefix = "/services/foosvc/" // known at compile time
		instanceAddr  = "1.2.3.4:8080"      // taken from runtime or platform, somehow
	)

	return integrationSettings{
		addr:     etcdAddr,
		prefix:   servicePrefix,
		instance: instanceAddr,
		key:      servicePrefix + instanceAddr,
		value:    "http://" + instanceAddr, // based on our transport
	}
}

// TestIntegration runs the full register/discover/deregister cycle for a
// service that has no TTL configured. It needs a reachable etcd server whose
// address is supplied through the ETCD_ADDR environment variable (for a
// default local install that is typically localhost:2379); without it the test
// is skipped.
func TestIntegration(t *testing.T) {
	settings := testIntegrationSettings(t)
	client, err := NewClient(context.Background(), []string{settings.addr}, ClientOptions{
		DialTimeout:   2 * time.Second,
		DialKeepAlive: 2 * time.Second,
	})
	if err != nil {
		t.Fatalf("NewClient(%q): %v", settings.addr, err)
	}

	service := Service{
		Key:   settings.key,
		Value: settings.value,
	}
	// Best-effort cleanup, matching the other tests in this file: if the run
	// aborts after registering, a leftover key would make the next run fail
	// its "initially empty" check.
	defer client.Deregister(service)

	runIntegration(settings, client, service, t)
}

// TestIntegrationTTL runs the same register/discover/deregister cycle as
// TestIntegration, but for a service carrying a TTL option built from 3s and
// 10s durations. The service is deregistered again when the test returns.
func TestIntegrationTTL(t *testing.T) {
	cfg := testIntegrationSettings(t)

	opts := ClientOptions{
		DialTimeout:   2 * time.Second,
		DialKeepAlive: 2 * time.Second,
	}
	etcd, err := NewClient(context.Background(), []string{cfg.addr}, opts)
	if err != nil {
		t.Fatalf("NewClient(%q): %v", cfg.addr, err)
	}

	svc := Service{
		Key:   cfg.key,
		Value: cfg.value,
		TTL:   NewTTLOption(3*time.Second, 10*time.Second),
	}
	// Remove the key on the way out, whatever the outcome of the run.
	defer etcd.Deregister(svc)

	runIntegration(cfg, etcd, svc, t)
}

// TestIntegrationRegistrarOnly exercises a Registrar on its own: no Instancer
// or Endpointer is ever attached to the client. It registers a TTL-carrying
// service, confirms the entry is present in etcd, deregisters it and confirms
// the entry is gone. The test is skipped when ETCD_ADDR is not set.
func TestIntegrationRegistrarOnly(t *testing.T) {
	settings := testIntegrationSettings(t)
	client, err := NewClient(context.Background(), []string{settings.addr}, ClientOptions{
		DialTimeout:   2 * time.Second,
		DialKeepAlive: 2 * time.Second,
	})
	if err != nil {
		t.Fatalf("NewClient(%q): %v", settings.addr, err)
	}

	service := Service{
		Key:   settings.key,
		Value: settings.value,
		TTL:   NewTTLOption(time.Second*3, time.Second*10),
	}
	// Best-effort cleanup so an aborted run does not leave the key behind.
	defer client.Deregister(service)

	// Verify test data is initially empty.
	entries, err := client.GetEntries(settings.key)
	if err != nil {
		t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err)
	}
	if len(entries) > 0 {
		t.Fatalf("GetEntries(%q): expected no instance entries, got %d", settings.key, len(entries))
	}
	t.Logf("GetEntries(%q): %v (OK)", settings.key, entries)

	// Instantiate a new Registrar, passing in test data.
	registrar := NewRegistrar(
		client,
		service,
		log.With(log.NewLogfmtLogger(os.Stderr), "component", "registrar"),
	)

	// Register our instance.
	registrar.Register()
	t.Log("Registered")

	// The Register call above is not checked for an error, so read the key
	// back: otherwise a registration that silently failed would go unnoticed.
	entries, err = client.GetEntries(settings.key)
	if err != nil {
		t.Fatalf("client.GetEntries(%q): %v", settings.key, err)
	}
	if want, have := 1, len(entries); want != have {
		t.Fatalf("client.GetEntries(%q): want %d, have %d", settings.key, want, have)
	}
	if want, have := settings.value, entries[0]; want != have {
		t.Fatalf("want %q, have %q", want, have)
	}

	// Deregister our instance. (so we test registrar only scenario)
	registrar.Deregister()
	t.Log("Deregistered")

	// Verify the entry no longer exists in etcd.
	entries, err = client.GetEntries(settings.key)
	if err != nil {
		t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err)
	}
	if len(entries) > 0 {
		t.Fatalf("GetEntries(%q): expected no entries, got %v", settings.key, entries)
	}
	t.Logf("GetEntries(%q): %v (OK)", settings.key, entries)
}

// within polls f until it reports true or the time budget d is used up, and
// returns whether f succeeded.
//
// f is always evaluated at least once, even when d is zero or negative, and it
// is evaluated one last time once the budget has elapsed, so a condition that
// becomes true during the final wait is still observed. Between attempts the
// function sleeps for d/10, shortened to whatever is left of the budget.
func within(d time.Duration, f func() bool) bool {
	deadline := time.Now().Add(d)
	interval := d / 10

	for {
		if f() {
			return true
		}

		remaining := time.Until(deadline)
		if remaining <= 0 {
			return false
		}

		// Never sleep past the deadline; the next iteration performs the
		// final check.
		pause := interval
		if remaining < pause {
			pause = remaining
		}
		time.Sleep(pause)
	}
}