diff --git a/sd/etcd/client.go b/sd/etcd/client.go index 3501a54..0508723 100644 --- a/sd/etcd/client.go +++ b/sd/etcd/client.go @@ -49,7 +49,8 @@ ctx context.Context } -// ClientOptions defines options for the etcd client. +// ClientOptions defines options for the etcd client. All values are optional. +// If any duration is not specified, a default of 3 seconds will be used. type ClientOptions struct { Cert string Key string @@ -59,11 +60,21 @@ HeaderTimeoutPerRequest time.Duration } -// NewClient returns an *etcd.Client with a connection to the named machines. -// It will return an error if a connection to the cluster cannot be made. -// The parameter machines needs to be a full URL with schemas. -// e.g. "http://localhost:2379" will work, but "localhost:2379" will not. +// NewClient returns Client with a connection to the named machines. It will +// return an error if a connection to the cluster cannot be made. The parameter +// machines needs to be a full URL with schemas. e.g. "http://localhost:2379" +// will work, but "localhost:2379" will not. func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) { + if options.DialTimeout == 0 { + options.DialTimeout = 3 * time.Second + } + if options.DialKeepAlive == 0 { + options.DialKeepAlive = 3 * time.Second + } + if options.HeaderTimeoutPerRequest == 0 { + options.HeaderTimeoutPerRequest = 3 * time.Second + } + transport := etcd.DefaultTransport if options.Cert != "" && options.Key != "" { tlsCert, err := tls.LoadX509KeyPair(options.Cert, options.Key) diff --git a/sd/etcd/doc.go b/sd/etcd/doc.go index f8bd5bf..11add79 100644 --- a/sd/etcd/doc.go +++ b/sd/etcd/doc.go @@ -1,2 +1,4 @@ -// Package etcd provides a subscriber implementation for etcd. +// Package etcd provides a Subscriber and Registrar implementation for etcd. If +// you use etcd as your service discovery system, this package will help you +// implement the registration and client-side load balancing patterns. package etcd diff --git a/sd/etcd/example_test.go b/sd/etcd/example_test.go new file mode 100644 index 0000000..795164c --- /dev/null +++ b/sd/etcd/example_test.go @@ -0,0 +1,67 @@ +package etcd + +import ( + "io" + "time" + + "golang.org/x/net/context" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/sd/lb" +) + +func Example() { + // Let's say this is a service that means to register itself. + // First, we will set up some context. + var ( + etcdServer = "http://10.0.0.1:2379" // don't forget schema and port! + prefix = "/services/foosvc/" // known at compile time + instance = "1.2.3.4:8080" // taken from runtime or platform, somehow + key = prefix + instance // should be globally unique + value = "http://" + instance // based on our transport + ctx = context.Background() + ) + + // Build the client. + client, err := NewClient(ctx, []string{etcdServer}, ClientOptions{}) + if err != nil { + panic(err) + } + + // Build the registrar. + registrar := NewRegistrar(client, Service{ + Key: key, + Value: value, + }, log.NewNopLogger()) + + // Register our instance. + registrar.Register() + + // At the end of our service lifecycle, for example at the end of func main, + // we should make sure to deregister ourselves. This is important! Don't + // accidentally skip this step by invoking a log.Fatal or os.Exit in the + // interim, which bypasses the defer stack. + defer registrar.Deregister() + + // It's likely that we'll also want to connect to other services and call + // their methods. We can build a subscriber to listen for changes from etcd + // and build endpoints, wrap it with a load-balancer to pick a single + // endpoint, and finally wrap it with a retry strategy to get something that + // can be used as an endpoint directly. + barPrefix := "/services/barsvc" + subscriber, err := NewSubscriber(client, barPrefix, barFactory, log.NewNopLogger()) + if err != nil { + panic(err) + } + balancer := lb.NewRoundRobin(subscriber) + retry := lb.Retry(3, 3*time.Second, balancer) + + // And now retry can be used like any other endpoint. + req := struct{}{} + if _, err = retry(ctx, req); err != nil { + panic(err) + } +} + +func barFactory(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil } diff --git a/sd/etcd/registrar.go b/sd/etcd/registrar.go index 4e3d15b..52b632a 100644 --- a/sd/etcd/registrar.go +++ b/sd/etcd/registrar.go @@ -2,6 +2,7 @@ import ( etcd "github.com/coreos/etcd/client" + "github.com/go-kit/kit/log" ) @@ -12,28 +13,30 @@ logger log.Logger } -// Service holds the key, value and instance identifying data you -// want to publish to etcd. +// Service holds the instance identifying data you want to publish to etcd. Key +// must be unique, and value is the string returned to subscribers, typically +// called the "instance" string in other parts of package sd. type Service struct { - Key string // discovery key, example: /myorganization/myplatform/ - Value string // service name value, example: addsvc + Key string // unique key, e.g. "/service/foobar/1.2.3.4:8080" + Value string // returned to subscribers, e.g. "http://1.2.3.4:8080" DeleteOptions *etcd.DeleteOptions } // NewRegistrar returns a etcd Registrar acting on the provided catalog -// registration. +// registration (service). func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar { return &Registrar{ client: client, service: service, logger: log.NewContext(logger).With( + "key", service.Key, "value", service.Value, - "key", service.Key, ), } } -// Register implements sd.Registrar interface. +// Register implements the sd.Registrar interface. Call it when you want your +// service to be registered in etcd, typically at startup. func (r *Registrar) Register() { if err := r.client.Register(r.service); err != nil { r.logger.Log("err", err) @@ -42,7 +45,8 @@ } } -// Deregister implements sd.Registrar interface. +// Deregister implements the sd.Registrar interface. Call it when you want your +// service to be deregistered from etcd, typically just prior to shutdown. func (r *Registrar) Deregister() { if err := r.client.Deregister(r.service); err != nil { r.logger.Log("err", err)