Codebase list golang-github-go-kit-kit / 60fd57b
Explanation and simplficiation Peter Bourgon 7 years ago
5 changed file(s) with 69 addition(s) and 86 deletion(s). Raw diff Collapse all Expand all
1313 )
1414
1515 var (
16 ErrNoKey = errors.New("no key provided")
16 // ErrNoKey indicates a client method needs a key but receives none.
17 ErrNoKey = errors.New("no key provided")
18
19 // ErrNoValue indicates a client method needs a value but receives none.
1720 ErrNoValue = errors.New("no value provided")
1821 )
1922
2023 // Client is a wrapper around the etcd client.
2124 type Client interface {
22 // GetEntries will query the given prefix in etcd and returns a set of entries.
25 // GetEntries queries the given prefix in etcd and returns a slice
26 // containing the values of all keys found, recursively, underneath that
27 // prefix.
2328 GetEntries(prefix string) ([]string, error)
2429
25 // WatchPrefix starts watching every change for given prefix in etcd. When an
26 // change is detected it will populate the responseChan when an *etcd.Response.
27 WatchPrefix(prefix string, responseChan chan *etcd.Response)
30 // WatchPrefix watches the given prefix in etcd for changes. When a change
31 // is detected, it will signal on the passed channel. Clients are expected
32 // to call GetEntries to update themselves with the latest set of complete
33 // values. WatchPrefix will always send an initial sentinel value on the
34 // channel after establishing the watch, to ensure that clients always
35 // receive the latest set of values. WatchPrefix will block until the
36 // context passed to the NewClient constructor is terminated.
37 WatchPrefix(prefix string, ch chan struct{})
2838
2939 // Register a service with etcd.
3040 Register(s Service) error
41
3142 // Deregister a service with etcd.
3243 Deregister(s Service) error
3344 }
4152 type ClientOptions struct {
4253 Cert string
4354 Key string
44 CaCert string
55 CACert string
4556 DialTimeout time.Duration
4657 DialKeepAlive time.Duration
4758 HeaderTimeoutPerRequest time.Duration
5263 // The parameter machines needs to be a full URL with schemas.
5364 // e.g. "http://localhost:2379" will work, but "localhost:2379" will not.
5465 func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) {
55 var (
56 c etcd.KeysAPI
57 err error
58 caCertCt []byte
59 tlsCert tls.Certificate
60 )
61
66 transport := etcd.DefaultTransport
6267 if options.Cert != "" && options.Key != "" {
63 tlsCert, err = tls.LoadX509KeyPair(options.Cert, options.Key)
68 tlsCert, err := tls.LoadX509KeyPair(options.Cert, options.Key)
6469 if err != nil {
6570 return nil, err
6671 }
67
68 caCertCt, err = ioutil.ReadFile(options.CaCert)
72 caCertCt, err := ioutil.ReadFile(options.CACert)
6973 if err != nil {
7074 return nil, err
7175 }
7276 caCertPool := x509.NewCertPool()
7377 caCertPool.AppendCertsFromPEM(caCertCt)
74
75 tlsConfig := &tls.Config{
76 Certificates: []tls.Certificate{tlsCert},
77 RootCAs: caCertPool,
78 }
79
80 transport := &http.Transport{
81 TLSClientConfig: tlsConfig,
82 Dial: func(network, addr string) (net.Conn, error) {
83 dial := &net.Dialer{
78 transport = &http.Transport{
79 TLSClientConfig: &tls.Config{
80 Certificates: []tls.Certificate{tlsCert},
81 RootCAs: caCertPool,
82 },
83 Dial: func(network, address string) (net.Conn, error) {
84 return (&net.Dialer{
8485 Timeout: options.DialTimeout,
8586 KeepAlive: options.DialKeepAlive,
86 }
87 return dial.Dial(network, addr)
87 }).Dial(network, address)
8888 },
8989 }
90
91 cfg := etcd.Config{
92 Endpoints: machines,
93 Transport: transport,
94 HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest,
95 }
96 ce, err := etcd.New(cfg)
97 if err != nil {
98 return nil, err
99 }
100 c = etcd.NewKeysAPI(ce)
101 } else {
102 cfg := etcd.Config{
103 Endpoints: machines,
104 Transport: etcd.DefaultTransport,
105 HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest,
106 }
107 ce, err := etcd.New(cfg)
108 if err != nil {
109 return nil, err
110 }
111 c = etcd.NewKeysAPI(ce)
11290 }
11391
114 return &client{c, ctx}, nil
92 ce, err := etcd.New(etcd.Config{
93 Endpoints: machines,
94 Transport: transport,
95 HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest,
96 })
97 if err != nil {
98 return nil, err
99 }
100
101 return &client{
102 keysAPI: etcd.NewKeysAPI(ce),
103 ctx: ctx,
104 }, nil
115105 }
116106
117107 // GetEntries implements the etcd Client interface.
136126 }
137127
138128 // WatchPrefix implements the etcd Client interface.
139 func (c *client) WatchPrefix(prefix string, responseChan chan *etcd.Response) {
129 func (c *client) WatchPrefix(prefix string, ch chan struct{}) {
140130 watch := c.keysAPI.Watcher(prefix, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true})
141 responseChan <- nil // TODO(pb) explain this
131 ch <- struct{}{} // make sure caller invokes GetEntries
142132 for {
143 res, err := watch.Next(c.ctx)
144 if err != nil {
133 if _, err := watch.Next(c.ctx); err != nil {
145134 return
146135 }
147 responseChan <- res
136 ch <- struct{}{}
148137 }
149138 }
150139
77 )
88
99 func TestNewClient(t *testing.T) {
10 ClientOptions := ClientOptions{
11 Cert: "",
12 Key: "",
13 CaCert: "",
14 DialTimeout: (2 * time.Second),
15 DialKeepAlive: (2 * time.Second),
16 HeaderTimeoutPerRequest: (2 * time.Second),
17 }
18
1910 client, err := NewClient(
2011 context.Background(),
2112 []string{"http://irrelevant:12345"},
22 ClientOptions,
13 ClientOptions{
14 DialTimeout: 2 * time.Second,
15 DialKeepAlive: 2 * time.Second,
16 HeaderTimeoutPerRequest: 2 * time.Second,
17 },
2318 )
2419 if err != nil {
2520 t.Fatalf("unexpected error creating client: %v", err)
2924 }
3025 }
3126
27 // NewClient should fail when providing invalid or missing endpoints.
3228 func TestOptions(t *testing.T) {
33 //creating new client should fail when providing invalid or missing endpoints
3429 a, err := NewClient(
3530 context.Background(),
3631 []string{},
3732 ClientOptions{
3833 Cert: "",
3934 Key: "",
40 CaCert: "",
41 DialTimeout: (2 * time.Second),
42 DialKeepAlive: (2 * time.Second),
43 HeaderTimeoutPerRequest: (2 * time.Second),
44 })
45
35 CACert: "",
36 DialTimeout: 2 * time.Second,
37 DialKeepAlive: 2 * time.Second,
38 HeaderTimeoutPerRequest: 2 * time.Second,
39 },
40 )
4641 if err == nil {
4742 t.Errorf("expected error: %v", err)
4843 }
5045 t.Fatalf("expected client to be nil on failure")
5146 }
5247
53 //creating new client should fail when providing invalid or missing endpoints
5448 _, err = NewClient(
5549 context.Background(),
5650 []string{"http://irrelevant:12345"},
5751 ClientOptions{
5852 Cert: "blank.crt",
5953 Key: "blank.key",
60 CaCert: "blank.cacert",
61 DialTimeout: (2 * time.Second),
62 DialKeepAlive: (2 * time.Second),
63 HeaderTimeoutPerRequest: (2 * time.Second),
64 })
65
54 CACert: "blank.CACert",
55 DialTimeout: 2 * time.Second,
56 DialKeepAlive: 2 * time.Second,
57 HeaderTimeoutPerRequest: 2 * time.Second,
58 },
59 )
6660 if err == nil {
6761 t.Errorf("expected error: %v", err)
6862 }
0 // +build integration
1
02 package etcd
13
24 import (
00 package etcd
11
22 import (
3 etcd "github.com/coreos/etcd/client"
4
53 "github.com/go-kit/kit/endpoint"
64 "github.com/go-kit/kit/log"
75 "github.com/go-kit/kit/sd"
4442 }
4543
4644 func (s *Subscriber) loop() {
47 responseChan := make(chan *etcd.Response)
48 go s.client.WatchPrefix(s.prefix, responseChan)
45 ch := make(chan struct{})
46 go s.client.WatchPrefix(s.prefix, ch)
4947 for {
5048 select {
51 case <-responseChan:
49 case <-ch:
5250 instances, err := s.client.GetEntries(s.prefix)
5351 if err != nil {
5452 s.logger.Log("msg", "failed to retrieve entries", "err", err)
8585 return entries, nil
8686 }
8787
88 func (c *fakeClient) WatchPrefix(prefix string, responseChan chan *stdetcd.Response) {}
88 func (c *fakeClient) WatchPrefix(prefix string, ch chan struct{}) {}
8989
9090 func (c *fakeClient) Register(Service) error {
9191 return nil