Service Discovery support for etcd v3 (#663)
* Service Discovery support for etcd v3
This adds a go-kit/kit/sd implementation for etcd/clientv3. All tests except the
client_test were ported over.
I have removed the client tests because they are
more problematic with regards to etcd v3. The etcd maintainers
switched to protobufs in v3 and placed those generated messages under
internal/, which would cause issues trying to reference it
(see https://golang.org/s/go14internal).
This makes stubbing clientv3.KV more difficult without burying the
implementation behind interfaces to remove the direct dependence
on those internal packages.
Features in use with v3:
- Support for etcd/clientv3.Watcher
- Support for etcd/clientv3.Lease (TTL)
- Username/password support
* - dead code cleanup
- moved global structs inside of test functions since they are not used
anywhere else
- moved unused Err variable
- removed error return type from close()
- added doc comment
Robin Harper authored 6 years ago
Peter Bourgon committed 6 years ago
0 | package etcdv3 | |
1 | ||
2 | import ( | |
3 | "context" | |
4 | "crypto/tls" | |
5 | "errors" | |
6 | "time" | |
7 | ||
8 | "github.com/coreos/etcd/clientv3" | |
9 | "github.com/coreos/etcd/pkg/transport" | |
10 | ) | |
11 | ||
12 | var ( | |
13 | // ErrNoKey indicates a client method needs a key but receives none. | |
14 | ErrNoKey = errors.New("no key provided") | |
15 | ||
16 | // ErrNoValue indicates a client method needs a value but receives none. | |
17 | ErrNoValue = errors.New("no value provided") | |
18 | ) | |
19 | ||
20 | // Client is a wrapper around the etcd client. | |
21 | type Client interface { | |
22 | // GetEntries queries the given prefix in etcd and returns a slice | |
23 | // containing the values of all keys found, recursively, underneath that | |
24 | // prefix. | |
25 | GetEntries(prefix string) ([]string, error) | |
26 | ||
27 | // WatchPrefix watches the given prefix in etcd for changes. When a change | |
28 | // is detected, it will signal on the passed channel. Clients are expected | |
29 | // to call GetEntries to update themselves with the latest set of complete | |
30 | // values. WatchPrefix will always send an initial sentinel value on the | |
31 | // channel after establishing the watch, to ensure that clients always | |
32 | // receive the latest set of values. WatchPrefix will block until the | |
33 | // context passed to the NewClient constructor is terminated. | |
34 | WatchPrefix(prefix string, ch chan struct{}) | |
35 | ||
36 | // Register a service with etcd. | |
37 | Register(s Service) error | |
38 | ||
39 | // Deregister a service with etcd. | |
40 | Deregister(s Service) error | |
41 | ||
42 | // LeaseID returns the lease id created for this service instance | |
43 | LeaseID() int64 | |
44 | } | |
45 | ||
46 | type client struct { | |
47 | cli *clientv3.Client | |
48 | ctx context.Context | |
49 | ||
50 | kv clientv3.KV | |
51 | ||
52 | // Watcher interface instance, used to leverage Watcher.Close() | |
53 | watcher clientv3.Watcher | |
54 | // watcher context | |
55 | wctx context.Context | |
56 | // watcher cancel func | |
57 | wcf context.CancelFunc | |
58 | ||
59 | // leaseID will be 0 (clientv3.NoLease) if a lease was not created | |
60 | leaseID clientv3.LeaseID | |
61 | ||
62 | hbch <-chan *clientv3.LeaseKeepAliveResponse | |
63 | // Lease interface instance, used to leverage Lease.Close() | |
64 | leaser clientv3.Lease | |
65 | } | |
66 | ||
67 | // ClientOptions defines options for the etcd client. All values are optional. | |
68 | // If any duration is not specified, a default of 3 seconds will be used. | |
69 | type ClientOptions struct { | |
70 | Cert string | |
71 | Key string | |
72 | CACert string | |
73 | DialTimeout time.Duration | |
74 | DialKeepAlive time.Duration | |
75 | Username string | |
76 | Password string | |
77 | } | |
78 | ||
79 | // NewClient returns Client with a connection to the named machines. It will | |
80 | // return an error if a connection to the cluster cannot be made. | |
81 | func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) { | |
82 | if options.DialTimeout == 0 { | |
83 | options.DialTimeout = 3 * time.Second | |
84 | } | |
85 | if options.DialKeepAlive == 0 { | |
86 | options.DialKeepAlive = 3 * time.Second | |
87 | } | |
88 | ||
89 | var err error | |
90 | var tlscfg *tls.Config | |
91 | ||
92 | if options.Cert != "" && options.Key != "" { | |
93 | tlsInfo := transport.TLSInfo{ | |
94 | CertFile: options.Cert, | |
95 | KeyFile: options.Key, | |
96 | TrustedCAFile: options.CACert, | |
97 | } | |
98 | tlscfg, err = tlsInfo.ClientConfig() | |
99 | if err != nil { | |
100 | return nil, err | |
101 | } | |
102 | } | |
103 | ||
104 | cli, err := clientv3.New(clientv3.Config{ | |
105 | Context: ctx, | |
106 | Endpoints: machines, | |
107 | DialTimeout: options.DialTimeout, | |
108 | DialKeepAliveTime: options.DialKeepAlive, | |
109 | TLS: tlscfg, | |
110 | Username: options.Username, | |
111 | Password: options.Password, | |
112 | }) | |
113 | if err != nil { | |
114 | return nil, err | |
115 | } | |
116 | ||
117 | return &client{ | |
118 | cli: cli, | |
119 | ctx: ctx, | |
120 | kv: clientv3.NewKV(cli), | |
121 | }, nil | |
122 | } | |
123 | ||
124 | func (c *client) LeaseID() int64 { return int64(c.leaseID) } | |
125 | ||
126 | // GetEntries implements the etcd Client interface. | |
127 | func (c *client) GetEntries(key string) ([]string, error) { | |
128 | resp, err := c.kv.Get(c.ctx, key, clientv3.WithPrefix()) | |
129 | if err != nil { | |
130 | return nil, err | |
131 | } | |
132 | ||
133 | entries := make([]string, len(resp.Kvs)) | |
134 | for i, kv := range resp.Kvs { | |
135 | entries[i] = string(kv.Value) | |
136 | } | |
137 | ||
138 | return entries, nil | |
139 | } | |
140 | ||
141 | // WatchPrefix implements the etcd Client interface. | |
142 | func (c *client) WatchPrefix(prefix string, ch chan struct{}) { | |
143 | c.wctx, c.wcf = context.WithCancel(c.ctx) | |
144 | c.watcher = clientv3.NewWatcher(c.cli) | |
145 | ||
146 | wch := c.watcher.Watch(c.wctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(0)) | |
147 | ch <- struct{}{} | |
148 | for wr := range wch { | |
149 | if wr.Canceled { | |
150 | return | |
151 | } | |
152 | ch <- struct{}{} | |
153 | } | |
154 | } | |
155 | ||
156 | func (c *client) Register(s Service) error { | |
157 | var err error | |
158 | ||
159 | if s.Key == "" { | |
160 | return ErrNoKey | |
161 | } | |
162 | if s.Value == "" { | |
163 | return ErrNoValue | |
164 | } | |
165 | ||
166 | if c.leaser != nil { | |
167 | c.leaser.Close() | |
168 | } | |
169 | c.leaser = clientv3.NewLease(c.cli) | |
170 | ||
171 | if c.watcher != nil { | |
172 | c.watcher.Close() | |
173 | } | |
174 | c.watcher = clientv3.NewWatcher(c.cli) | |
175 | if c.kv == nil { | |
176 | c.kv = clientv3.NewKV(c.cli) | |
177 | } | |
178 | ||
179 | if s.TTL == nil { | |
180 | s.TTL = NewTTLOption(time.Second*3, time.Second*10) | |
181 | } | |
182 | ||
183 | grantResp, err := c.leaser.Grant(c.ctx, int64(s.TTL.ttl.Seconds())) | |
184 | if err != nil { | |
185 | return err | |
186 | } | |
187 | c.leaseID = grantResp.ID | |
188 | ||
189 | _, err = c.kv.Put( | |
190 | c.ctx, | |
191 | s.Key, | |
192 | s.Value, | |
193 | clientv3.WithLease(c.leaseID), | |
194 | ) | |
195 | if err != nil { | |
196 | return err | |
197 | } | |
198 | ||
199 | // this will keep the key alive 'forever' or until we revoke it or | |
200 | // the context is canceled | |
201 | c.hbch, err = c.leaser.KeepAlive(c.ctx, c.leaseID) | |
202 | if err != nil { | |
203 | return err | |
204 | } | |
205 | ||
206 | return nil | |
207 | } | |
208 | ||
209 | func (c *client) Deregister(s Service) error { | |
210 | defer c.close() | |
211 | ||
212 | if s.Key == "" { | |
213 | return ErrNoKey | |
214 | } | |
215 | if _, err := c.cli.Delete(c.ctx, s.Key, clientv3.WithIgnoreLease()); err != nil { | |
216 | return err | |
217 | } | |
218 | ||
219 | return nil | |
220 | } | |
221 | ||
222 | // close will close any open clients and call | |
223 | // the watcher cancel func | |
224 | func (c *client) close() { | |
225 | if c.leaser != nil { | |
226 | c.leaser.Close() | |
227 | } | |
228 | if c.watcher != nil { | |
229 | c.watcher.Close() | |
230 | c.wcf() | |
231 | } | |
232 | } |
0 | // Package etcdv3 provides an Instancer and Registrar implementation for etcd v3. If | |
1 | // you use etcd v3 as your service discovery system, this package will help you | |
2 | // implement the registration and client-side load balancing patterns. | |
3 | package etcdv3 |
0 | package etcdv3 | |
1 | ||
2 | import ( | |
3 | "context" | |
4 | "io" | |
5 | "time" | |
6 | ||
7 | "github.com/go-kit/kit/endpoint" | |
8 | "github.com/go-kit/kit/log" | |
9 | "github.com/go-kit/kit/sd" | |
10 | "github.com/go-kit/kit/sd/lb" | |
11 | ) | |
12 | ||
13 | func Example() { | |
14 | // Let's say this is a service that means to register itself. | |
15 | // First, we will set up some context. | |
16 | var ( | |
17 | etcdServer = "10.0.0.1:2379" // in the change from v2 to v3, the schema is no longer necessary if connecting directly to an etcd v3 instance | |
18 | prefix = "/services/foosvc/" // known at compile time | |
19 | instance = "1.2.3.4:8080" // taken from runtime or platform, somehow | |
20 | key = prefix + instance // should be globally unique | |
21 | value = "http://" + instance // based on our transport | |
22 | ctx = context.Background() | |
23 | ) | |
24 | ||
25 | options := ClientOptions{ | |
26 | // Path to trusted ca file | |
27 | CACert: "", | |
28 | ||
29 | // Path to certificate | |
30 | Cert: "", | |
31 | ||
32 | // Path to private key | |
33 | Key: "", | |
34 | ||
35 | // Username if required | |
36 | Username: "", | |
37 | ||
38 | // Password if required | |
39 | Password: "", | |
40 | ||
41 | // If DialTimeout is 0, it defaults to 3s | |
42 | DialTimeout: time.Second * 3, | |
43 | ||
44 | // If DialKeepAlive is 0, it defaults to 3s | |
45 | DialKeepAlive: time.Second * 3, | |
46 | } | |
47 | ||
48 | // Build the client. | |
49 | client, err := NewClient(ctx, []string{etcdServer}, options) | |
50 | if err != nil { | |
51 | panic(err) | |
52 | } | |
53 | ||
54 | // Build the registrar. | |
55 | registrar := NewRegistrar(client, Service{ | |
56 | Key: key, | |
57 | Value: value, | |
58 | }, log.NewNopLogger()) | |
59 | ||
60 | // Register our instance. | |
61 | registrar.Register() | |
62 | ||
63 | // At the end of our service lifecycle, for example at the end of func main, | |
64 | // we should make sure to deregister ourselves. This is important! Don't | |
65 | // accidentally skip this step by invoking a log.Fatal or os.Exit in the | |
66 | // interim, which bypasses the defer stack. | |
67 | defer registrar.Deregister() | |
68 | ||
69 | // It's likely that we'll also want to connect to other services and call | |
70 | // their methods. We can build an Instancer to listen for changes from etcd, | |
71 | // create Endpointer, wrap it with a load-balancer to pick a single | |
72 | // endpoint, and finally wrap it with a retry strategy to get something that | |
73 | // can be used as an endpoint directly. | |
74 | barPrefix := "/services/barsvc" | |
75 | logger := log.NewNopLogger() | |
76 | instancer, err := NewInstancer(client, barPrefix, logger) | |
77 | if err != nil { | |
78 | panic(err) | |
79 | } | |
80 | endpointer := sd.NewEndpointer(instancer, barFactory, logger) | |
81 | balancer := lb.NewRoundRobin(endpointer) | |
82 | retry := lb.Retry(3, 3*time.Second, balancer) | |
83 | ||
84 | // And now retry can be used like any other endpoint. | |
85 | req := struct{}{} | |
86 | if _, err = retry(ctx, req); err != nil { | |
87 | panic(err) | |
88 | } | |
89 | } | |
90 | ||
91 | func barFactory(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil } |
0 | package etcdv3 | |
1 | ||
2 | import ( | |
3 | "github.com/go-kit/kit/log" | |
4 | "github.com/go-kit/kit/sd" | |
5 | "github.com/go-kit/kit/sd/internal/instance" | |
6 | ) | |
7 | ||
8 | // Instancer yields instances stored in a certain etcd keyspace. Any kind of | |
9 | // change in that keyspace is watched and will update the Instancer's Instancers. | |
10 | type Instancer struct { | |
11 | cache *instance.Cache | |
12 | client Client | |
13 | prefix string | |
14 | logger log.Logger | |
15 | quitc chan struct{} | |
16 | } | |
17 | ||
18 | // NewInstancer returns an etcd instancer. It will start watching the given | |
19 | // prefix for changes, and update the subscribers. | |
20 | func NewInstancer(c Client, prefix string, logger log.Logger) (*Instancer, error) { | |
21 | s := &Instancer{ | |
22 | client: c, | |
23 | prefix: prefix, | |
24 | cache: instance.NewCache(), | |
25 | logger: logger, | |
26 | quitc: make(chan struct{}), | |
27 | } | |
28 | ||
29 | instances, err := s.client.GetEntries(s.prefix) | |
30 | if err == nil { | |
31 | logger.Log("prefix", s.prefix, "instances", len(instances)) | |
32 | } else { | |
33 | logger.Log("prefix", s.prefix, "err", err) | |
34 | } | |
35 | s.cache.Update(sd.Event{Instances: instances, Err: err}) | |
36 | ||
37 | go s.loop() | |
38 | return s, nil | |
39 | } | |
40 | ||
41 | func (s *Instancer) loop() { | |
42 | ch := make(chan struct{}) | |
43 | go s.client.WatchPrefix(s.prefix, ch) | |
44 | ||
45 | for { | |
46 | select { | |
47 | case <-ch: | |
48 | instances, err := s.client.GetEntries(s.prefix) | |
49 | if err != nil { | |
50 | s.logger.Log("msg", "failed to retrieve entries", "err", err) | |
51 | s.cache.Update(sd.Event{Err: err}) | |
52 | continue | |
53 | } | |
54 | s.cache.Update(sd.Event{Instances: instances}) | |
55 | ||
56 | case <-s.quitc: | |
57 | return | |
58 | } | |
59 | } | |
60 | } | |
61 | ||
62 | // Stop terminates the Instancer. | |
63 | func (s *Instancer) Stop() { | |
64 | close(s.quitc) | |
65 | } | |
66 | ||
67 | // Register implements Instancer. | |
68 | func (s *Instancer) Register(ch chan<- sd.Event) { | |
69 | s.cache.Register(ch) | |
70 | } | |
71 | ||
72 | // Deregister implements Instancer. | |
73 | func (s *Instancer) Deregister(ch chan<- sd.Event) { | |
74 | s.cache.Deregister(ch) | |
75 | } |
0 | package etcdv3 | |
1 | ||
2 | import ( | |
3 | "errors" | |
4 | "testing" | |
5 | ||
6 | "github.com/go-kit/kit/log" | |
7 | "github.com/go-kit/kit/sd" | |
8 | ) | |
9 | ||
10 | var _ sd.Instancer = (*Instancer)(nil) // API check | |
11 | ||
12 | type testKV struct { | |
13 | Key []byte | |
14 | Value []byte | |
15 | } | |
16 | ||
17 | type testResponse struct { | |
18 | Kvs []testKV | |
19 | } | |
20 | ||
21 | var ( | |
22 | fakeResponse = testResponse{ | |
23 | Kvs: []testKV{ | |
24 | { | |
25 | Key: []byte("/foo/1"), | |
26 | Value: []byte("1:1"), | |
27 | }, | |
28 | { | |
29 | Key: []byte("/foo/2"), | |
30 | Value: []byte("2:2"), | |
31 | }, | |
32 | }, | |
33 | } | |
34 | ) | |
35 | ||
36 | var _ sd.Instancer = &Instancer{} // API check | |
37 | ||
38 | func TestInstancer(t *testing.T) { | |
39 | client := &fakeClient{ | |
40 | responses: map[string]testResponse{"/foo": fakeResponse}, | |
41 | } | |
42 | ||
43 | s, err := NewInstancer(client, "/foo", log.NewNopLogger()) | |
44 | if err != nil { | |
45 | t.Fatal(err) | |
46 | } | |
47 | defer s.Stop() | |
48 | ||
49 | if state := s.cache.State(); state.Err != nil { | |
50 | t.Fatal(state.Err) | |
51 | } | |
52 | } | |
53 | ||
54 | type fakeClient struct { | |
55 | responses map[string]testResponse | |
56 | } | |
57 | ||
58 | func (c *fakeClient) GetEntries(prefix string) ([]string, error) { | |
59 | response, ok := c.responses[prefix] | |
60 | if !ok { | |
61 | return nil, errors.New("key not exist") | |
62 | } | |
63 | ||
64 | entries := make([]string, len(response.Kvs)) | |
65 | for i, node := range response.Kvs { | |
66 | entries[i] = string(node.Value) | |
67 | } | |
68 | return entries, nil | |
69 | } | |
70 | ||
71 | func (c *fakeClient) WatchPrefix(prefix string, ch chan struct{}) { | |
72 | } | |
73 | ||
74 | func (c *fakeClient) LeaseID() int64 { | |
75 | return 0 | |
76 | } | |
77 | ||
78 | func (c *fakeClient) Register(Service) error { | |
79 | return nil | |
80 | } | |
81 | func (c *fakeClient) Deregister(Service) error { | |
82 | return nil | |
83 | } |
0 | // +build integration | |
1 | ||
2 | package etcdv3 | |
3 | ||
4 | import ( | |
5 | "context" | |
6 | "io" | |
7 | "os" | |
8 | "testing" | |
9 | "time" | |
10 | ||
11 | "github.com/go-kit/kit/endpoint" | |
12 | "github.com/go-kit/kit/log" | |
13 | "github.com/go-kit/kit/sd" | |
14 | ) | |
15 | ||
16 | func runIntegration(settings integrationSettings, client Client, service Service, t *testing.T) { | |
17 | // Verify test data is initially empty. | |
18 | entries, err := client.GetEntries(settings.key) | |
19 | if err != nil { | |
20 | t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err) | |
21 | } | |
22 | if len(entries) > 0 { | |
23 | t.Fatalf("GetEntries(%q): expected no instance entries, got %d", settings.key, len(entries)) | |
24 | } | |
25 | t.Logf("GetEntries(%q): %v (OK)", settings.key, entries) | |
26 | ||
27 | // Instantiate a new Registrar, passing in test data. | |
28 | registrar := NewRegistrar( | |
29 | client, | |
30 | service, | |
31 | log.With(log.NewLogfmtLogger(os.Stderr), "component", "registrar"), | |
32 | ) | |
33 | ||
34 | // Register our instance. | |
35 | registrar.Register() | |
36 | t.Logf("Registered") | |
37 | ||
38 | // Retrieve entries from etcd manually. | |
39 | entries, err = client.GetEntries(settings.key) | |
40 | if err != nil { | |
41 | t.Fatalf("client.GetEntries(%q): %v", settings.key, err) | |
42 | } | |
43 | if want, have := 1, len(entries); want != have { | |
44 | t.Fatalf("client.GetEntries(%q): want %d, have %d", settings.key, want, have) | |
45 | } | |
46 | if want, have := settings.value, entries[0]; want != have { | |
47 | t.Fatalf("want %q, have %q", want, have) | |
48 | } | |
49 | ||
50 | instancer, err := NewInstancer( | |
51 | client, | |
52 | settings.prefix, | |
53 | log.With(log.NewLogfmtLogger(os.Stderr), "component", "instancer"), | |
54 | ) | |
55 | if err != nil { | |
56 | t.Fatalf("NewInstancer: %v", err) | |
57 | } | |
58 | t.Logf("Constructed Instancer OK") | |
59 | defer instancer.Stop() | |
60 | ||
61 | endpointer := sd.NewEndpointer( | |
62 | instancer, | |
63 | func(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil }, | |
64 | log.With(log.NewLogfmtLogger(os.Stderr), "component", "instancer"), | |
65 | ) | |
66 | t.Logf("Constructed Endpointer OK") | |
67 | defer endpointer.Close() | |
68 | ||
69 | if !within(time.Second, func() bool { | |
70 | endpoints, err := endpointer.Endpoints() | |
71 | return err == nil && len(endpoints) == 1 | |
72 | }) { | |
73 | t.Fatalf("Endpointer didn't see Register in time") | |
74 | } | |
75 | t.Logf("Endpointer saw Register OK") | |
76 | ||
77 | // Deregister first instance of test data. | |
78 | registrar.Deregister() | |
79 | t.Logf("Deregistered") | |
80 | ||
81 | // Check it was deregistered. | |
82 | if !within(time.Second, func() bool { | |
83 | endpoints, err := endpointer.Endpoints() | |
84 | t.Logf("Checking Deregister: len(endpoints) = %d, err = %v", len(endpoints), err) | |
85 | return err == nil && len(endpoints) == 0 | |
86 | }) { | |
87 | t.Fatalf("Endpointer didn't see Deregister in time") | |
88 | } | |
89 | ||
90 | // Verify test data no longer exists in etcd. | |
91 | entries, err = client.GetEntries(settings.key) | |
92 | if err != nil { | |
93 | t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err) | |
94 | } | |
95 | if len(entries) > 0 { | |
96 | t.Fatalf("GetEntries(%q): expected no entries, got %v", settings.key, entries) | |
97 | } | |
98 | t.Logf("GetEntries(%q): %v (OK)", settings.key, entries) | |
99 | } | |
100 | ||
101 | type integrationSettings struct { | |
102 | addr string | |
103 | prefix string | |
104 | instance string | |
105 | key string | |
106 | value string | |
107 | } | |
108 | ||
109 | func testIntegrationSettings(t *testing.T) integrationSettings { | |
110 | var settings integrationSettings | |
111 | ||
112 | settings.addr = os.Getenv("ETCD_ADDR") | |
113 | if settings.addr == "" { | |
114 | t.Skip("ETCD_ADDR not set; skipping integration test") | |
115 | } | |
116 | ||
117 | settings.prefix = "/services/foosvc/" // known at compile time | |
118 | settings.instance = "1.2.3.4:8080" // taken from runtime or platform, somehow | |
119 | settings.key = settings.prefix + settings.instance | |
120 | settings.value = "http://" + settings.instance // based on our transport | |
121 | ||
122 | return settings | |
123 | } | |
124 | ||
125 | // Package sd/etcd provides a wrapper around the etcd key/value store. This | |
126 | // example assumes the user has an instance of etcd installed and running | |
127 | // locally on port 2379. | |
128 | func TestIntegration(t *testing.T) { | |
129 | settings := testIntegrationSettings(t) | |
130 | client, err := NewClient(context.Background(), []string{settings.addr}, ClientOptions{ | |
131 | DialTimeout: 2 * time.Second, | |
132 | DialKeepAlive: 2 * time.Second, | |
133 | }) | |
134 | if err != nil { | |
135 | t.Fatalf("NewClient(%q): %v", settings.addr, err) | |
136 | } | |
137 | ||
138 | service := Service{ | |
139 | Key: settings.key, | |
140 | Value: settings.value, | |
141 | } | |
142 | ||
143 | runIntegration(settings, client, service, t) | |
144 | } | |
145 | ||
146 | func TestIntegrationTTL(t *testing.T) { | |
147 | settings := testIntegrationSettings(t) | |
148 | client, err := NewClient(context.Background(), []string{settings.addr}, ClientOptions{ | |
149 | DialTimeout: 2 * time.Second, | |
150 | DialKeepAlive: 2 * time.Second, | |
151 | }) | |
152 | if err != nil { | |
153 | t.Fatalf("NewClient(%q): %v", settings.addr, err) | |
154 | } | |
155 | ||
156 | service := Service{ | |
157 | Key: settings.key, | |
158 | Value: settings.value, | |
159 | TTL: NewTTLOption(time.Second*3, time.Second*10), | |
160 | } | |
161 | defer client.Deregister(service) | |
162 | ||
163 | runIntegration(settings, client, service, t) | |
164 | } | |
165 | ||
166 | func within(d time.Duration, f func() bool) bool { | |
167 | deadline := time.Now().Add(d) | |
168 | for time.Now().Before(deadline) { | |
169 | if f() { | |
170 | return true | |
171 | } | |
172 | time.Sleep(d / 10) | |
173 | } | |
174 | return false | |
175 | } |
0 | package etcdv3 | |
1 | ||
2 | import ( | |
3 | "sync" | |
4 | "time" | |
5 | ||
6 | "github.com/go-kit/kit/log" | |
7 | ) | |
8 | ||
9 | const minHeartBeatTime = 500 * time.Millisecond | |
10 | ||
11 | // Registrar registers service instance liveness information to etcd. | |
12 | type Registrar struct { | |
13 | client Client | |
14 | service Service | |
15 | logger log.Logger | |
16 | ||
17 | quitmtx sync.Mutex | |
18 | quit chan struct{} | |
19 | } | |
20 | ||
21 | // Service holds the instance identifying data you want to publish to etcd. Key | |
22 | // must be unique, and value is the string returned to subscribers, typically | |
23 | // called the "instance" string in other parts of package sd. | |
24 | type Service struct { | |
25 | Key string // unique key, e.g. "/service/foobar/1.2.3.4:8080" | |
26 | Value string // returned to subscribers, e.g. "http://1.2.3.4:8080" | |
27 | TTL *TTLOption | |
28 | } | |
29 | ||
30 | // TTLOption allow setting a key with a TTL. This option will be used by a loop | |
31 | // goroutine which regularly refreshes the lease of the key. | |
32 | type TTLOption struct { | |
33 | heartbeat time.Duration // e.g. time.Second * 3 | |
34 | ttl time.Duration // e.g. time.Second * 10 | |
35 | } | |
36 | ||
37 | // NewTTLOption returns a TTLOption that contains proper TTL settings. Heartbeat | |
38 | // is used to refresh the lease of the key periodically; its value should be at | |
39 | // least 500ms. TTL defines the lease of the key; its value should be | |
40 | // significantly greater than heartbeat. | |
41 | // | |
42 | // Good default values might be 3s heartbeat, 10s TTL. | |
43 | func NewTTLOption(heartbeat, ttl time.Duration) *TTLOption { | |
44 | if heartbeat <= minHeartBeatTime { | |
45 | heartbeat = minHeartBeatTime | |
46 | } | |
47 | if ttl <= heartbeat { | |
48 | ttl = 3 * heartbeat | |
49 | } | |
50 | return &TTLOption{ | |
51 | heartbeat: heartbeat, | |
52 | ttl: ttl, | |
53 | } | |
54 | } | |
55 | ||
56 | // NewRegistrar returns a etcd Registrar acting on the provided catalog | |
57 | // registration (service). | |
58 | func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar { | |
59 | return &Registrar{ | |
60 | client: client, | |
61 | service: service, | |
62 | logger: log.With(logger, "key", service.Key, "value", service.Value), | |
63 | } | |
64 | } | |
65 | ||
66 | // Register implements the sd.Registrar interface. Call it when you want your | |
67 | // service to be registered in etcd, typically at startup. | |
68 | func (r *Registrar) Register() { | |
69 | if err := r.client.Register(r.service); err != nil { | |
70 | r.logger.Log("err", err) | |
71 | return | |
72 | } | |
73 | if r.service.TTL != nil { | |
74 | r.logger.Log("action", "register", "lease", r.client.LeaseID()) | |
75 | } else { | |
76 | r.logger.Log("action", "register") | |
77 | } | |
78 | } | |
79 | ||
80 | // Deregister implements the sd.Registrar interface. Call it when you want your | |
81 | // service to be deregistered from etcd, typically just prior to shutdown. | |
82 | func (r *Registrar) Deregister() { | |
83 | if err := r.client.Deregister(r.service); err != nil { | |
84 | r.logger.Log("err", err) | |
85 | } else { | |
86 | r.logger.Log("action", "deregister") | |
87 | } | |
88 | ||
89 | r.quitmtx.Lock() | |
90 | defer r.quitmtx.Unlock() | |
91 | if r.quit != nil { | |
92 | close(r.quit) | |
93 | r.quit = nil | |
94 | } | |
95 | } |
0 | package etcdv3 | |
1 | ||
2 | import ( | |
3 | "bytes" | |
4 | "errors" | |
5 | "testing" | |
6 | ||
7 | "github.com/go-kit/kit/log" | |
8 | ) | |
9 | ||
10 | // testClient is a basic implementation of Client | |
11 | type testClient struct { | |
12 | registerRes error // value returned when Register or Deregister is called | |
13 | } | |
14 | ||
15 | func (tc *testClient) GetEntries(prefix string) ([]string, error) { | |
16 | return nil, nil | |
17 | } | |
18 | ||
19 | func (tc *testClient) WatchPrefix(prefix string, ch chan struct{}) { | |
20 | } | |
21 | ||
22 | func (tc *testClient) Register(s Service) error { | |
23 | return tc.registerRes | |
24 | } | |
25 | ||
26 | func (tc *testClient) Deregister(s Service) error { | |
27 | return tc.registerRes | |
28 | } | |
29 | ||
30 | func (tc *testClient) LeaseID() int64 { | |
31 | return 0 | |
32 | } | |
33 | ||
34 | // default service used to build registrar in our tests | |
35 | var testService = Service{ | |
36 | Key: "testKey", | |
37 | Value: "testValue", | |
38 | TTL: nil, | |
39 | } | |
40 | ||
41 | // NewRegistar should return a registar with a logger using the service key and value | |
42 | func TestNewRegistar(t *testing.T) { | |
43 | c := Client(&testClient{nil}) | |
44 | buf := &bytes.Buffer{} | |
45 | logger := log.NewLogfmtLogger(buf) | |
46 | r := NewRegistrar( | |
47 | c, | |
48 | testService, | |
49 | logger, | |
50 | ) | |
51 | ||
52 | if err := r.logger.Log("msg", "message"); err != nil { | |
53 | t.Fatal(err) | |
54 | } | |
55 | if want, have := "key=testKey value=testValue msg=message\n", buf.String(); want != have { | |
56 | t.Errorf("\nwant: %shave: %s", want, have) | |
57 | } | |
58 | } | |
59 | ||
60 | func TestRegister(t *testing.T) { | |
61 | // Register log the error returned by the client or log the successful registration action | |
62 | // table of test cases for method Register | |
63 | var registerTestTable = []struct { | |
64 | registerRes error // value returned by the client on calls to Register | |
65 | log string // expected log by the registrar | |
66 | ||
67 | }{ | |
68 | // test case: an error is returned by the client | |
69 | {errors.New("regError"), "key=testKey value=testValue err=regError\n"}, | |
70 | // test case: registration successful | |
71 | {nil, "key=testKey value=testValue action=register\n"}, | |
72 | } | |
73 | ||
74 | for _, tc := range registerTestTable { | |
75 | c := Client(&testClient{tc.registerRes}) | |
76 | buf := &bytes.Buffer{} | |
77 | logger := log.NewLogfmtLogger(buf) | |
78 | r := NewRegistrar( | |
79 | c, | |
80 | testService, | |
81 | logger, | |
82 | ) | |
83 | r.Register() | |
84 | if want, have := tc.log, buf.String(); want != have { | |
85 | t.Fatalf("want %v, have %v", want, have) | |
86 | } | |
87 | } | |
88 | } | |
89 | ||
90 | func TestDeregister(t *testing.T) { | |
91 | // Deregister log the error returned by the client or log the successful deregistration action | |
92 | // table of test cases for method Deregister | |
93 | var deregisterTestTable = []struct { | |
94 | deregisterRes error // value returned by the client on calls to Deregister | |
95 | log string // expected log by the registrar | |
96 | }{ | |
97 | // test case: an error is returned by the client | |
98 | {errors.New("deregError"), "key=testKey value=testValue err=deregError\n"}, | |
99 | // test case: deregistration successful | |
100 | {nil, "key=testKey value=testValue action=deregister\n"}, | |
101 | } | |
102 | ||
103 | for _, tc := range deregisterTestTable { | |
104 | c := Client(&testClient{tc.deregisterRes}) | |
105 | buf := &bytes.Buffer{} | |
106 | logger := log.NewLogfmtLogger(buf) | |
107 | r := NewRegistrar( | |
108 | c, | |
109 | testService, | |
110 | logger, | |
111 | ) | |
112 | r.Deregister() | |
113 | if want, have := tc.log, buf.String(); want != have { | |
114 | t.Fatalf("want %v, have %v", want, have) | |
115 | } | |
116 | } | |
117 | } |