diff --git a/sd/etcd/client.go b/sd/etcd/client.go index f56a593..3501a54 100644 --- a/sd/etcd/client.go +++ b/sd/etcd/client.go @@ -14,21 +14,32 @@ ) var ( - ErrNoKey = errors.New("no key provided") + // ErrNoKey indicates a client method needs a key but receives none. + ErrNoKey = errors.New("no key provided") + + // ErrNoValue indicates a client method needs a value but receives none. ErrNoValue = errors.New("no value provided") ) // Client is a wrapper around the etcd client. type Client interface { - // GetEntries will query the given prefix in etcd and returns a set of entries. + // GetEntries queries the given prefix in etcd and returns a slice + // containing the values of all keys found, recursively, underneath that + // prefix. GetEntries(prefix string) ([]string, error) - // WatchPrefix starts watching every change for given prefix in etcd. When an - // change is detected it will populate the responseChan when an *etcd.Response. - WatchPrefix(prefix string, responseChan chan *etcd.Response) + // WatchPrefix watches the given prefix in etcd for changes. When a change + // is detected, it will signal on the passed channel. Clients are expected + // to call GetEntries to update themselves with the latest set of complete + // values. WatchPrefix will always send an initial sentinel value on the + // channel after establishing the watch, to ensure that clients always + // receive the latest set of values. WatchPrefix will block until the + // context passed to the NewClient constructor is terminated. + WatchPrefix(prefix string, ch chan struct{}) // Register a service with etcd. Register(s Service) error + // Deregister a service with etcd. Deregister(s Service) error } @@ -42,7 +53,7 @@ type ClientOptions struct { Cert string Key string - CaCert string + CACert string DialTimeout time.Duration DialKeepAlive time.Duration HeaderTimeoutPerRequest time.Duration @@ -53,66 +64,45 @@ // The parameter machines needs to be a full URL with schemas. // e.g. "http://localhost:2379" will work, but "localhost:2379" will not. func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) { - var ( - c etcd.KeysAPI - err error - caCertCt []byte - tlsCert tls.Certificate - ) - + transport := etcd.DefaultTransport if options.Cert != "" && options.Key != "" { - tlsCert, err = tls.LoadX509KeyPair(options.Cert, options.Key) + tlsCert, err := tls.LoadX509KeyPair(options.Cert, options.Key) if err != nil { return nil, err } - - caCertCt, err = ioutil.ReadFile(options.CaCert) + caCertCt, err := ioutil.ReadFile(options.CACert) if err != nil { return nil, err } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCertCt) - - tlsConfig := &tls.Config{ - Certificates: []tls.Certificate{tlsCert}, - RootCAs: caCertPool, - } - - transport := &http.Transport{ - TLSClientConfig: tlsConfig, - Dial: func(network, addr string) (net.Conn, error) { - dial := &net.Dialer{ + transport = &http.Transport{ + TLSClientConfig: &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + RootCAs: caCertPool, + }, + Dial: func(network, address string) (net.Conn, error) { + return (&net.Dialer{ Timeout: options.DialTimeout, KeepAlive: options.DialKeepAlive, - } - return dial.Dial(network, addr) + }).Dial(network, address) }, } - - cfg := etcd.Config{ - Endpoints: machines, - Transport: transport, - HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest, - } - ce, err := etcd.New(cfg) - if err != nil { - return nil, err - } - c = etcd.NewKeysAPI(ce) - } else { - cfg := etcd.Config{ - Endpoints: machines, - Transport: etcd.DefaultTransport, - HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest, - } - ce, err := etcd.New(cfg) - if err != nil { - return nil, err - } - c = etcd.NewKeysAPI(ce) } - return &client{c, ctx}, nil + ce, err := etcd.New(etcd.Config{ + Endpoints: machines, + Transport: transport, + HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest, + }) + if err != nil { + return nil, err + } + + return &client{ + keysAPI: etcd.NewKeysAPI(ce), + ctx: ctx, + }, nil } // GetEntries implements the etcd Client interface. @@ -137,15 +127,14 @@ } // WatchPrefix implements the etcd Client interface. -func (c *client) WatchPrefix(prefix string, responseChan chan *etcd.Response) { +func (c *client) WatchPrefix(prefix string, ch chan struct{}) { watch := c.keysAPI.Watcher(prefix, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true}) - responseChan <- nil // TODO(pb) explain this + ch <- struct{}{} // make sure caller invokes GetEntries for { - res, err := watch.Next(c.ctx) - if err != nil { + if _, err := watch.Next(c.ctx); err != nil { return } - responseChan <- res + ch <- struct{}{} } } diff --git a/sd/etcd/client_test.go b/sd/etcd/client_test.go index 1caeb59..6ac2a94 100644 --- a/sd/etcd/client_test.go +++ b/sd/etcd/client_test.go @@ -8,19 +8,14 @@ ) func TestNewClient(t *testing.T) { - ClientOptions := ClientOptions{ - Cert: "", - Key: "", - CaCert: "", - DialTimeout: (2 * time.Second), - DialKeepAlive: (2 * time.Second), - HeaderTimeoutPerRequest: (2 * time.Second), - } - client, err := NewClient( context.Background(), []string{"http://irrelevant:12345"}, - ClientOptions, + ClientOptions{ + DialTimeout: 2 * time.Second, + DialKeepAlive: 2 * time.Second, + HeaderTimeoutPerRequest: 2 * time.Second, + }, ) if err != nil { t.Fatalf("unexpected error creating client: %v", err) @@ -30,20 +25,20 @@ } } +// NewClient should fail when providing invalid or missing endpoints. func TestOptions(t *testing.T) { - //creating new client should fail when providing invalid or missing endpoints a, err := NewClient( context.Background(), []string{}, ClientOptions{ Cert: "", Key: "", - CaCert: "", - DialTimeout: (2 * time.Second), - DialKeepAlive: (2 * time.Second), - HeaderTimeoutPerRequest: (2 * time.Second), - }) - + CACert: "", + DialTimeout: 2 * time.Second, + DialKeepAlive: 2 * time.Second, + HeaderTimeoutPerRequest: 2 * time.Second, + }, + ) if err == nil { t.Errorf("expected error: %v", err) } @@ -51,19 +46,18 @@ t.Fatalf("expected client to be nil on failure") } - //creating new client should fail when providing invalid or missing endpoints _, err = NewClient( context.Background(), []string{"http://irrelevant:12345"}, ClientOptions{ Cert: "blank.crt", Key: "blank.key", - CaCert: "blank.cacert", - DialTimeout: (2 * time.Second), - DialKeepAlive: (2 * time.Second), - HeaderTimeoutPerRequest: (2 * time.Second), - }) - + CACert: "blank.CACert", + DialTimeout: 2 * time.Second, + DialKeepAlive: 2 * time.Second, + HeaderTimeoutPerRequest: 2 * time.Second, + }, + ) if err == nil { t.Errorf("expected error: %v", err) } diff --git a/sd/etcd/integration_test.go b/sd/etcd/integration_test.go index 6cdf194..e65840d 100644 --- a/sd/etcd/integration_test.go +++ b/sd/etcd/integration_test.go @@ -1,3 +1,5 @@ +// +build integration + package etcd import ( diff --git a/sd/etcd/subscriber.go b/sd/etcd/subscriber.go index 1d579eb..1b91872 100644 --- a/sd/etcd/subscriber.go +++ b/sd/etcd/subscriber.go @@ -1,8 +1,6 @@ package etcd import ( - etcd "github.com/coreos/etcd/client" - "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" "github.com/go-kit/kit/sd" @@ -45,11 +43,11 @@ } func (s *Subscriber) loop() { - responseChan := make(chan *etcd.Response) - go s.client.WatchPrefix(s.prefix, responseChan) + ch := make(chan struct{}) + go s.client.WatchPrefix(s.prefix, ch) for { select { - case <-responseChan: + case <-ch: instances, err := s.client.GetEntries(s.prefix) if err != nil { s.logger.Log("msg", "failed to retrieve entries", "err", err) diff --git a/sd/etcd/subscriber_test.go b/sd/etcd/subscriber_test.go index 84f7813..dad320b 100644 --- a/sd/etcd/subscriber_test.go +++ b/sd/etcd/subscriber_test.go @@ -86,7 +86,7 @@ return entries, nil } -func (c *fakeClient) WatchPrefix(prefix string, responseChan chan *stdetcd.Response) {} +func (c *fakeClient) WatchPrefix(prefix string, ch chan struct{}) {} func (c *fakeClient) Register(Service) error { return nil