diff --git a/loadbalancer/etcd/client.go b/loadbalancer/etcd/client.go index c19101d..799977c 100644 --- a/loadbalancer/etcd/client.go +++ b/loadbalancer/etcd/client.go @@ -1,10 +1,15 @@ package etcd import ( - "fmt" - "strings" + "crypto/tls" + "crypto/x509" + "io/ioutil" + "net" + "net/http" + "time" - "github.com/coreos/go-etcd/etcd" + etcd "github.com/coreos/etcd/client" + "golang.org/x/net/context" ) // Client is a wrapper arround the etcd client. @@ -17,35 +22,91 @@ } type client struct { - *etcd.Client + keysAPI etcd.KeysAPI + ctx context.Context +} + +type ClientOptions struct { + Cert string + Key string + CaCert string + DialTimeout time.Duration + DialKeepAline time.Duration + HeaderTimeoutPerRequest time.Duration } // NewClient returns an *etcd.Client with a connection to the named machines. // It will return an error if a connection to the cluster cannot be made. // The parameter machines needs to be a full URL with schemas. -// e.g. "http://localhost:4001" will work, but "localhost:4001" will not. -func NewClient(machines []string, cert, key, caCert string) (Client, error) { - var c *etcd.Client - var err error +// e.g. "http://localhost:2379" will work, but "localhost:2379" will not. +func NewClient(ctx context.Context, machines []string, options *ClientOptions) (Client, error) { + var ( + c etcd.KeysAPI + err error + caCertCt []byte + tlsCert tls.Certificate + ) + if options == nil { + options = &ClientOptions{} + } - if cert != "" && key != "" { - c, err = etcd.NewTLSClient(machines, cert, key, caCert) + if options.Cert != "" && options.Key != "" { + tlsCert, err = tls.LoadX509KeyPair(options.Cert, options.Key) if err != nil { return nil, err } + + caCertCt, err = ioutil.ReadFile(options.CaCert) + if err != nil { + return nil, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCertCt) + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + RootCAs: caCertPool, + } + + transport := &http.Transport{ + TLSClientConfig: tlsConfig, + Dial: func(network, addr string) (net.Conn, error) { + dial := &net.Dialer{ + Timeout: options.DialTimeout, + KeepAlive: options.DialKeepAline, + } + return dial.Dial(network, addr) + }, + } + + cfg := etcd.Config{ + Endpoints: machines, + Transport: transport, + HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest, + } + ce, err := etcd.New(cfg) + if err != nil { + return nil, err + } + c = etcd.NewKeysAPI(ce) } else { - c = etcd.NewClient(machines) + cfg := etcd.Config{ + Endpoints: machines, + Transport: etcd.DefaultTransport, + HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest, + } + ce, err := etcd.New(cfg) + if err != nil { + return nil, err + } + c = etcd.NewKeysAPI(ce) } - success := c.SetCluster(machines) - if !success { - return nil, fmt.Errorf("cannot connect to the etcd cluster: %s", strings.Join(machines, ",")) - } - return &client{c}, nil + return &client{c, ctx}, nil } // GetEntries implements the etcd Client interface. func (c *client) GetEntries(key string) ([]string, error) { - resp, err := c.Get(key, false, true) + resp, err := c.keysAPI.Get(c.ctx, key, &etcd.GetOptions{Recursive: true}) if err != nil { return nil, err } @@ -59,5 +120,12 @@ // WatchPrefix implements the etcd Client interface. func (c *client) WatchPrefix(prefix string, responseChan chan *etcd.Response) { - c.Watch(prefix, 0, true, responseChan, nil) + watch := c.keysAPI.Watcher(prefix, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true}) + for { + res, err := watch.Next(c.ctx) + if err != nil { + return + } + responseChan <- res + } } diff --git a/loadbalancer/etcd/publisher.go b/loadbalancer/etcd/publisher.go index 2df768e..37c8aac 100644 --- a/loadbalancer/etcd/publisher.go +++ b/loadbalancer/etcd/publisher.go @@ -1,7 +1,7 @@ package etcd import ( - "github.com/coreos/go-etcd/etcd" + etcd "github.com/coreos/etcd/client" "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/loadbalancer" diff --git a/loadbalancer/etcd/publisher_test.go b/loadbalancer/etcd/publisher_test.go index 89668bb..8d1f517 100644 --- a/loadbalancer/etcd/publisher_test.go +++ b/loadbalancer/etcd/publisher_test.go @@ -5,7 +5,7 @@ "io" "testing" - stdetcd "github.com/coreos/go-etcd/etcd" + stdetcd "github.com/coreos/etcd/client" "golang.org/x/net/context" "github.com/go-kit/kit/endpoint"