diff --git a/sd/etcd/client.go b/sd/etcd/client.go index b9e2904..6a1ecb4 100644 --- a/sd/etcd/client.go +++ b/sd/etcd/client.go @@ -3,6 +3,7 @@ import ( "crypto/tls" "crypto/x509" + "errors" "io/ioutil" "net" "net/http" @@ -10,6 +11,11 @@ etcd "github.com/coreos/etcd/client" "golang.org/x/net/context" +) + +var ( + ErrNoKey = errors.New("no key provided") + ErrNoValue = errors.New("no value provided") ) // Client is a wrapper around the etcd client. @@ -20,6 +26,11 @@ // WatchPrefix starts watching every change for given prefix in etcd. When an // change is detected it will populate the responseChan when an *etcd.Response. WatchPrefix(prefix string, responseChan chan *etcd.Response) + + // Register a service with etcd. + Register(s Service) error + // Deregister a service with etcd. + Deregister(s Service) error } type client struct { @@ -112,10 +123,16 @@ } entries := make([]string, len(resp.Node.Nodes)) - for i, node := range resp.Node.Nodes { - entries[i] = node.Value + + if len(entries) > 0 { + for i, node := range resp.Node.Nodes { + entries[i] = node.Value + } + } else { + entries = append(entries, resp.Node.Value) } return entries, nil + } // WatchPrefix implements the etcd Client interface. @@ -129,3 +146,22 @@ responseChan <- res } } + +func (c *client) Register(s Service) error { + if s.Key == "" { + return ErrNoKey + } + if s.Value == "" { + return ErrNoValue + } + _, err := c.keysAPI.Create(c.ctx, s.Key, s.Value) + return err +} + +func (c *client) Deregister(s Service) error { + if s.Key == "" { + return ErrNoKey + } + _, err := c.keysAPI.Delete(c.ctx, s.Key, s.DeleteOptions) + return err +} diff --git a/sd/etcd/client_test.go b/sd/etcd/client_test.go new file mode 100644 index 0000000..1735b77 --- /dev/null +++ b/sd/etcd/client_test.go @@ -0,0 +1,70 @@ +package etcd + +import ( + "testing" + "time" + + "golang.org/x/net/context" +) + +func TestNewClient(t *testing.T) { + ClientOptions := ClientOptions{ + Cert: "", + Key: "", + CaCert: "", + DialTimeout: (2 * time.Second), + DialKeepAline: (2 * time.Second), + HeaderTimeoutPerRequest: (2 * time.Second), + } + + client, err := NewClient( + context.Background(), + []string{"http://irrelevant:12345"}, + ClientOptions, + ) + if err != nil { + t.Fatalf("unexpected error creating client: %v", err) + } + if client == nil { + t.Fatal("expected new Client, got nil") + } +} + +func TestOptions(t *testing.T) { + //creating new client should fail when providing invalid or missing endpoints + a, err := NewClient( + context.Background(), + []string{}, + ClientOptions{ + Cert: "", + Key: "", + CaCert: "", + DialTimeout: (2 * time.Second), + DialKeepAline: (2 * time.Second), + HeaderTimeoutPerRequest: (2 * time.Second), + }) + + if err == nil { + t.Errorf("expected error: %v", err) + } + if a != nil { + t.Fatalf("expected client to be nil on failure") + } + + //creating new client should fail when providing invalid or missing endpoints + _, err = NewClient( + context.Background(), + []string{"http://irrelevant:12345"}, + ClientOptions{ + Cert: "blank.crt", + Key: "blank.key", + CaCert: "blank.cacert", + DialTimeout: (2 * time.Second), + DialKeepAline: (2 * time.Second), + HeaderTimeoutPerRequest: (2 * time.Second), + }) + + if err == nil { + t.Errorf("expected error: %v", err) + } +} diff --git a/sd/etcd/integration_test.go b/sd/etcd/integration_test.go new file mode 100644 index 0000000..72027e0 --- /dev/null +++ b/sd/etcd/integration_test.go @@ -0,0 +1,117 @@ +// +build integration + +package etcd + +import ( + "flag" + "kit/log" + "os" + "testing" + "time" + + etcdc "github.com/coreos/etcd/client" + etcdi "github.com/coreos/etcd/integration" + "golang.org/x/net/context" +) + +var ( + host []string + kitClientOptions ClientOptions +) + +func TestMain(m *testing.M) { + flag.Parse() + + kitClientOptions = ClientOptions{ + Cert: "", + Key: "", + CaCert: "", + DialTimeout: (2 * time.Second), + DialKeepAline: (2 * time.Second), + HeaderTimeoutPerRequest: (2 * time.Second), + } + + code := m.Run() + + os.Exit(code) +} + +func TestRegistrar(t *testing.T) { + ts := etcdi.NewCluster(t, 1) + ts.Launch(t) + kitClient, err := NewClient(context.Background(), []string{ts.URL(0)}, kitClientOptions) + + // Valid registrar should pass + registrar := NewRegistrar(kitClient, Service{ + Key: "somekey", + Value: "somevalue", + DeleteOptions: &etcdc.DeleteOptions{ + PrevValue: "", + PrevIndex: 0, + Recursive: true, + Dir: false, + }, + }, log.NewNopLogger()) + + registrar.Register() + r1, err := kitClient.GetEntries(registrar.service.Key) + if err != nil { + t.Fatalf("unexpected error when getting value for deregistered key: %v", err) + } + + if want, have := registrar.service.Value, r1[0]; want != have { + t.Fatalf("want %q, have %q", want, have) + } + + registrar.Deregister() + r2, err := kitClient.GetEntries(registrar.service.Key) + if len(r2) > 0 { + t.Fatalf("unexpected value found for deregistered key: %s", r2) + } + + // Registrar with no key should register but value will be blank + registrarNoKey := NewRegistrar(kitClient, Service{ + Key: "", + Value: "somevalue", + DeleteOptions: &etcdc.DeleteOptions{ + PrevValue: "", + PrevIndex: 0, + Recursive: true, + Dir: false, + }, + }, log.NewNopLogger()) + + registrarNoKey.Register() + r3, err := kitClient.GetEntries(registrarNoKey.service.Key) + if err != nil { + t.Errorf("unexpected error when getting value for entry with no key: %v", err) + } + + if want, have := "", r3[0]; want != have { + t.Fatalf("want %q, have %q", want, have) + } + + // Registrar with no value should not register anything + registrarNoValue := NewRegistrar(kitClient, Service{ + Key: "somekey", + Value: "", + DeleteOptions: &etcdc.DeleteOptions{ + PrevValue: "", + PrevIndex: 0, + Recursive: true, + Dir: false, + }, + }, log.NewNopLogger()) + + registrarNoValue.Register() + r4, err := kitClient.GetEntries(registrarNoValue.service.Key) + if err == nil { + t.Errorf("expected error when getting value for entry key which attempted to register with no value") + } + + if len(r4) > 0 { + t.Fatalf("unexpected value retreived when getting value for entry with no value") + } + + ts.Terminate(t) +} diff --git a/sd/etcd/registrar.go b/sd/etcd/registrar.go new file mode 100644 index 0000000..4e3d15b --- /dev/null +++ b/sd/etcd/registrar.go @@ -0,0 +1,52 @@ +package etcd + +import ( + etcd "github.com/coreos/etcd/client" + "github.com/go-kit/kit/log" +) + +// Registrar registers service instance liveness information to etcd. +type Registrar struct { + client Client + service Service + logger log.Logger +} + +// Service holds the key, value and instance identifying data you +// want to publish to etcd. +type Service struct { + Key string // discovery key, example: /myorganization/myplatform/ + Value string // service name value, example: addsvc + DeleteOptions *etcd.DeleteOptions +} + +// NewRegistrar returns a etcd Registrar acting on the provided catalog +// registration. +func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar { + return &Registrar{ + client: client, + service: service, + logger: log.NewContext(logger).With( + "value", service.Value, + "key", service.Key, + ), + } +} + +// Register implements sd.Registrar interface. +func (r *Registrar) Register() { + if err := r.client.Register(r.service); err != nil { + r.logger.Log("err", err) + } else { + r.logger.Log("action", "register") + } +} + +// Deregister implements sd.Registrar interface. +func (r *Registrar) Deregister() { + if err := r.client.Deregister(r.service); err != nil { + r.logger.Log("err", err) + } else { + r.logger.Log("action", "deregister") + } +} diff --git a/sd/etcd/subscriber_test.go b/sd/etcd/subscriber_test.go index 0073e1e..84f7813 100644 --- a/sd/etcd/subscriber_test.go +++ b/sd/etcd/subscriber_test.go @@ -87,3 +87,10 @@ } func (c *fakeClient) WatchPrefix(prefix string, responseChan chan *stdetcd.Response) {} + +func (c *fakeClient) Register(Service) error { + return nil +} +func (c *fakeClient) Deregister(Service) error { + return nil +}