//go:build flaky_integration
// +build flaky_integration
package etcdv3
import (
"context"
"io"
"os"
"testing"
"time"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/sd"
"github.com/go-kit/log"
)
func runIntegration(settings integrationSettings, client Client, service Service, t *testing.T) {
// Verify test data is initially empty.
entries, err := client.GetEntries(settings.key)
if err != nil {
t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err)
}
if len(entries) > 0 {
t.Fatalf("GetEntries(%q): expected no instance entries, got %d", settings.key, len(entries))
}
t.Logf("GetEntries(%q): %v (OK)", settings.key, entries)
// Instantiate a new Registrar, passing in test data.
registrar := NewRegistrar(
client,
service,
log.With(log.NewLogfmtLogger(os.Stderr), "component", "registrar"),
)
// Register our instance.
registrar.Register()
t.Log("Registered")
// Retrieve entries from etcd manually.
entries, err = client.GetEntries(settings.key)
if err != nil {
t.Fatalf("client.GetEntries(%q): %v", settings.key, err)
}
if want, have := 1, len(entries); want != have {
t.Fatalf("client.GetEntries(%q): want %d, have %d", settings.key, want, have)
}
if want, have := settings.value, entries[0]; want != have {
t.Fatalf("want %q, have %q", want, have)
}
instancer, err := NewInstancer(
client,
settings.prefix,
log.With(log.NewLogfmtLogger(os.Stderr), "component", "instancer"),
)
if err != nil {
t.Fatalf("NewInstancer: %v", err)
}
t.Log("Constructed Instancer OK")
defer instancer.Stop()
endpointer := sd.NewEndpointer(
instancer,
func(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil },
log.With(log.NewLogfmtLogger(os.Stderr), "component", "instancer"),
)
t.Log("Constructed Endpointer OK")
defer endpointer.Close()
if !within(time.Second, func() bool {
endpoints, err := endpointer.Endpoints()
return err == nil && len(endpoints) == 1
}) {
t.Fatal("Endpointer didn't see Register in time")
}
t.Log("Endpointer saw Register OK")
// Deregister first instance of test data.
registrar.Deregister()
t.Log("Deregistered")
// Check it was deregistered.
if !within(time.Second, func() bool {
endpoints, err := endpointer.Endpoints()
t.Logf("Checking Deregister: len(endpoints) = %d, err = %v", len(endpoints), err)
return err == nil && len(endpoints) == 0
}) {
t.Fatalf("Endpointer didn't see Deregister in time")
}
// Verify test data no longer exists in etcd.
entries, err = client.GetEntries(settings.key)
if err != nil {
t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err)
}
if len(entries) > 0 {
t.Fatalf("GetEntries(%q): expected no entries, got %v", settings.key, entries)
}
t.Logf("GetEntries(%q): %v (OK)", settings.key, entries)
}
type integrationSettings struct {
addr string
prefix string
instance string
key string
value string
}
func testIntegrationSettings(t *testing.T) integrationSettings {
var settings integrationSettings
settings.addr = os.Getenv("ETCD_ADDR")
if settings.addr == "" {
t.Skip("ETCD_ADDR not set; skipping integration test")
}
settings.prefix = "/services/foosvc/" // known at compile time
settings.instance = "1.2.3.4:8080" // taken from runtime or platform, somehow
settings.key = settings.prefix + settings.instance
settings.value = "http://" + settings.instance // based on our transport
return settings
}
// Package sd/etcd provides a wrapper around the etcd key/value store. This
// example assumes the user has an instance of etcd installed and running
// locally on port 2379.
func TestIntegration(t *testing.T) {
settings := testIntegrationSettings(t)
client, err := NewClient(context.Background(), []string{settings.addr}, ClientOptions{
DialTimeout: 2 * time.Second,
DialKeepAlive: 2 * time.Second,
})
if err != nil {
t.Fatalf("NewClient(%q): %v", settings.addr, err)
}
service := Service{
Key: settings.key,
Value: settings.value,
}
runIntegration(settings, client, service, t)
}
func TestIntegrationTTL(t *testing.T) {
settings := testIntegrationSettings(t)
client, err := NewClient(context.Background(), []string{settings.addr}, ClientOptions{
DialTimeout: 2 * time.Second,
DialKeepAlive: 2 * time.Second,
})
if err != nil {
t.Fatalf("NewClient(%q): %v", settings.addr, err)
}
service := Service{
Key: settings.key,
Value: settings.value,
TTL: NewTTLOption(time.Second*3, time.Second*10),
}
defer client.Deregister(service)
runIntegration(settings, client, service, t)
}
func TestIntegrationRegistrarOnly(t *testing.T) {
settings := testIntegrationSettings(t)
client, err := NewClient(context.Background(), []string{settings.addr}, ClientOptions{
DialTimeout: 2 * time.Second,
DialKeepAlive: 2 * time.Second,
})
if err != nil {
t.Fatalf("NewClient(%q): %v", settings.addr, err)
}
service := Service{
Key: settings.key,
Value: settings.value,
TTL: NewTTLOption(time.Second*3, time.Second*10),
}
defer client.Deregister(service)
// Verify test data is initially empty.
entries, err := client.GetEntries(settings.key)
if err != nil {
t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err)
}
if len(entries) > 0 {
t.Fatalf("GetEntries(%q): expected no instance entries, got %d", settings.key, len(entries))
}
t.Logf("GetEntries(%q): %v (OK)", settings.key, entries)
// Instantiate a new Registrar, passing in test data.
registrar := NewRegistrar(
client,
service,
log.With(log.NewLogfmtLogger(os.Stderr), "component", "registrar"),
)
// Register our instance.
registrar.Register()
t.Log("Registered")
// Deregister our instance. (so we test registrar only scenario)
registrar.Deregister()
t.Log("Deregistered")
}
func within(d time.Duration, f func() bool) bool {
deadline := time.Now().Add(d)
for time.Now().Before(deadline) {
if f() {
return true
}
time.Sleep(d / 10)
}
return false
}