Codebase list golang-github-go-kit-kit / 4e226bf
Add Consul publisher Implements a Publisher which connects to Consul and returns the Endpoints for the requested service. Addtionally filtered by tags where every tag beyond the first one needs to be filtered in the client. Alexander Simmerl 8 years ago
3 changed file(s) with 412 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
0 package consul
1
2 import consul "github.com/hashicorp/consul/api"
3
4 // Client is a wrapper around the Consul API.
5 type Client interface {
6 Service(service string, tag string, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error)
7 }
8
9 type client struct {
10 consul *consul.Client
11 }
12
13 // NewClient returns an implementation of the Client interface expecting a fully
14 // setup Consul Client.
15 func NewClient(c *consul.Client) Client {
16 return &client{
17 consul: c,
18 }
19 }
20
21 // GetInstances returns the list of healthy entries for a given service filtered
22 // by tag.
23 func (c *client) Service(
24 service string,
25 tag string,
26 opts *consul.QueryOptions,
27 ) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
28 return c.consul.Health().Service(service, tag, true, opts)
29 }
0 package consul
1
2 import (
3 "fmt"
4
5 consul "github.com/hashicorp/consul/api"
6
7 "github.com/go-kit/kit/endpoint"
8 "github.com/go-kit/kit/loadbalancer"
9 "github.com/go-kit/kit/log"
10 )
11
12 const defaultIndex = 0
13
14 // Publisher yields endpoints for a service in Consul. Updates to the service
15 // are watched and will update the Publisher endpoints.
16 type Publisher struct {
17 cache *loadbalancer.EndpointCache
18 client Client
19 logger log.Logger
20 service string
21 tags []string
22 endpointsc chan []endpoint.Endpoint
23 quitc chan struct{}
24 }
25
26 // NewPublisher returns a Consul publisher which returns Endpoints for the
27 // requested service. It only returns instances for which all of the passed
28 // tags are present.
29 func NewPublisher(
30 client Client,
31 factory loadbalancer.Factory,
32 logger log.Logger,
33 service string,
34 tags ...string,
35 ) (*Publisher, error) {
36 logger = log.NewContext(logger).With("component", "Consul Publisher")
37
38 p := &Publisher{
39 cache: loadbalancer.NewEndpointCache(factory, logger),
40 client: client,
41 logger: logger,
42 service: service,
43 tags: tags,
44 quitc: make(chan struct{}),
45 }
46
47 is, index, err := p.getInstances(defaultIndex)
48 if err != nil {
49 return nil, err
50 }
51
52 p.cache.Replace(is)
53
54 go p.loop(index)
55
56 return p, nil
57 }
58
59 // Endpoints implements the Publisher interface.
60 func (p *Publisher) Endpoints() ([]endpoint.Endpoint, error) {
61 return p.cache.Endpoints()
62 }
63
64 // Stop terminates the publisher.
65 func (p *Publisher) Stop() {
66 close(p.quitc)
67 }
68
69 func (p *Publisher) loop(lastIndex uint64) {
70 var (
71 errc = make(chan error, 1)
72 resc = make(chan response, 1)
73 )
74
75 for {
76 go func() {
77 is, index, err := p.getInstances(lastIndex)
78 if err != nil {
79 errc <- err
80 return
81 }
82
83 resc <- response{
84 index: index,
85 instances: is,
86 }
87 }()
88
89 select {
90 case err := <-errc:
91 p.logger.Log("service", p.service, "err", err)
92 case res := <-resc:
93 p.cache.Replace(res.instances)
94 lastIndex = res.index
95 case <-p.quitc:
96 return
97 }
98 }
99 }
100
101 func (p *Publisher) getInstances(lastIndex uint64) ([]string, uint64, error) {
102 tag := ""
103
104 if len(p.tags) > 0 {
105 tag = p.tags[0]
106 }
107
108 entries, meta, err := p.client.Service(
109 p.service,
110 tag,
111 &consul.QueryOptions{
112 WaitIndex: lastIndex,
113 },
114 )
115 if err != nil {
116 return nil, 0, err
117 }
118
119 // If more than one tag is passed we need to filter it in the publisher until
120 // Consul supports multiple tags[0].
121 //
122 // [0] https://github.com/hashicorp/consul/issues/294
123 if len(p.tags) > 1 {
124 entries = filterEntries(entries, p.tags[1:]...)
125 }
126
127 return makeInstances(entries), meta.LastIndex, nil
128 }
129
130 // response is used as container to transport instances as well as the updated
131 // index.
132 type response struct {
133 index uint64
134 instances []string
135 }
136
137 func filterEntries(entries []*consul.ServiceEntry, tags ...string) []*consul.ServiceEntry {
138 var es []*consul.ServiceEntry
139
140 ENTRIES:
141 for _, entry := range entries {
142 ts := make(map[string]struct{}, len(entry.Service.Tags))
143
144 for _, tag := range entry.Service.Tags {
145 ts[tag] = struct{}{}
146 }
147
148 for _, tag := range tags {
149 if _, ok := ts[tag]; !ok {
150 continue ENTRIES
151 }
152 }
153
154 es = append(es, entry)
155 }
156
157 return es
158 }
159
160 func makeInstances(entries []*consul.ServiceEntry) []string {
161 is := make([]string, len(entries))
162
163 for i, entry := range entries {
164 addr := entry.Node.Address
165
166 if entry.Service.Address != "" {
167 addr = entry.Service.Address
168 }
169
170 is[i] = fmt.Sprintf("%s:%d", addr, entry.Service.Port)
171 }
172
173 return is
174 }
0 package consul
1
2 import (
3 "io"
4 "testing"
5
6 consul "github.com/hashicorp/consul/api"
7 "golang.org/x/net/context"
8
9 "github.com/go-kit/kit/endpoint"
10 "github.com/go-kit/kit/log"
11 )
12
13 var consulState = []*consul.ServiceEntry{
14 &consul.ServiceEntry{
15 Node: &consul.Node{
16 Address: "10.0.0.0",
17 Node: "app00.local",
18 },
19 Service: &consul.AgentService{
20 ID: "search-api-0",
21 Port: 8000,
22 Service: "search",
23 Tags: []string{
24 "api",
25 "v1",
26 },
27 },
28 },
29 &consul.ServiceEntry{
30 Node: &consul.Node{
31 Address: "10.0.0.1",
32 Node: "app01.local",
33 },
34 Service: &consul.AgentService{
35 ID: "search-api-1",
36 Port: 8001,
37 Service: "search",
38 Tags: []string{
39 "api",
40 "v2",
41 },
42 },
43 },
44 &consul.ServiceEntry{
45 Node: &consul.Node{
46 Address: "10.0.0.1",
47 Node: "app01.local",
48 },
49 Service: &consul.AgentService{
50 Address: "10.0.0.10",
51 ID: "search-db-0",
52 Port: 9000,
53 Service: "search",
54 Tags: []string{
55 "db",
56 },
57 },
58 },
59 }
60
61 func TestPublisher(t *testing.T) {
62 var (
63 logger = log.NewNopLogger()
64 client = newTestClient(consulState)
65 )
66
67 p, err := NewPublisher(client, testFactory, logger, "search", "api")
68 if err != nil {
69 t.Fatalf("publisher setup failed: %s", err)
70 }
71 defer p.Stop()
72
73 eps, err := p.Endpoints()
74 if err != nil {
75 t.Fatalf("endpoints failed: %s", err)
76 }
77
78 if have, want := len(eps), 2; have != want {
79 t.Errorf("have %v, want %v", have, want)
80 }
81 }
82
83 func TestPublisherNoService(t *testing.T) {
84 var (
85 logger = log.NewNopLogger()
86 client = newTestClient(consulState)
87 )
88
89 p, err := NewPublisher(client, testFactory, logger, "feed")
90 if err != nil {
91 t.Fatalf("publisher setup failed: %s", err)
92 }
93 defer p.Stop()
94
95 eps, err := p.Endpoints()
96 if err != nil {
97 t.Fatalf("endpoints failed: %s", err)
98 }
99
100 if have, want := len(eps), 0; have != want {
101 t.Fatalf("have %v, want %v", have, want)
102 }
103 }
104
105 func TestPublisherWithTags(t *testing.T) {
106 var (
107 logger = log.NewNopLogger()
108 client = newTestClient(consulState)
109 )
110
111 p, err := NewPublisher(client, testFactory, logger, "search", "api", "v2")
112 if err != nil {
113 t.Fatalf("publisher setup failed: %s", err)
114 }
115 defer p.Stop()
116
117 eps, err := p.Endpoints()
118 if err != nil {
119 t.Fatalf("endpoints failed: %s", err)
120 }
121
122 if have, want := len(eps), 1; have != want {
123 t.Fatalf("have %v, want %v", have, want)
124 }
125 }
126
127 func TestPublisherAddressOverride(t *testing.T) {
128 var (
129 ctx = context.Background()
130 logger = log.NewNopLogger()
131 client = newTestClient(consulState)
132 )
133
134 p, err := NewPublisher(client, testFactory, logger, "search", "db")
135 if err != nil {
136 t.Fatalf("publisher setup failed: %s", err)
137 }
138 defer p.Stop()
139
140 eps, err := p.Endpoints()
141 if err != nil {
142 t.Fatalf("endpoints failed: %s", err)
143 }
144
145 if have, want := len(eps), 1; have != want {
146 t.Fatalf("have %v, want %v", have, want)
147 }
148
149 ins, err := eps[0](ctx, struct{}{})
150 if err != nil {
151 t.Fatal(err)
152 }
153
154 if have, want := ins.(string), "10.0.0.10:9000"; have != want {
155 t.Errorf("have %#v, want %#v", have, want)
156 }
157 }
158
159 type testClient struct {
160 entries []*consul.ServiceEntry
161 }
162
163 func newTestClient(entries []*consul.ServiceEntry) Client {
164 if entries == nil {
165 entries = []*consul.ServiceEntry{}
166 }
167
168 return &testClient{
169 entries: entries,
170 }
171 }
172
173 func (c *testClient) Service(
174 service string,
175 tag string,
176 opts *consul.QueryOptions,
177 ) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
178 es := []*consul.ServiceEntry{}
179
180 for _, e := range c.entries {
181 if e.Service.Service != service {
182 continue
183 }
184 if tag != "" {
185 tagMap := map[string]struct{}{}
186
187 for _, t := range e.Service.Tags {
188 tagMap[t] = struct{}{}
189 }
190
191 if _, ok := tagMap[tag]; !ok {
192 continue
193 }
194 }
195
196 es = append(es, e)
197 }
198
199 return es, &consul.QueryMeta{}, nil
200 }
201
202 func testFactory(ins string) (endpoint.Endpoint, io.Closer, error) {
203 return func(context.Context, interface{}) (interface{}, error) {
204 return ins, nil
205 }, nil, nil
206 }