diff --git a/sd/zk/client.go b/sd/zk/client.go index 8220817..70cdab3 100644 --- a/sd/zk/client.go +++ b/sd/zk/client.go @@ -16,6 +16,8 @@ DefaultACL = zk.WorldACL(zk.PermAll) ErrInvalidCredentials = errors.New("invalid credentials provided") ErrClientClosed = errors.New("client service closed") + ErrNotRegistered = errors.New("not registered") + ErrNodeNotFound = errors.New("node not found") ) const ( @@ -35,6 +37,10 @@ // CreateParentNodes should try to create the path in case it does not exist // yet on ZooKeeper. CreateParentNodes(path string) error + // Register a service with ZooKeeper. + Register(s *Service) error + // Deregister a service with ZooKeeper. + Deregister(s *Service) error // Stop should properly shutdown the client implementation Stop() } @@ -223,6 +229,42 @@ return resp, eventc, nil } +// Register implements the ZooKeeper Client interface. +func (c *client) Register(s *Service) error { + if s.Path[len(s.Path)-1] != '/' { + s.Path += "/" + } + path := s.Path + s.Name + if err := c.CreateParentNodes(path); err != nil { + return err + } + node, err := c.CreateProtectedEphemeralSequential(path, s.Data, c.acl) + if err != nil { + return err + } + s.node = node + return nil +} + +// Deregister implements the ZooKeeper Client interface. +func (c *client) Deregister(s *Service) error { + if s.node == "" { + return ErrNotRegistered + } + path := s.Path + s.Name + found, stat, err := c.Exists(path) + if err != nil { + return err + } + if !found { + return ErrNodeNotFound + } + if err := c.Delete(path, stat.Version); err != nil { + return err + } + return nil +} + // Stop implements the ZooKeeper Client interface. func (c *client) Stop() { c.active = false diff --git a/sd/zk/integration_test.go b/sd/zk/integration_test.go index 0e67679..6084415 100644 --- a/sd/zk/integration_test.go +++ b/sd/zk/integration_test.go @@ -52,7 +52,7 @@ } defer s.Stop() - services, err := s.Services() + services, err := s.Endpoints() if err != nil { t.Fatal(err) } @@ -124,7 +124,7 @@ } func TestGetEntriesOnServer(t *testing.T) { - var instancePayload = "protocol://hostname:port/routing" + var instancePayload = "10.0.3.204:8002" c1, err := NewClient(host, logger) if err != nil { @@ -140,29 +140,26 @@ } defer c2.Stop() - c2impl, _ := c2.(*client) - _, err = c2impl.Create( - path+"/instance1", - []byte(instancePayload), - stdzk.FlagEphemeral|stdzk.FlagSequence, - stdzk.WorldACL(stdzk.PermAll), - ) - if err != nil { - t.Fatalf("Unable to create test ephemeral znode 1: %v", err) - } - _, err = c2impl.Create( - path+"/instance2", - []byte(instancePayload+"2"), - stdzk.FlagEphemeral|stdzk.FlagSequence, - stdzk.WorldACL(stdzk.PermAll), - ) - if err != nil { - t.Fatalf("Unable to create test ephemeral znode 2: %v", err) + instance1 := &Service{ + Path: path, + Name: "instance1", + Data: []byte(instancePayload), + } + if err = c2.Register(instance1); err != nil { + t.Fatalf("Unable to create test ephemeral znode 1: %+v", err) + } + instance2 := &Service{ + Path: path, + Name: "instance2", + Data: []byte(instancePayload), + } + if err = c2.Register(instance2); err != nil { + t.Fatalf("Unable to create test ephemeral znode 2: %+v", err) } time.Sleep(50 * time.Millisecond) - services, err := s.Services() + services, err := s.Endpoints() if err != nil { t.Fatal(err) } @@ -180,22 +177,36 @@ if err != nil { t.Fatal(err) } - _, err = c.(*client).Create( - path+"/instance3", - []byte("just some payload"), - stdzk.FlagEphemeral|stdzk.FlagSequence, - stdzk.WorldACL(stdzk.PermAll), - ) - if err != nil { - t.Fatalf("Unable to create test ephemeral znode: %v", err) - } + + instance3 := Service{ + Path: path, + Name: "instance3", + Data: []byte("just some payload"), + } + registrar := NewRegistrar(c, instance3, logger) + registrar.Register() select { case event := <-eventc: if want, have := stdzk.EventNodeChildrenChanged.String(), event.Type.String(); want != have { t.Errorf("want %s, have %s", want, have) } - case <-time.After(20 * time.Millisecond): + case <-time.After(100 * time.Millisecond): t.Errorf("expected incoming watch event, timeout occurred") } -} + _, eventc, err = c.GetEntries(path) + if err != nil { + t.Fatal(err) + } + + registrar.Deregister() + select { + case event := <-eventc: + if want, have := stdzk.EventNodeChildrenChanged.String(), event.Type.String(); want != have { + t.Errorf("want %s, have %s", want, have) + } + case <-time.After(100 * time.Millisecond): + t.Errorf("expected incoming watch event, timeout occurred") + } + +} diff --git a/sd/zk/registrar.go b/sd/zk/registrar.go new file mode 100644 index 0000000..dcfae39 --- /dev/null +++ b/sd/zk/registrar.go @@ -0,0 +1,51 @@ +package zk + +import "github.com/go-kit/kit/log" + +// Registrar registers service instance liveness information to ZooKeeper. +type Registrar struct { + client Client + service Service + logger log.Logger +} + +// Service holds the root path, service name and instance identifying data you +// want to publish to ZooKeeper. +type Service struct { + Path string // discovery namespace, example: /myorganization/myplatform/ + Name string // service name, example: addscv + Data []byte // instance data to store for discovery, example: 10.0.2.10:80 + node string // Client will record the ephemeral node name so we can deregister +} + +// NewRegistrar returns a ZooKeeper Registrar acting on the provided catalog +// registration. +func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar { + return &Registrar{ + client: client, + service: service, + logger: log.NewContext(logger).With( + "service", service.Name, + "path", service.Path, + "data", string(service.Data), + ), + } +} + +// Register implements sd.Registrar interface. +func (r *Registrar) Register() { + if err := r.client.Register(&r.service); err != nil { + r.logger.Log("err", err) + } else { + r.logger.Log("action", "register") + } +} + +// Deregister implements sd.Registrar interface. +func (r *Registrar) Deregister() { + if err := r.client.Deregister(&r.service); err != nil { + r.logger.Log("err", err) + } else { + r.logger.Log("action", "deregister") + } +} diff --git a/sd/zk/util_test.go b/sd/zk/util_test.go index 2a4e1fe..c77fde9 100644 --- a/sd/zk/util_test.go +++ b/sd/zk/util_test.go @@ -71,6 +71,14 @@ c.ch <- zk.Event{} } +func (c *fakeClient) Register(s *Service) error { + return nil +} + +func (c *fakeClient) Deregister(s *Service) error { + return nil +} + func (c *fakeClient) SendErrorOnWatch() { c.mtx.Lock() defer c.mtx.Unlock()