Adding Register and Deregister functions as well as ingegration tests.
Matt Strong
7 years ago
2 | 2 | import ( |
3 | 3 | "crypto/tls" |
4 | 4 | "crypto/x509" |
5 | "errors" | |
5 | 6 | "io/ioutil" |
6 | 7 | "net" |
7 | 8 | "net/http" |
9 | 10 | |
10 | 11 | etcd "github.com/coreos/etcd/client" |
11 | 12 | "golang.org/x/net/context" |
13 | ) | |
14 | ||
15 | var ( | |
16 | ErrNoKey = errors.New("no key provided") | |
17 | ErrNoValue = errors.New("no value provided") | |
12 | 18 | ) |
13 | 19 | |
14 | 20 | // Client is a wrapper around the etcd client. |
19 | 25 | // WatchPrefix starts watching every change for given prefix in etcd. When an |
20 | 26 | // change is detected it will populate the responseChan when an *etcd.Response. |
21 | 27 | WatchPrefix(prefix string, responseChan chan *etcd.Response) |
28 | ||
29 | // Register a service with etcd. | |
30 | Register(s Service) error | |
31 | // Deregister a service with etcd. | |
32 | Deregister(s Service) error | |
22 | 33 | } |
23 | 34 | |
24 | 35 | type client struct { |
111 | 122 | } |
112 | 123 | |
113 | 124 | entries := make([]string, len(resp.Node.Nodes)) |
114 | for i, node := range resp.Node.Nodes { | |
115 | entries[i] = node.Value | |
125 | ||
126 | if len(entries) > 0 { | |
127 | for i, node := range resp.Node.Nodes { | |
128 | entries[i] = node.Value | |
129 | } | |
130 | } else { | |
131 | entries = append(entries, resp.Node.Value) | |
116 | 132 | } |
117 | 133 | return entries, nil |
134 | ||
118 | 135 | } |
119 | 136 | |
120 | 137 | // WatchPrefix implements the etcd Client interface. |
128 | 145 | responseChan <- res |
129 | 146 | } |
130 | 147 | } |
148 | ||
149 | func (c *client) Register(s Service) error { | |
150 | if s.Key == "" { | |
151 | return ErrNoKey | |
152 | } | |
153 | if s.Value == "" { | |
154 | return ErrNoValue | |
155 | } | |
156 | _, err := c.keysAPI.Create(c.ctx, s.Key, s.Value) | |
157 | return err | |
158 | } | |
159 | ||
160 | func (c *client) Deregister(s Service) error { | |
161 | if s.Key == "" { | |
162 | return ErrNoKey | |
163 | } | |
164 | _, err := c.keysAPI.Delete(c.ctx, s.Key, s.DeleteOptions) | |
165 | return err | |
166 | } |
0 | package etcd | |
1 | ||
2 | import ( | |
3 | "testing" | |
4 | "time" | |
5 | ||
6 | "golang.org/x/net/context" | |
7 | ) | |
8 | ||
9 | func TestNewClient(t *testing.T) { | |
10 | ClientOptions := ClientOptions{ | |
11 | Cert: "", | |
12 | Key: "", | |
13 | CaCert: "", | |
14 | DialTimeout: (2 * time.Second), | |
15 | DialKeepAline: (2 * time.Second), | |
16 | HeaderTimeoutPerRequest: (2 * time.Second), | |
17 | } | |
18 | ||
19 | client, err := NewClient( | |
20 | context.Background(), | |
21 | []string{"http://irrelevant:12345"}, | |
22 | ClientOptions, | |
23 | ) | |
24 | if err != nil { | |
25 | t.Fatalf("unexpected error creating client: %v", err) | |
26 | } | |
27 | if client == nil { | |
28 | t.Fatal("expected new Client, got nil") | |
29 | } | |
30 | } | |
31 | ||
32 | func TestOptions(t *testing.T) { | |
33 | //creating new client should fail when providing invalid or missing endpoints | |
34 | a, err := NewClient( | |
35 | context.Background(), | |
36 | []string{}, | |
37 | ClientOptions{ | |
38 | Cert: "", | |
39 | Key: "", | |
40 | CaCert: "", | |
41 | DialTimeout: (2 * time.Second), | |
42 | DialKeepAline: (2 * time.Second), | |
43 | HeaderTimeoutPerRequest: (2 * time.Second), | |
44 | }) | |
45 | ||
46 | if err == nil { | |
47 | t.Errorf("expected error: %v", err) | |
48 | } | |
49 | if a != nil { | |
50 | t.Fatalf("expected client to be nil on failure") | |
51 | } | |
52 | ||
53 | //creating new client should fail when providing invalid or missing endpoints | |
54 | _, err = NewClient( | |
55 | context.Background(), | |
56 | []string{"http://irrelevant:12345"}, | |
57 | ClientOptions{ | |
58 | Cert: "blank.crt", | |
59 | Key: "blank.key", | |
60 | CaCert: "blank.cacert", | |
61 | DialTimeout: (2 * time.Second), | |
62 | DialKeepAline: (2 * time.Second), | |
63 | HeaderTimeoutPerRequest: (2 * time.Second), | |
64 | }) | |
65 | ||
66 | if err == nil { | |
67 | t.Errorf("expected error: %v", err) | |
68 | } | |
69 | } |
0 | // +build integration | |
1 | ||
2 | package etcd | |
3 | ||
4 | import ( | |
5 | "flag" | |
6 | "kit/log" | |
7 | "os" | |
8 | "testing" | |
9 | "time" | |
10 | ||
11 | etcdc "github.com/coreos/etcd/client" | |
12 | etcdi "github.com/coreos/etcd/integration" | |
13 | "golang.org/x/net/context" | |
14 | ) | |
15 | ||
16 | var ( | |
17 | host []string | |
18 | kitClientOptions ClientOptions | |
19 | ) | |
20 | ||
21 | func TestMain(m *testing.M) { | |
22 | flag.Parse() | |
23 | ||
24 | kitClientOptions = ClientOptions{ | |
25 | Cert: "", | |
26 | Key: "", | |
27 | CaCert: "", | |
28 | DialTimeout: (2 * time.Second), | |
29 | DialKeepAline: (2 * time.Second), | |
30 | HeaderTimeoutPerRequest: (2 * time.Second), | |
31 | } | |
32 | ||
33 | code := m.Run() | |
34 | ||
35 | os.Exit(code) | |
36 | } | |
37 | ||
38 | func TestRegistrar(t *testing.T) { | |
39 | ts := etcdi.NewCluster(t, 1) | |
40 | ts.Launch(t) | |
41 | kitClient, err := NewClient(context.Background(), []string{ts.URL(0)}, kitClientOptions) | |
42 | ||
43 | // Valid registrar should pass | |
44 | registrar := NewRegistrar(kitClient, Service{ | |
45 | Key: "somekey", | |
46 | Value: "somevalue", | |
47 | DeleteOptions: &etcdc.DeleteOptions{ | |
48 | PrevValue: "", | |
49 | PrevIndex: 0, | |
50 | Recursive: true, | |
51 | Dir: false, | |
52 | }, | |
53 | }, log.NewNopLogger()) | |
54 | ||
55 | registrar.Register() | |
56 | r1, err := kitClient.GetEntries(registrar.service.Key) | |
57 | if err != nil { | |
58 | t.Fatalf("unexpected error when getting value for deregistered key: %v", err) | |
59 | } | |
60 | ||
61 | if want, have := registrar.service.Value, r1[0]; want != have { | |
62 | t.Fatalf("want %q, have %q", want, have) | |
63 | } | |
64 | ||
65 | registrar.Deregister() | |
66 | r2, err := kitClient.GetEntries(registrar.service.Key) | |
67 | if len(r2) > 0 { | |
68 | t.Fatalf("unexpected value found for deregistered key: %s", r2) | |
69 | } | |
70 | ||
71 | // Registrar with no key should register but value will be blank | |
72 | registrarNoKey := NewRegistrar(kitClient, Service{ | |
73 | Key: "", | |
74 | Value: "somevalue", | |
75 | DeleteOptions: &etcdc.DeleteOptions{ | |
76 | PrevValue: "", | |
77 | PrevIndex: 0, | |
78 | Recursive: true, | |
79 | Dir: false, | |
80 | }, | |
81 | }, log.NewNopLogger()) | |
82 | ||
83 | registrarNoKey.Register() | |
84 | r3, err := kitClient.GetEntries(registrarNoKey.service.Key) | |
85 | if err != nil { | |
86 | t.Errorf("unexpected error when getting value for entry with no key: %v", err) | |
87 | } | |
88 | ||
89 | if want, have := "", r3[0]; want != have { | |
90 | t.Fatalf("want %q, have %q", want, have) | |
91 | } | |
92 | ||
93 | // Registrar with no value should not register anything | |
94 | registrarNoValue := NewRegistrar(kitClient, Service{ | |
95 | Key: "somekey", | |
96 | Value: "", | |
97 | DeleteOptions: &etcdc.DeleteOptions{ | |
98 | PrevValue: "", | |
99 | PrevIndex: 0, | |
100 | Recursive: true, | |
101 | Dir: false, | |
102 | }, | |
103 | }, log.NewNopLogger()) | |
104 | ||
105 | registrarNoValue.Register() | |
106 | r4, err := kitClient.GetEntries(registrarNoValue.service.Key) | |
107 | if err == nil { | |
108 | t.Errorf("expected error when getting value for entry key which attempted to register with no value") | |
109 | } | |
110 | ||
111 | if len(r4) > 0 { | |
112 | t.Fatalf("unexpected value retreived when getting value for entry with no value") | |
113 | } | |
114 | ||
115 | ts.Terminate(t) | |
116 | } |
0 | package etcd | |
1 | ||
2 | import ( | |
3 | etcd "github.com/coreos/etcd/client" | |
4 | "github.com/go-kit/kit/log" | |
5 | ) | |
6 | ||
7 | // Registrar registers service instance liveness information to etcd. | |
8 | type Registrar struct { | |
9 | client Client | |
10 | service Service | |
11 | logger log.Logger | |
12 | } | |
13 | ||
14 | // Service holds the key, value and instance identifying data you | |
15 | // want to publish to etcd. | |
16 | type Service struct { | |
17 | Key string // discovery key, example: /myorganization/myplatform/ | |
18 | Value string // service name value, example: addsvc | |
19 | DeleteOptions *etcd.DeleteOptions | |
20 | } | |
21 | ||
22 | // NewRegistrar returns a etcd Registrar acting on the provided catalog | |
23 | // registration. | |
24 | func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar { | |
25 | return &Registrar{ | |
26 | client: client, | |
27 | service: service, | |
28 | logger: log.NewContext(logger).With( | |
29 | "value", service.Value, | |
30 | "key", service.Key, | |
31 | ), | |
32 | } | |
33 | } | |
34 | ||
35 | // Register implements sd.Registrar interface. | |
36 | func (r *Registrar) Register() { | |
37 | if err := r.client.Register(r.service); err != nil { | |
38 | r.logger.Log("err", err) | |
39 | } else { | |
40 | r.logger.Log("action", "register") | |
41 | } | |
42 | } | |
43 | ||
44 | // Deregister implements sd.Registrar interface. | |
45 | func (r *Registrar) Deregister() { | |
46 | if err := r.client.Deregister(r.service); err != nil { | |
47 | r.logger.Log("err", err) | |
48 | } else { | |
49 | r.logger.Log("action", "deregister") | |
50 | } | |
51 | } |