Codebase list golang-github-go-kit-kit / b076b44
Merge pull request #329 from go-kit/fix-etcd-tests Fix etcd tests Peter Bourgon authored 7 years ago GitHub committed 7 years ago
8 changed file(s) with 242 addition(s) and 241 deletion(s). Raw diff Collapse all Expand all
1313 )
1414
1515 var (
16 ErrNoKey = errors.New("no key provided")
16 // ErrNoKey indicates a client method needs a key but receives none.
17 ErrNoKey = errors.New("no key provided")
18
19 // ErrNoValue indicates a client method needs a value but receives none.
1720 ErrNoValue = errors.New("no value provided")
1821 )
1922
2023 // Client is a wrapper around the etcd client.
2124 type Client interface {
22 // GetEntries will query the given prefix in etcd and returns a set of entries.
25 // GetEntries queries the given prefix in etcd and returns a slice
26 // containing the values of all keys found, recursively, underneath that
27 // prefix.
2328 GetEntries(prefix string) ([]string, error)
2429
25 // WatchPrefix starts watching every change for given prefix in etcd. When an
26 // change is detected it will populate the responseChan when an *etcd.Response.
27 WatchPrefix(prefix string, responseChan chan *etcd.Response)
30 // WatchPrefix watches the given prefix in etcd for changes. When a change
31 // is detected, it will signal on the passed channel. Clients are expected
32 // to call GetEntries to update themselves with the latest set of complete
33 // values. WatchPrefix will always send an initial sentinel value on the
34 // channel after establishing the watch, to ensure that clients always
35 // receive the latest set of values. WatchPrefix will block until the
36 // context passed to the NewClient constructor is terminated.
37 WatchPrefix(prefix string, ch chan struct{})
2838
2939 // Register a service with etcd.
3040 Register(s Service) error
41
3142 // Deregister a service with etcd.
3243 Deregister(s Service) error
3344 }
3748 ctx context.Context
3849 }
3950
40 // ClientOptions defines options for the etcd client.
51 // ClientOptions defines options for the etcd client. All values are optional.
52 // If any duration is not specified, a default of 3 seconds will be used.
4153 type ClientOptions struct {
4254 Cert string
4355 Key string
44 CaCert string
56 CACert string
4557 DialTimeout time.Duration
4658 DialKeepAlive time.Duration
4759 HeaderTimeoutPerRequest time.Duration
4860 }
4961
50 // NewClient returns an *etcd.Client with a connection to the named machines.
51 // It will return an error if a connection to the cluster cannot be made.
52 // The parameter machines needs to be a full URL with schemas.
53 // e.g. "http://localhost:2379" will work, but "localhost:2379" will not.
62 // NewClient returns Client with a connection to the named machines. It will
63 // return an error if a connection to the cluster cannot be made. The parameter
64 // machines needs to be a full URL with schemas. e.g. "http://localhost:2379"
65 // will work, but "localhost:2379" will not.
5466 func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) {
55 var (
56 c etcd.KeysAPI
57 err error
58 caCertCt []byte
59 tlsCert tls.Certificate
60 )
67 if options.DialTimeout == 0 {
68 options.DialTimeout = 3 * time.Second
69 }
70 if options.DialKeepAlive == 0 {
71 options.DialKeepAlive = 3 * time.Second
72 }
73 if options.HeaderTimeoutPerRequest == 0 {
74 options.HeaderTimeoutPerRequest = 3 * time.Second
75 }
6176
77 transport := etcd.DefaultTransport
6278 if options.Cert != "" && options.Key != "" {
63 tlsCert, err = tls.LoadX509KeyPair(options.Cert, options.Key)
79 tlsCert, err := tls.LoadX509KeyPair(options.Cert, options.Key)
6480 if err != nil {
6581 return nil, err
6682 }
67
68 caCertCt, err = ioutil.ReadFile(options.CaCert)
83 caCertCt, err := ioutil.ReadFile(options.CACert)
6984 if err != nil {
7085 return nil, err
7186 }
7287 caCertPool := x509.NewCertPool()
7388 caCertPool.AppendCertsFromPEM(caCertCt)
74
75 tlsConfig := &tls.Config{
76 Certificates: []tls.Certificate{tlsCert},
77 RootCAs: caCertPool,
78 }
79
80 transport := &http.Transport{
81 TLSClientConfig: tlsConfig,
82 Dial: func(network, addr string) (net.Conn, error) {
83 dial := &net.Dialer{
89 transport = &http.Transport{
90 TLSClientConfig: &tls.Config{
91 Certificates: []tls.Certificate{tlsCert},
92 RootCAs: caCertPool,
93 },
94 Dial: func(network, address string) (net.Conn, error) {
95 return (&net.Dialer{
8496 Timeout: options.DialTimeout,
8597 KeepAlive: options.DialKeepAlive,
86 }
87 return dial.Dial(network, addr)
98 }).Dial(network, address)
8899 },
89100 }
90
91 cfg := etcd.Config{
92 Endpoints: machines,
93 Transport: transport,
94 HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest,
95 }
96 ce, err := etcd.New(cfg)
97 if err != nil {
98 return nil, err
99 }
100 c = etcd.NewKeysAPI(ce)
101 } else {
102 cfg := etcd.Config{
103 Endpoints: machines,
104 Transport: etcd.DefaultTransport,
105 HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest,
106 }
107 ce, err := etcd.New(cfg)
108 if err != nil {
109 return nil, err
110 }
111 c = etcd.NewKeysAPI(ce)
112101 }
113102
114 return &client{c, ctx}, nil
103 ce, err := etcd.New(etcd.Config{
104 Endpoints: machines,
105 Transport: transport,
106 HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest,
107 })
108 if err != nil {
109 return nil, err
110 }
111
112 return &client{
113 keysAPI: etcd.NewKeysAPI(ce),
114 ctx: ctx,
115 }, nil
115116 }
116117
117118 // GetEntries implements the etcd Client interface.
121122 return nil, err
122123 }
123124
125 // Special case. Note that it's possible that len(resp.Node.Nodes) == 0 and
126 // resp.Node.Value is also empty, in which case the key is empty and we
127 // should not return any entries.
128 if len(resp.Node.Nodes) == 0 && resp.Node.Value != "" {
129 return []string{resp.Node.Value}, nil
130 }
131
124132 entries := make([]string, len(resp.Node.Nodes))
125
126 if len(entries) > 0 {
127 for i, node := range resp.Node.Nodes {
128 entries[i] = node.Value
129 }
130 } else {
131 entries = append(entries, resp.Node.Value)
133 for i, node := range resp.Node.Nodes {
134 entries[i] = node.Value
132135 }
133136 return entries, nil
134
135137 }
136138
137139 // WatchPrefix implements the etcd Client interface.
138 func (c *client) WatchPrefix(prefix string, responseChan chan *etcd.Response) {
140 func (c *client) WatchPrefix(prefix string, ch chan struct{}) {
139141 watch := c.keysAPI.Watcher(prefix, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true})
142 ch <- struct{}{} // make sure caller invokes GetEntries
140143 for {
141 res, err := watch.Next(c.ctx)
142 if err != nil {
144 if _, err := watch.Next(c.ctx); err != nil {
143145 return
144146 }
145 responseChan <- res
147 ch <- struct{}{}
146148 }
147149 }
148150
77 )
88
99 func TestNewClient(t *testing.T) {
10 ClientOptions := ClientOptions{
11 Cert: "",
12 Key: "",
13 CaCert: "",
14 DialTimeout: (2 * time.Second),
15 DialKeepAlive: (2 * time.Second),
16 HeaderTimeoutPerRequest: (2 * time.Second),
17 }
18
1910 client, err := NewClient(
2011 context.Background(),
2112 []string{"http://irrelevant:12345"},
22 ClientOptions,
13 ClientOptions{
14 DialTimeout: 2 * time.Second,
15 DialKeepAlive: 2 * time.Second,
16 HeaderTimeoutPerRequest: 2 * time.Second,
17 },
2318 )
2419 if err != nil {
2520 t.Fatalf("unexpected error creating client: %v", err)
2924 }
3025 }
3126
27 // NewClient should fail when providing invalid or missing endpoints.
3228 func TestOptions(t *testing.T) {
33 //creating new client should fail when providing invalid or missing endpoints
3429 a, err := NewClient(
3530 context.Background(),
3631 []string{},
3732 ClientOptions{
3833 Cert: "",
3934 Key: "",
40 CaCert: "",
41 DialTimeout: (2 * time.Second),
42 DialKeepAlive: (2 * time.Second),
43 HeaderTimeoutPerRequest: (2 * time.Second),
44 })
45
35 CACert: "",
36 DialTimeout: 2 * time.Second,
37 DialKeepAlive: 2 * time.Second,
38 HeaderTimeoutPerRequest: 2 * time.Second,
39 },
40 )
4641 if err == nil {
4742 t.Errorf("expected error: %v", err)
4843 }
5045 t.Fatalf("expected client to be nil on failure")
5146 }
5247
53 //creating new client should fail when providing invalid or missing endpoints
5448 _, err = NewClient(
5549 context.Background(),
5650 []string{"http://irrelevant:12345"},
5751 ClientOptions{
5852 Cert: "blank.crt",
5953 Key: "blank.key",
60 CaCert: "blank.cacert",
61 DialTimeout: (2 * time.Second),
62 DialKeepAlive: (2 * time.Second),
63 HeaderTimeoutPerRequest: (2 * time.Second),
64 })
65
54 CACert: "blank.CACert",
55 DialTimeout: 2 * time.Second,
56 DialKeepAlive: 2 * time.Second,
57 HeaderTimeoutPerRequest: 2 * time.Second,
58 },
59 )
6660 if err == nil {
6761 t.Errorf("expected error: %v", err)
6862 }
0 // Package etcd provides a subscriber implementation for etcd.
0 // Package etcd provides a Subscriber and Registrar implementation for etcd. If
1 // you use etcd as your service discovery system, this package will help you
2 // implement the registration and client-side load balancing patterns.
13 package etcd
00 package etcd
11
22 import (
3 "fmt"
3 "io"
44 "time"
5 "io"
65
6 "golang.org/x/net/context"
7
8 "github.com/go-kit/kit/endpoint"
79 "github.com/go-kit/kit/log"
8 "github.com/go-kit/kit/endpoint"
9 "golang.org/x/net/context"
10 "github.com/go-kit/kit/sd/lb"
1011 )
1112
12 // Package sd/etcd provides a wrapper around the coroes/etcd key value store (https://github.com/coreos/etcd)
13 // This example assumes the user has an instance of etcd installed and running locally on port 2379
1413 func Example() {
15
14 // Let's say this is a service that means to register itself.
15 // First, we will set up some context.
1616 var (
17 prefix = "/services/foosvc/" // known at compile time
18 instance = "1.2.3.4:8080" // taken from runtime or platform, somehow
19 key = prefix + instance
20 value = "http://" + instance // based on our transport
17 etcdServer = "http://10.0.0.1:2379" // don't forget schema and port!
18 prefix = "/services/foosvc/" // known at compile time
19 instance = "1.2.3.4:8080" // taken from runtime or platform, somehow
20 key = prefix + instance // should be globally unique
21 value = "http://" + instance // based on our transport
22 ctx = context.Background()
2123 )
2224
23 client, err := NewClient(context.Background(), []string{"http://:2379"}, ClientOptions{
24 DialTimeout: 2 * time.Second,
25 DialKeepAlive: 2 * time.Second,
26 HeaderTimeoutPerRequest: 2 * time.Second,
27 })
25 // Build the client.
26 client, err := NewClient(ctx, []string{etcdServer}, ClientOptions{})
27 if err != nil {
28 panic(err)
29 }
2830
29 // Instantiate new instance of *Registrar passing in test data
31 // Build the registrar.
3032 registrar := NewRegistrar(client, Service{
3133 Key: key,
3234 Value: value,
3335 }, log.NewNopLogger())
34 // Register new test data to etcd
36
37 // Register our instance.
3538 registrar.Register()
3639
37 //Retrieve entries from etcd
38 _, err = client.GetEntries(key)
40 // At the end of our service lifecycle, for example at the end of func main,
41 // we should make sure to deregister ourselves. This is important! Don't
42 // accidentally skip this step by invoking a log.Fatal or os.Exit in the
43 // interim, which bypasses the defer stack.
44 defer registrar.Deregister()
45
46 // It's likely that we'll also want to connect to other services and call
47 // their methods. We can build a subscriber to listen for changes from etcd
48 // and build endpoints, wrap it with a load-balancer to pick a single
49 // endpoint, and finally wrap it with a retry strategy to get something that
50 // can be used as an endpoint directly.
51 barPrefix := "/services/barsvc"
52 subscriber, err := NewSubscriber(client, barPrefix, barFactory, log.NewNopLogger())
3953 if err != nil {
40 fmt.Println(err)
54 panic(err)
4155 }
56 balancer := lb.NewRoundRobin(subscriber)
57 retry := lb.Retry(3, 3*time.Second, balancer)
4258
43 factory := func(string) (endpoint.Endpoint, io.Closer, error) {
44 return endpoint.Nop, nil, nil
45 }
46 subscriber, _ := NewSubscriber(client, prefix, factory, log.NewNopLogger())
47
48 endpoints, err := subscriber.Endpoints()
49 if err != nil {
50 fmt.Printf("err: %v", err)
51 }
52 fmt.Println(len(endpoints)) // hopefully 1
53
54 // Deregister first instance of test data
55 registrar.Deregister()
56
57 endpoints, err = subscriber.Endpoints()
58 if err != nil {
59 fmt.Printf("err: %v", err)
60 }
61 fmt.Println(len(endpoints)) // hopefully 0
62
63 // Verify test data no longer exists in etcd
64 _, err = client.GetEntries(key)
65 if err != nil {
66 fmt.Println(err)
59 // And now retry can be used like any other endpoint.
60 req := struct{}{}
61 if _, err = retry(ctx, req); err != nil {
62 panic(err)
6763 }
6864 }
65
66 func barFactory(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil }
22 package etcd
33
44 import (
5 "flag"
6 "kit/log"
5 "io"
76 "os"
87 "testing"
98 "time"
109
11 etcdc "github.com/coreos/etcd/client"
12 etcdi "github.com/coreos/etcd/integration"
1310 "golang.org/x/net/context"
11
12 "github.com/go-kit/kit/endpoint"
13 "github.com/go-kit/kit/log"
1414 )
1515
16 var (
17 host []string
18 kitClientOptions ClientOptions
19 )
20
21 func TestMain(m *testing.M) {
22 flag.Parse()
23
24 kitClientOptions = ClientOptions{
25 Cert: "",
26 Key: "",
27 CaCert: "",
28 DialTimeout: (2 * time.Second),
29 DialKeepAlive: (2 * time.Second),
30 HeaderTimeoutPerRequest: (2 * time.Second),
16 // Package sd/etcd provides a wrapper around the etcd key/value store. This
17 // example assumes the user has an instance of etcd installed and running
18 // locally on port 2379.
19 func TestIntegration(t *testing.T) {
20 addr := os.Getenv("ETCD_ADDR")
21 if addr == "" {
22 t.Skip("ETCD_ADDR not set; skipping integration test")
3123 }
3224
33 code := m.Run()
25 var (
26 prefix = "/services/foosvc/" // known at compile time
27 instance = "1.2.3.4:8080" // taken from runtime or platform, somehow
28 key = prefix + instance
29 value = "http://" + instance // based on our transport
30 )
3431
35 os.Exit(code)
36 }
37
38 func TestRegistrar(t *testing.T) {
39 ts := etcdi.NewCluster(t, 1)
40 ts.Launch(t)
41 kitClient, err := NewClient(context.Background(), []string{ts.URL(0)}, kitClientOptions)
42
43 // Valid registrar should pass
44 registrar := NewRegistrar(kitClient, Service{
45 Key: "somekey",
46 Value: "somevalue",
47 DeleteOptions: &etcdc.DeleteOptions{
48 PrevValue: "",
49 PrevIndex: 0,
50 Recursive: true,
51 Dir: false,
52 },
53 }, log.NewNopLogger())
54
55 registrar.Register()
56 r1, err := kitClient.GetEntries(registrar.service.Key)
32 client, err := NewClient(context.Background(), []string{addr}, ClientOptions{
33 DialTimeout: 2 * time.Second,
34 DialKeepAlive: 2 * time.Second,
35 HeaderTimeoutPerRequest: 2 * time.Second,
36 })
5737 if err != nil {
58 t.Fatalf("unexpected error when getting value for deregistered key: %v", err)
38 t.Fatalf("NewClient(%q): %v", addr, err)
5939 }
6040
61 if want, have := registrar.service.Value, r1[0]; want != have {
41 // Verify test data is initially empty.
42 entries, err := client.GetEntries(key)
43 if err == nil {
44 t.Fatalf("GetEntries(%q): expected error, got none", key)
45 }
46 t.Logf("GetEntries(%q): %v (OK)", key, err)
47
48 // Instantiate a new Registrar, passing in test data.
49 registrar := NewRegistrar(client, Service{
50 Key: key,
51 Value: value,
52 }, log.NewContext(log.NewLogfmtLogger(os.Stderr)).With("component", "registrar"))
53
54 // Register our instance.
55 registrar.Register()
56 t.Logf("Registered")
57
58 // Retrieve entries from etcd manually.
59 entries, err = client.GetEntries(key)
60 if err != nil {
61 t.Fatalf("client.GetEntries(%q): %v", key, err)
62 }
63 if want, have := 1, len(entries); want != have {
64 t.Fatalf("client.GetEntries(%q): want %d, have %d", key, want, have)
65 }
66 if want, have := value, entries[0]; want != have {
6267 t.Fatalf("want %q, have %q", want, have)
6368 }
6469
70 subscriber, err := NewSubscriber(
71 client,
72 prefix,
73 func(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil },
74 log.NewContext(log.NewLogfmtLogger(os.Stderr)).With("component", "subscriber"),
75 )
76 if err != nil {
77 t.Fatalf("NewSubscriber: %v", err)
78 }
79 t.Logf("Constructed Subscriber OK")
80
81 if !within(time.Second, func() bool {
82 endpoints, err := subscriber.Endpoints()
83 return err == nil && len(endpoints) == 1
84 }) {
85 t.Fatalf("Subscriber didn't see Register in time")
86 }
87 t.Logf("Subscriber saw Register OK")
88
89 // Deregister first instance of test data.
6590 registrar.Deregister()
66 r2, err := kitClient.GetEntries(registrar.service.Key)
67 if len(r2) > 0 {
68 t.Fatalf("unexpected value found for deregistered key: %s", r2)
91 t.Logf("Deregistered")
92
93 // Check it was deregistered.
94 if !within(time.Second, func() bool {
95 endpoints, err := subscriber.Endpoints()
96 t.Logf("Checking Deregister: len(endpoints) = %d, err = %v", len(endpoints), err)
97 return err == nil && len(endpoints) == 0
98 }) {
99 t.Fatalf("Subscriber didn't see Deregister in time")
69100 }
70101
71 // Registrar with no key should register but value will be blank
72 registrarNoKey := NewRegistrar(kitClient, Service{
73 Key: "",
74 Value: "somevalue",
75 DeleteOptions: &etcdc.DeleteOptions{
76 PrevValue: "",
77 PrevIndex: 0,
78 Recursive: true,
79 Dir: false,
80 },
81 }, log.NewNopLogger())
102 // Verify test data no longer exists in etcd.
103 entries, err = client.GetEntries(key)
104 if err == nil {
105 t.Fatalf("GetEntries(%q): expected error, got none", key)
106 }
107 t.Logf("GetEntries(%q): %v (OK)", key, err)
108 }
82109
83 registrarNoKey.Register()
84 r3, err := kitClient.GetEntries(registrarNoKey.service.Key)
85 if err != nil {
86 t.Errorf("unexpected error when getting value for entry with no key: %v", err)
110 func within(d time.Duration, f func() bool) bool {
111 deadline := time.Now().Add(d)
112 for time.Now().Before(deadline) {
113 if f() {
114 return true
115 }
116 time.Sleep(d / 10)
87117 }
88
89 if want, have := "", r3[0]; want != have {
90 t.Fatalf("want %q, have %q", want, have)
91 }
92
93 // Registrar with no value should not register anything
94 registrarNoValue := NewRegistrar(kitClient, Service{
95 Key: "somekey",
96 Value: "",
97 DeleteOptions: &etcdc.DeleteOptions{
98 PrevValue: "",
99 PrevIndex: 0,
100 Recursive: true,
101 Dir: false,
102 },
103 }, log.NewNopLogger())
104
105 registrarNoValue.Register()
106 r4, err := kitClient.GetEntries(registrarNoValue.service.Key)
107 if err == nil {
108 t.Errorf("expected error when getting value for entry key which attempted to register with no value")
109 }
110
111 if len(r4) > 0 {
112 t.Fatalf("unexpected value retreived when getting value for entry with no value")
113 }
114
115 ts.Terminate(t)
118 return false
116119 }
11
22 import (
33 etcd "github.com/coreos/etcd/client"
4
45 "github.com/go-kit/kit/log"
56 )
67
1112 logger log.Logger
1213 }
1314
14 // Service holds the key, value and instance identifying data you
15 // want to publish to etcd.
15 // Service holds the instance identifying data you want to publish to etcd. Key
16 // must be unique, and value is the string returned to subscribers, typically
17 // called the "instance" string in other parts of package sd.
1618 type Service struct {
17 Key string // discovery key, example: /myorganization/myplatform/
18 Value string // service name value, example: addsvc
19 Key string // unique key, e.g. "/service/foobar/1.2.3.4:8080"
20 Value string // returned to subscribers, e.g. "http://1.2.3.4:8080"
1921 DeleteOptions *etcd.DeleteOptions
2022 }
2123
2224 // NewRegistrar returns a etcd Registrar acting on the provided catalog
23 // registration.
25 // registration (service).
2426 func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar {
2527 return &Registrar{
2628 client: client,
2729 service: service,
2830 logger: log.NewContext(logger).With(
31 "key", service.Key,
2932 "value", service.Value,
30 "key", service.Key,
3133 ),
3234 }
3335 }
3436
35 // Register implements sd.Registrar interface.
37 // Register implements the sd.Registrar interface. Call it when you want your
38 // service to be registered in etcd, typically at startup.
3639 func (r *Registrar) Register() {
3740 if err := r.client.Register(r.service); err != nil {
3841 r.logger.Log("err", err)
4144 }
4245 }
4346
44 // Deregister implements sd.Registrar interface.
47 // Deregister implements the sd.Registrar interface. Call it when you want your
48 // service to be deregistered from etcd, typically just prior to shutdown.
4549 func (r *Registrar) Deregister() {
4650 if err := r.client.Deregister(r.service); err != nil {
4751 r.logger.Log("err", err)
00 package etcd
11
22 import (
3 etcd "github.com/coreos/etcd/client"
4
53 "github.com/go-kit/kit/endpoint"
64 "github.com/go-kit/kit/log"
75 "github.com/go-kit/kit/sd"
4442 }
4543
4644 func (s *Subscriber) loop() {
47 responseChan := make(chan *etcd.Response)
48 go s.client.WatchPrefix(s.prefix, responseChan)
45 ch := make(chan struct{})
46 go s.client.WatchPrefix(s.prefix, ch)
4947 for {
5048 select {
51 case <-responseChan:
49 case <-ch:
5250 instances, err := s.client.GetEntries(s.prefix)
5351 if err != nil {
5452 s.logger.Log("msg", "failed to retrieve entries", "err", err)
8585 return entries, nil
8686 }
8787
88 func (c *fakeClient) WatchPrefix(prefix string, responseChan chan *stdetcd.Response) {}
88 func (c *fakeClient) WatchPrefix(prefix string, ch chan struct{}) {}
8989
9090 func (c *fakeClient) Register(Service) error {
9191 return nil