Merge pull request #176 from omnistream/master
Switch ETCD client to official client
Peter Bourgon
7 years ago
0 | 0 | package etcd |
1 | 1 | |
2 | 2 | import ( |
3 | "fmt" | |
4 | "strings" | |
3 | "crypto/tls" | |
4 | "crypto/x509" | |
5 | "io/ioutil" | |
6 | "net" | |
7 | "net/http" | |
8 | "time" | |
5 | 9 | |
6 | "github.com/coreos/go-etcd/etcd" | |
10 | etcd "github.com/coreos/etcd/client" | |
11 | "golang.org/x/net/context" | |
7 | 12 | ) |
8 | 13 | |
9 | 14 | // Client is a wrapper arround the etcd client. |
16 | 21 | } |
17 | 22 | |
18 | 23 | type client struct { |
19 | *etcd.Client | |
24 | keysAPI etcd.KeysAPI | |
25 | ctx context.Context | |
26 | } | |
27 | ||
28 | type ClientOptions struct { | |
29 | Cert string | |
30 | Key string | |
31 | CaCert string | |
32 | DialTimeout time.Duration | |
33 | DialKeepAline time.Duration | |
34 | HeaderTimeoutPerRequest time.Duration | |
20 | 35 | } |
21 | 36 | |
22 | 37 | // NewClient returns an *etcd.Client with a connection to the named machines. |
23 | 38 | // It will return an error if a connection to the cluster cannot be made. |
24 | 39 | // The parameter machines needs to be a full URL with schemas. |
25 | // e.g. "http://localhost:4001" will work, but "localhost:4001" will not. | |
26 | func NewClient(machines []string, cert, key, caCert string) (Client, error) { | |
27 | var c *etcd.Client | |
28 | var err error | |
40 | // e.g. "http://localhost:2379" will work, but "localhost:2379" will not. | |
41 | func NewClient(ctx context.Context, machines []string, options *ClientOptions) (Client, error) { | |
42 | var ( | |
43 | c etcd.KeysAPI | |
44 | err error | |
45 | caCertCt []byte | |
46 | tlsCert tls.Certificate | |
47 | ) | |
48 | if options == nil { | |
49 | options = &ClientOptions{} | |
50 | } | |
29 | 51 | |
30 | if cert != "" && key != "" { | |
31 | c, err = etcd.NewTLSClient(machines, cert, key, caCert) | |
52 | if options.Cert != "" && options.Key != "" { | |
53 | tlsCert, err = tls.LoadX509KeyPair(options.Cert, options.Key) | |
32 | 54 | if err != nil { |
33 | 55 | return nil, err |
34 | 56 | } |
57 | ||
58 | caCertCt, err = ioutil.ReadFile(options.CaCert) | |
59 | if err != nil { | |
60 | return nil, err | |
61 | } | |
62 | caCertPool := x509.NewCertPool() | |
63 | caCertPool.AppendCertsFromPEM(caCertCt) | |
64 | ||
65 | tlsConfig := &tls.Config{ | |
66 | Certificates: []tls.Certificate{tlsCert}, | |
67 | RootCAs: caCertPool, | |
68 | } | |
69 | ||
70 | transport := &http.Transport{ | |
71 | TLSClientConfig: tlsConfig, | |
72 | Dial: func(network, addr string) (net.Conn, error) { | |
73 | dial := &net.Dialer{ | |
74 | Timeout: options.DialTimeout, | |
75 | KeepAlive: options.DialKeepAline, | |
76 | } | |
77 | return dial.Dial(network, addr) | |
78 | }, | |
79 | } | |
80 | ||
81 | cfg := etcd.Config{ | |
82 | Endpoints: machines, | |
83 | Transport: transport, | |
84 | HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest, | |
85 | } | |
86 | ce, err := etcd.New(cfg) | |
87 | if err != nil { | |
88 | return nil, err | |
89 | } | |
90 | c = etcd.NewKeysAPI(ce) | |
35 | 91 | } else { |
36 | c = etcd.NewClient(machines) | |
92 | cfg := etcd.Config{ | |
93 | Endpoints: machines, | |
94 | Transport: etcd.DefaultTransport, | |
95 | HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest, | |
96 | } | |
97 | ce, err := etcd.New(cfg) | |
98 | if err != nil { | |
99 | return nil, err | |
100 | } | |
101 | c = etcd.NewKeysAPI(ce) | |
37 | 102 | } |
38 | success := c.SetCluster(machines) | |
39 | if !success { | |
40 | return nil, fmt.Errorf("cannot connect to the etcd cluster: %s", strings.Join(machines, ",")) | |
41 | } | |
42 | return &client{c}, nil | |
103 | return &client{c, ctx}, nil | |
43 | 104 | } |
44 | 105 | |
45 | 106 | // GetEntries implements the etcd Client interface. |
46 | 107 | func (c *client) GetEntries(key string) ([]string, error) { |
47 | resp, err := c.Get(key, false, true) | |
108 | resp, err := c.keysAPI.Get(c.ctx, key, &etcd.GetOptions{Recursive: true}) | |
48 | 109 | if err != nil { |
49 | 110 | return nil, err |
50 | 111 | } |
58 | 119 | |
59 | 120 | // WatchPrefix implements the etcd Client interface. |
60 | 121 | func (c *client) WatchPrefix(prefix string, responseChan chan *etcd.Response) { |
61 | c.Watch(prefix, 0, true, responseChan, nil) | |
122 | watch := c.keysAPI.Watcher(prefix, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true}) | |
123 | for { | |
124 | res, err := watch.Next(c.ctx) | |
125 | if err != nil { | |
126 | return | |
127 | } | |
128 | responseChan <- res | |
129 | } | |
62 | 130 | } |