|
0 |
package etcd
|
|
1 |
|
|
2 |
import (
|
|
3 |
"io"
|
|
4 |
"os"
|
|
5 |
"testing"
|
|
6 |
"time"
|
|
7 |
|
|
8 |
"golang.org/x/net/context"
|
|
9 |
|
|
10 |
"github.com/go-kit/kit/endpoint"
|
|
11 |
"github.com/go-kit/kit/log"
|
|
12 |
)
|
|
13 |
|
|
14 |
// Package sd/etcd provides a wrapper around the etcd key/value store. This
|
|
15 |
// example assumes the user has an instance of etcd installed and running
|
|
16 |
// locally on port 2379.
|
|
17 |
func TestIntegration(t *testing.T) {
|
|
18 |
addr := os.Getenv("ETCD_ADDR")
|
|
19 |
if addr == "" {
|
|
20 |
t.Skip("ETCD_ADDR not set; skipping integration test")
|
|
21 |
}
|
|
22 |
|
|
23 |
var (
|
|
24 |
prefix = "/services/foosvc/" // known at compile time
|
|
25 |
instance = "1.2.3.4:8080" // taken from runtime or platform, somehow
|
|
26 |
key = prefix + instance
|
|
27 |
value = "http://" + instance // based on our transport
|
|
28 |
)
|
|
29 |
|
|
30 |
client, err := NewClient(context.Background(), []string{addr}, ClientOptions{
|
|
31 |
DialTimeout: 2 * time.Second,
|
|
32 |
DialKeepAlive: 2 * time.Second,
|
|
33 |
HeaderTimeoutPerRequest: 2 * time.Second,
|
|
34 |
})
|
|
35 |
if err != nil {
|
|
36 |
t.Fatalf("NewClient(%q): %v", addr, err)
|
|
37 |
}
|
|
38 |
|
|
39 |
// Verify test data is initially empty.
|
|
40 |
entries, err := client.GetEntries(key)
|
|
41 |
if err == nil {
|
|
42 |
t.Fatalf("GetEntries(%q): expected error, got none", key)
|
|
43 |
}
|
|
44 |
t.Logf("GetEntries(%q): %v (OK)", key, err)
|
|
45 |
|
|
46 |
// Instantiate a new Registrar, passing in test data.
|
|
47 |
registrar := NewRegistrar(client, Service{
|
|
48 |
Key: key,
|
|
49 |
Value: value,
|
|
50 |
}, log.NewContext(log.NewLogfmtLogger(os.Stderr)).With("component", "registrar"))
|
|
51 |
|
|
52 |
// Register our instance.
|
|
53 |
registrar.Register()
|
|
54 |
t.Logf("Registered")
|
|
55 |
|
|
56 |
// Retrieve entries from etcd manually.
|
|
57 |
entries, err = client.GetEntries(key)
|
|
58 |
if err != nil {
|
|
59 |
t.Fatalf("client.GetEntries(%q): %v", key, err)
|
|
60 |
}
|
|
61 |
if want, have := 1, len(entries); want != have {
|
|
62 |
t.Fatalf("client.GetEntries(%q): want %d, have %d", key, want, have)
|
|
63 |
}
|
|
64 |
if want, have := value, entries[0]; want != have {
|
|
65 |
t.Fatalf("want %q, have %q", want, have)
|
|
66 |
}
|
|
67 |
|
|
68 |
subscriber, err := NewSubscriber(
|
|
69 |
client,
|
|
70 |
prefix,
|
|
71 |
func(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil },
|
|
72 |
log.NewContext(log.NewLogfmtLogger(os.Stderr)).With("component", "subscriber"),
|
|
73 |
)
|
|
74 |
if err != nil {
|
|
75 |
t.Fatalf("NewSubscriber: %v", err)
|
|
76 |
}
|
|
77 |
t.Logf("Constructed Subscriber OK")
|
|
78 |
|
|
79 |
if !within(time.Second, func() bool {
|
|
80 |
endpoints, err := subscriber.Endpoints()
|
|
81 |
return err == nil && len(endpoints) == 1
|
|
82 |
}) {
|
|
83 |
t.Fatalf("Subscriber didn't see Register in time")
|
|
84 |
}
|
|
85 |
t.Logf("Subscriber saw Register OK")
|
|
86 |
|
|
87 |
// Deregister first instance of test data.
|
|
88 |
registrar.Deregister()
|
|
89 |
t.Logf("Deregistered")
|
|
90 |
|
|
91 |
// Check it was deregistered.
|
|
92 |
if !within(time.Second, func() bool {
|
|
93 |
endpoints, err := subscriber.Endpoints()
|
|
94 |
t.Logf("Checking Deregister: len(endpoints) = %d, err = %v", len(endpoints), err)
|
|
95 |
return err == nil && len(endpoints) == 0
|
|
96 |
}) {
|
|
97 |
t.Fatalf("Subscriber didn't see Deregister in time")
|
|
98 |
}
|
|
99 |
|
|
100 |
// Verify test data no longer exists in etcd.
|
|
101 |
entries, err = client.GetEntries(key)
|
|
102 |
if err == nil {
|
|
103 |
t.Fatalf("GetEntries(%q): expected error, got none", key)
|
|
104 |
}
|
|
105 |
t.Logf("GetEntries(%q): %v (OK)", key, err)
|
|
106 |
}
|
|
107 |
|
|
108 |
func within(d time.Duration, f func() bool) bool {
|
|
109 |
deadline := time.Now().Add(d)
|
|
110 |
for time.Now().Before(deadline) {
|
|
111 |
if f() {
|
|
112 |
return true
|
|
113 |
}
|
|
114 |
time.Sleep(d / 10)
|
|
115 |
}
|
|
116 |
return false
|
|
117 |
}
|