Codebase list golang-github-go-kit-kit / 8fd6486
Merge pull request #288 from basvanbeek/master sd.Registrar for ZooKeeper see issue #284 Bas van Beek 7 years ago
4 changed file(s) with 144 addition(s) and 32 deletion(s). Raw diff Collapse all Expand all
1515 DefaultACL = zk.WorldACL(zk.PermAll)
1616 ErrInvalidCredentials = errors.New("invalid credentials provided")
1717 ErrClientClosed = errors.New("client service closed")
18 ErrNotRegistered = errors.New("not registered")
19 ErrNodeNotFound = errors.New("node not found")
1820 )
1921
2022 const (
3436 // CreateParentNodes should try to create the path in case it does not exist
3537 // yet on ZooKeeper.
3638 CreateParentNodes(path string) error
39 // Register a service with ZooKeeper.
40 Register(s *Service) error
41 // Deregister a service with ZooKeeper.
42 Deregister(s *Service) error
3743 // Stop should properly shutdown the client implementation
3844 Stop()
3945 }
222228 return resp, eventc, nil
223229 }
224230
231 // Register implements the ZooKeeper Client interface.
232 func (c *client) Register(s *Service) error {
233 if s.Path[len(s.Path)-1] != '/' {
234 s.Path += "/"
235 }
236 path := s.Path + s.Name
237 if err := c.CreateParentNodes(path); err != nil {
238 return err
239 }
240 node, err := c.CreateProtectedEphemeralSequential(path, s.Data, c.acl)
241 if err != nil {
242 return err
243 }
244 s.node = node
245 return nil
246 }
247
248 // Deregister implements the ZooKeeper Client interface.
249 func (c *client) Deregister(s *Service) error {
250 if s.node == "" {
251 return ErrNotRegistered
252 }
253 path := s.Path + s.Name
254 found, stat, err := c.Exists(path)
255 if err != nil {
256 return err
257 }
258 if !found {
259 return ErrNodeNotFound
260 }
261 if err := c.Delete(path, stat.Version); err != nil {
262 return err
263 }
264 return nil
265 }
266
225267 // Stop implements the ZooKeeper Client interface.
226268 func (c *client) Stop() {
227269 c.active = false
5151 }
5252 defer s.Stop()
5353
54 services, err := s.Services()
54 services, err := s.Endpoints()
5555 if err != nil {
5656 t.Fatal(err)
5757 }
123123 }
124124
125125 func TestGetEntriesOnServer(t *testing.T) {
126 var instancePayload = "protocol://hostname:port/routing"
126 var instancePayload = "10.0.3.204:8002"
127127
128128 c1, err := NewClient(host, logger)
129129 if err != nil {
139139 }
140140 defer c2.Stop()
141141
142 c2impl, _ := c2.(*client)
143 _, err = c2impl.Create(
144 path+"/instance1",
145 []byte(instancePayload),
146 stdzk.FlagEphemeral|stdzk.FlagSequence,
147 stdzk.WorldACL(stdzk.PermAll),
148 )
149 if err != nil {
150 t.Fatalf("Unable to create test ephemeral znode 1: %v", err)
151 }
152 _, err = c2impl.Create(
153 path+"/instance2",
154 []byte(instancePayload+"2"),
155 stdzk.FlagEphemeral|stdzk.FlagSequence,
156 stdzk.WorldACL(stdzk.PermAll),
157 )
158 if err != nil {
159 t.Fatalf("Unable to create test ephemeral znode 2: %v", err)
142 instance1 := &Service{
143 Path: path,
144 Name: "instance1",
145 Data: []byte(instancePayload),
146 }
147 if err = c2.Register(instance1); err != nil {
148 t.Fatalf("Unable to create test ephemeral znode 1: %+v", err)
149 }
150 instance2 := &Service{
151 Path: path,
152 Name: "instance2",
153 Data: []byte(instancePayload),
154 }
155 if err = c2.Register(instance2); err != nil {
156 t.Fatalf("Unable to create test ephemeral znode 2: %+v", err)
160157 }
161158
162159 time.Sleep(50 * time.Millisecond)
163160
164 services, err := s.Services()
161 services, err := s.Endpoints()
165162 if err != nil {
166163 t.Fatal(err)
167164 }
179176 if err != nil {
180177 t.Fatal(err)
181178 }
182 _, err = c.(*client).Create(
183 path+"/instance3",
184 []byte("just some payload"),
185 stdzk.FlagEphemeral|stdzk.FlagSequence,
186 stdzk.WorldACL(stdzk.PermAll),
187 )
188 if err != nil {
189 t.Fatalf("Unable to create test ephemeral znode: %v", err)
190 }
179
180 instance3 := Service{
181 Path: path,
182 Name: "instance3",
183 Data: []byte("just some payload"),
184 }
185 registrar := NewRegistrar(c, instance3, logger)
186 registrar.Register()
191187 select {
192188 case event := <-eventc:
193189 if want, have := stdzk.EventNodeChildrenChanged.String(), event.Type.String(); want != have {
194190 t.Errorf("want %s, have %s", want, have)
195191 }
196 case <-time.After(20 * time.Millisecond):
192 case <-time.After(100 * time.Millisecond):
197193 t.Errorf("expected incoming watch event, timeout occurred")
198194 }
199195
200 }
196 _, eventc, err = c.GetEntries(path)
197 if err != nil {
198 t.Fatal(err)
199 }
200
201 registrar.Deregister()
202 select {
203 case event := <-eventc:
204 if want, have := stdzk.EventNodeChildrenChanged.String(), event.Type.String(); want != have {
205 t.Errorf("want %s, have %s", want, have)
206 }
207 case <-time.After(100 * time.Millisecond):
208 t.Errorf("expected incoming watch event, timeout occurred")
209 }
210
211 }
0 package zk
1
2 import "github.com/go-kit/kit/log"
3
4 // Registrar registers service instance liveness information to ZooKeeper.
5 type Registrar struct {
6 client Client
7 service Service
8 logger log.Logger
9 }
10
11 // Service holds the root path, service name and instance identifying data you
12 // want to publish to ZooKeeper.
13 type Service struct {
14 Path string // discovery namespace, example: /myorganization/myplatform/
15 Name string // service name, example: addscv
16 Data []byte // instance data to store for discovery, example: 10.0.2.10:80
17 node string // Client will record the ephemeral node name so we can deregister
18 }
19
20 // NewRegistrar returns a ZooKeeper Registrar acting on the provided catalog
21 // registration.
22 func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar {
23 return &Registrar{
24 client: client,
25 service: service,
26 logger: log.NewContext(logger).With(
27 "service", service.Name,
28 "path", service.Path,
29 "data", string(service.Data),
30 ),
31 }
32 }
33
34 // Register implements sd.Registrar interface.
35 func (r *Registrar) Register() {
36 if err := r.client.Register(&r.service); err != nil {
37 r.logger.Log("err", err)
38 } else {
39 r.logger.Log("action", "register")
40 }
41 }
42
43 // Deregister implements sd.Registrar interface.
44 func (r *Registrar) Deregister() {
45 if err := r.client.Deregister(&r.service); err != nil {
46 r.logger.Log("err", err)
47 } else {
48 r.logger.Log("action", "deregister")
49 }
50 }
7070 c.ch <- zk.Event{}
7171 }
7272
73 func (c *fakeClient) Register(s *Service) error {
74 return nil
75 }
76
77 func (c *fakeClient) Deregister(s *Service) error {
78 return nil
79 }
80
7381 func (c *fakeClient) SendErrorOnWatch() {
7482 c.mtx.Lock()
7583 defer c.mtx.Unlock()