Updates, cleanup, and replace example
Peter Bourgon
7 years ago
48 | 48 | ctx context.Context |
49 | 49 | } |
50 | 50 | |
51 | // ClientOptions defines options for the etcd client. | |
51 | // ClientOptions defines options for the etcd client. All values are optional. | |
52 | // If any duration is not specified, a default of 3 seconds will be used. | |
52 | 53 | type ClientOptions struct { |
53 | 54 | Cert string |
54 | 55 | Key string |
58 | 59 | HeaderTimeoutPerRequest time.Duration |
59 | 60 | } |
60 | 61 | |
61 | // NewClient returns an *etcd.Client with a connection to the named machines. | |
62 | // It will return an error if a connection to the cluster cannot be made. | |
63 | // The parameter machines needs to be a full URL with schemas. | |
64 | // e.g. "http://localhost:2379" will work, but "localhost:2379" will not. | |
62 | // NewClient returns Client with a connection to the named machines. It will | |
63 | // return an error if a connection to the cluster cannot be made. The parameter | |
64 | // machines needs to be a full URL with schemas. e.g. "http://localhost:2379" | |
65 | // will work, but "localhost:2379" will not. | |
65 | 66 | func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) { |
67 | if options.DialTimeout == 0 { | |
68 | options.DialTimeout = 3 * time.Second | |
69 | } | |
70 | if options.DialKeepAlive == 0 { | |
71 | options.DialKeepAlive = 3 * time.Second | |
72 | } | |
73 | if options.HeaderTimeoutPerRequest == 0 { | |
74 | options.HeaderTimeoutPerRequest = 3 * time.Second | |
75 | } | |
76 | ||
66 | 77 | transport := etcd.DefaultTransport |
67 | 78 | if options.Cert != "" && options.Key != "" { |
68 | 79 | tlsCert, err := tls.LoadX509KeyPair(options.Cert, options.Key) |
0 | // Package etcd provides a subscriber implementation for etcd. | |
0 | // Package etcd provides a Subscriber and Registrar implementation for etcd. If | |
1 | // you use etcd as your service discovery system, this package will help you | |
2 | // implement the registration and client-side load balancing patterns. | |
1 | 3 | package etcd |
0 | package etcd | |
1 | ||
2 | import ( | |
3 | "io" | |
4 | "time" | |
5 | ||
6 | "golang.org/x/net/context" | |
7 | ||
8 | "github.com/go-kit/kit/endpoint" | |
9 | "github.com/go-kit/kit/log" | |
10 | "github.com/go-kit/kit/sd/lb" | |
11 | ) | |
12 | ||
13 | func Example() { | |
14 | // Let's say this is a service that means to register itself. | |
15 | // First, we will set up some context. | |
16 | var ( | |
17 | etcdServer = "http://10.0.0.1:2379" // don't forget schema and port! | |
18 | prefix = "/services/foosvc/" // known at compile time | |
19 | instance = "1.2.3.4:8080" // taken from runtime or platform, somehow | |
20 | key = prefix + instance // should be globally unique | |
21 | value = "http://" + instance // based on our transport | |
22 | ctx = context.Background() | |
23 | ) | |
24 | ||
25 | // Build the client. | |
26 | client, err := NewClient(ctx, []string{etcdServer}, ClientOptions{}) | |
27 | if err != nil { | |
28 | panic(err) | |
29 | } | |
30 | ||
31 | // Build the registrar. | |
32 | registrar := NewRegistrar(client, Service{ | |
33 | Key: key, | |
34 | Value: value, | |
35 | }, log.NewNopLogger()) | |
36 | ||
37 | // Register our instance. | |
38 | registrar.Register() | |
39 | ||
40 | // At the end of our service lifecycle, for example at the end of func main, | |
41 | // we should make sure to deregister ourselves. This is important! Don't | |
42 | // accidentally skip this step by invoking a log.Fatal or os.Exit in the | |
43 | // interim, which bypasses the defer stack. | |
44 | defer registrar.Deregister() | |
45 | ||
46 | // It's likely that we'll also want to connect to other services and call | |
47 | // their methods. We can build a subscriber to listen for changes from etcd | |
48 | // and build endpoints, wrap it with a load-balancer to pick a single | |
49 | // endpoint, and finally wrap it with a retry strategy to get something that | |
50 | // can be used as an endpoint directly. | |
51 | barPrefix := "/services/barsvc" | |
52 | subscriber, err := NewSubscriber(client, barPrefix, barFactory, log.NewNopLogger()) | |
53 | if err != nil { | |
54 | panic(err) | |
55 | } | |
56 | balancer := lb.NewRoundRobin(subscriber) | |
57 | retry := lb.Retry(3, 3*time.Second, balancer) | |
58 | ||
59 | // And now retry can be used like any other endpoint. | |
60 | req := struct{}{} | |
61 | if _, err = retry(ctx, req); err != nil { | |
62 | panic(err) | |
63 | } | |
64 | } | |
65 | ||
66 | func barFactory(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil } |
1 | 1 | |
2 | 2 | import ( |
3 | 3 | etcd "github.com/coreos/etcd/client" |
4 | ||
4 | 5 | "github.com/go-kit/kit/log" |
5 | 6 | ) |
6 | 7 | |
11 | 12 | logger log.Logger |
12 | 13 | } |
13 | 14 | |
14 | // Service holds the key, value and instance identifying data you | |
15 | // want to publish to etcd. | |
15 | // Service holds the instance identifying data you want to publish to etcd. Key | |
16 | // must be unique, and value is the string returned to subscribers, typically | |
17 | // called the "instance" string in other parts of package sd. | |
16 | 18 | type Service struct { |
17 | Key string // discovery key, example: /myorganization/myplatform/ | |
18 | Value string // service name value, example: addsvc | |
19 | Key string // unique key, e.g. "/service/foobar/1.2.3.4:8080" | |
20 | Value string // returned to subscribers, e.g. "http://1.2.3.4:8080" | |
19 | 21 | DeleteOptions *etcd.DeleteOptions |
20 | 22 | } |
21 | 23 | |
22 | 24 | // NewRegistrar returns a etcd Registrar acting on the provided catalog |
23 | // registration. | |
25 | // registration (service). | |
24 | 26 | func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar { |
25 | 27 | return &Registrar{ |
26 | 28 | client: client, |
27 | 29 | service: service, |
28 | 30 | logger: log.NewContext(logger).With( |
31 | "key", service.Key, | |
29 | 32 | "value", service.Value, |
30 | "key", service.Key, | |
31 | 33 | ), |
32 | 34 | } |
33 | 35 | } |
34 | 36 | |
35 | // Register implements sd.Registrar interface. | |
37 | // Register implements the sd.Registrar interface. Call it when you want your | |
38 | // service to be registered in etcd, typically at startup. | |
36 | 39 | func (r *Registrar) Register() { |
37 | 40 | if err := r.client.Register(r.service); err != nil { |
38 | 41 | r.logger.Log("err", err) |
41 | 44 | } |
42 | 45 | } |
43 | 46 | |
44 | // Deregister implements sd.Registrar interface. | |
47 | // Deregister implements the sd.Registrar interface. Call it when you want your | |
48 | // service to be deregistered from etcd, typically just prior to shutdown. | |
45 | 49 | func (r *Registrar) Deregister() { |
46 | 50 | if err := r.client.Deregister(r.service); err != nil { |
47 | 51 | r.logger.Log("err", err) |