Codebase list golang-github-go-kit-kit / 62e6b81
Add NATS transport (#680) NATS is an open-source, cloud-native messaging system (https://www.nats.io). The functional provides API that lets one works with NATS in a similar way as HTTP. - nats.MsgHandler could be used in queue or simple subscriber - Sync publisher Kirill Parasotchenko authored 5 years ago Peter Bourgon committed 5 years ago
8 changed file(s) with 1268 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
0 package main
1
2 import (
3 "context"
4 "encoding/json"
5 "errors"
6 "log"
7 "strings"
8 "flag"
9 "net/http"
10
11 "github.com/go-kit/kit/endpoint"
12 natstransport "github.com/go-kit/kit/transport/nats"
13 httptransport "github.com/go-kit/kit/transport/http"
14
15 "github.com/nats-io/go-nats"
16 )
17
18 // StringService provides operations on strings.
19 type StringService interface {
20 Uppercase(context.Context, string) (string, error)
21 Count(context.Context, string) int
22 }
23
24 // stringService is a concrete implementation of StringService
25 type stringService struct{}
26
27 func (stringService) Uppercase(_ context.Context, s string) (string, error) {
28 if s == "" {
29 return "", ErrEmpty
30 }
31 return strings.ToUpper(s), nil
32 }
33
34 func (stringService) Count(_ context.Context, s string) int {
35 return len(s)
36 }
37
38 // ErrEmpty is returned when an input string is empty.
39 var ErrEmpty = errors.New("empty string")
40
41 // For each method, we define request and response structs
42 type uppercaseRequest struct {
43 S string `json:"s"`
44 }
45
46 type uppercaseResponse struct {
47 V string `json:"v"`
48 Err string `json:"err,omitempty"` // errors don't define JSON marshaling
49 }
50
51 type countRequest struct {
52 S string `json:"s"`
53 }
54
55 type countResponse struct {
56 V int `json:"v"`
57 }
58
59 // Endpoints are a primary abstraction in go-kit. An endpoint represents a single RPC (method in our service interface)
60 func makeUppercaseHTTPEndpoint(nc *nats.Conn) endpoint.Endpoint {
61 return natstransport.NewPublisher(
62 nc,
63 "stringsvc.uppercase",
64 natstransport.EncodeJSONRequest,
65 decodeUppercaseResponse,
66 ).Endpoint()
67 }
68
69 func makeCountHTTPEndpoint(nc *nats.Conn) endpoint.Endpoint {
70 return natstransport.NewPublisher(
71 nc,
72 "stringsvc.count",
73 natstransport.EncodeJSONRequest,
74 decodeCountResponse,
75 ).Endpoint()
76 }
77
78 func makeUppercaseEndpoint(svc StringService) endpoint.Endpoint {
79 return func(ctx context.Context, request interface{}) (interface{}, error) {
80 req := request.(uppercaseRequest)
81 v, err := svc.Uppercase(ctx, req.S)
82 if err != nil {
83 return uppercaseResponse{v, err.Error()}, nil
84 }
85 return uppercaseResponse{v, ""}, nil
86 }
87 }
88
89 func makeCountEndpoint(svc StringService) endpoint.Endpoint {
90 return func(ctx context.Context, request interface{}) (interface{}, error) {
91 req := request.(countRequest)
92 v := svc.Count(ctx, req.S)
93 return countResponse{v}, nil
94 }
95 }
96
97 // Transports expose the service to the network. In this fourth example we utilize JSON over NATS and HTTP.
98 func main() {
99 svc := stringService{}
100
101 natsURL := flag.String("nats-url", nats.DefaultURL, "URL for connection to NATS")
102 flag.Parse()
103
104 nc, err := nats.Connect(*natsURL)
105 if err != nil {
106 log.Fatal(err)
107 }
108 defer nc.Close()
109
110 uppercaseHTTPHandler := httptransport.NewServer(
111 makeUppercaseHTTPEndpoint(nc),
112 decodeUppercaseHTTPRequest,
113 httptransport.EncodeJSONResponse,
114 )
115
116 countHTTPHandler := httptransport.NewServer(
117 makeCountHTTPEndpoint(nc),
118 decodeCountHTTPRequest,
119 httptransport.EncodeJSONResponse,
120 )
121
122 uppercaseHandler := natstransport.NewSubscriber(
123 makeUppercaseEndpoint(svc),
124 decodeUppercaseRequest,
125 natstransport.EncodeJSONResponse,
126 )
127
128 countHandler := natstransport.NewSubscriber(
129 makeCountEndpoint(svc),
130 decodeCountRequest,
131 natstransport.EncodeJSONResponse,
132 )
133
134 uSub, err := nc.QueueSubscribe("stringsvc.uppercase", "stringsvc", uppercaseHandler.ServeMsg(nc))
135 if err != nil {
136 log.Fatal(err)
137 }
138 defer uSub.Unsubscribe()
139
140 cSub, err := nc.QueueSubscribe("stringsvc.count", "stringsvc", countHandler.ServeMsg(nc))
141 if err != nil {
142 log.Fatal(err)
143 }
144 defer cSub.Unsubscribe()
145
146 http.Handle("/uppercase", uppercaseHTTPHandler)
147 http.Handle("/count", countHTTPHandler)
148 log.Fatal(http.ListenAndServe(":8080", nil))
149
150 }
151
152 func decodeUppercaseHTTPRequest(_ context.Context, r *http.Request) (interface{}, error) {
153 var request uppercaseRequest
154 if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
155 return nil, err
156 }
157 return request, nil
158 }
159
160 func decodeCountHTTPRequest(_ context.Context, r *http.Request) (interface{}, error) {
161 var request countRequest
162 if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
163 return nil, err
164 }
165 return request, nil
166 }
167
168 func decodeUppercaseResponse(_ context.Context, msg *nats.Msg) (interface{}, error) {
169 var response uppercaseResponse
170
171 if err := json.Unmarshal(msg.Data, &response); err != nil {
172 return nil, err
173 }
174
175 return response, nil
176 }
177
178 func decodeCountResponse(_ context.Context, msg *nats.Msg) (interface{}, error) {
179 var response countResponse
180
181 if err := json.Unmarshal(msg.Data, &response); err != nil {
182 return nil, err
183 }
184
185 return response, nil
186 }
187
188 func decodeUppercaseRequest(_ context.Context, msg *nats.Msg) (interface{}, error) {
189 var request uppercaseRequest
190
191 if err := json.Unmarshal(msg.Data, &request); err != nil {
192 return nil, err
193 }
194 return request, nil
195 }
196
197 func decodeCountRequest(_ context.Context, msg *nats.Msg) (interface{}, error) {
198 var request countRequest
199
200 if err := json.Unmarshal(msg.Data, &request); err != nil {
201 return nil, err
202 }
203 return request, nil
204 }
205
0 // Package nats provides a NATS transport.
1 package nats
0 package nats
1
2 import (
3 "context"
4
5 "github.com/nats-io/go-nats"
6 )
7
8 // DecodeRequestFunc extracts a user-domain request object from a publisher
9 // request object. It's designed to be used in NATS subscribers, for subscriber-side
10 // endpoints. One straightforward DecodeRequestFunc could be something that
11 // JSON decodes from the request body to the concrete response type.
12 type DecodeRequestFunc func(context.Context, *nats.Msg) (request interface{}, err error)
13
14 // EncodeRequestFunc encodes the passed request object into the NATS request
15 // object. It's designed to be used in NATS publishers, for publisher-side
16 // endpoints. One straightforward EncodeRequestFunc could something that JSON
17 // encodes the object directly to the request payload.
18 type EncodeRequestFunc func(context.Context, *nats.Msg, interface{}) error
19
20 // EncodeResponseFunc encodes the passed response object to the subscriber reply.
21 // It's designed to be used in NATS subscribers, for subscriber-side
22 // endpoints. One straightforward EncodeResponseFunc could be something that
23 // JSON encodes the object directly to the response body.
24 type EncodeResponseFunc func(context.Context, string, *nats.Conn, interface{}) error
25
26 // DecodeResponseFunc extracts a user-domain response object from an NATS
27 // response object. It's designed to be used in NATS publisher, for publisher-side
28 // endpoints. One straightforward DecodeResponseFunc could be something that
29 // JSON decodes from the response payload to the concrete response type.
30 type DecodeResponseFunc func(context.Context, *nats.Msg) (response interface{}, err error)
31
0 package nats
1
2 import (
3 "context"
4 "encoding/json"
5 "github.com/go-kit/kit/endpoint"
6 "github.com/nats-io/go-nats"
7 "time"
8 )
9
10 // Publisher wraps a URL and provides a method that implements endpoint.Endpoint.
11 type Publisher struct {
12 publisher *nats.Conn
13 subject string
14 enc EncodeRequestFunc
15 dec DecodeResponseFunc
16 before []RequestFunc
17 after []PublisherResponseFunc
18 timeout time.Duration
19 }
20
21 // NewClient constructs a usable Publisher for a single remote method.
22 func NewPublisher(
23 publisher *nats.Conn,
24 subject string,
25 enc EncodeRequestFunc,
26 dec DecodeResponseFunc,
27 options ...PublisherOption,
28 ) *Publisher {
29 p := &Publisher{
30 publisher: publisher,
31 subject: subject,
32 enc: enc,
33 dec: dec,
34 timeout: 10 * time.Second,
35 }
36 for _, option := range options {
37 option(p)
38 }
39 return p
40 }
41
42 // PublisherOption sets an optional parameter for clients.
43 type PublisherOption func(*Publisher)
44
45 // PublisherBefore sets the RequestFuncs that are applied to the outgoing NATS
46 // request before it's invoked.
47 func PublisherBefore(before ...RequestFunc) PublisherOption {
48 return func(p *Publisher) { p.before = append(p.before, before...) }
49 }
50
51 // PublisherAfter sets the ClientResponseFuncs applied to the incoming NATS
52 // request prior to it being decoded. This is useful for obtaining anything off
53 // of the response and adding onto the context prior to decoding.
54 func PublisherAfter(after ...PublisherResponseFunc) PublisherOption {
55 return func(p *Publisher) { p.after = append(p.after, after...) }
56 }
57
58 // PublisherTimeout sets the available timeout for NATS request.
59 func PublisherTimeout(timeout time.Duration) PublisherOption {
60 return func(p *Publisher) { p.timeout = timeout }
61 }
62
63 // Endpoint returns a usable endpoint that invokes the remote endpoint.
64 func (p Publisher) Endpoint() endpoint.Endpoint {
65 return func(ctx context.Context, request interface{}) (interface{}, error) {
66 ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
67 defer cancel()
68
69 msg := nats.Msg{Subject: p.subject}
70
71 if err := p.enc(ctx, &msg, request); err != nil {
72 return nil, err
73 }
74
75 for _, f := range p.before {
76 ctx = f(ctx, &msg)
77 }
78
79 resp, err := p.publisher.RequestWithContext(ctx, msg.Subject, msg.Data)
80 if err != nil {
81 return nil, err
82 }
83
84 for _, f := range p.after {
85 ctx = f(ctx, resp)
86 }
87
88 response, err := p.dec(ctx, resp)
89 if err != nil {
90 return nil, err
91 }
92
93 return response, nil
94 }
95 }
96
97 // EncodeJSONRequest is an EncodeRequestFunc that serializes the request as a
98 // JSON object to the Data of the Msg. Many JSON-over-NATS services can use it as
99 // a sensible default.
100 func EncodeJSONRequest(_ context.Context, msg *nats.Msg, request interface{}) error {
101 b, err := json.Marshal(request)
102 if err != nil {
103 return err
104 }
105
106 msg.Data = b
107
108 return nil
109 }
0 package nats_test
1
2 import (
3 "testing"
4 "context"
5 "time"
6 "strings"
7
8 "github.com/nats-io/go-nats"
9 natstransport "github.com/go-kit/kit/transport/nats"
10 )
11
12 func TestPublisher(t *testing.T) {
13 var (
14 testdata = "testdata"
15 encode = func(context.Context, *nats.Msg, interface{}) error { return nil }
16 decode = func(_ context.Context, msg *nats.Msg) (interface{}, error) {
17 return TestResponse{string(msg.Data), ""}, nil
18 }
19 )
20
21 nc, err := nats.Connect(nats.DefaultURL)
22 if err != nil {
23 t.Fatal(err)
24 }
25 defer nc.Close()
26
27 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) {
28 if err := nc.Publish(msg.Reply, []byte(testdata)); err != nil {
29 t.Fatal(err)
30 }
31 })
32 if err != nil {
33 t.Fatal(err)
34 }
35 defer sub.Unsubscribe()
36
37 publisher := natstransport.NewPublisher(
38 nc,
39 "natstransport.test",
40 encode,
41 decode,
42 )
43
44 res, err := publisher.Endpoint()(context.Background(), struct{}{})
45 if err != nil {
46 t.Fatal(err)
47 }
48
49 response, ok := res.(TestResponse)
50 if !ok {
51 t.Fatal("response should be TestResponse")
52 }
53 if want, have := testdata, response.String; want != have {
54 t.Errorf("want %q, have %q", want, have)
55 }
56
57 }
58
59 func TestPublisherBefore(t *testing.T) {
60 var (
61 testdata = "testdata"
62 encode = func(context.Context, *nats.Msg, interface{}) error { return nil }
63 decode = func(_ context.Context, msg *nats.Msg) (interface{}, error) {
64 return TestResponse{string(msg.Data), ""}, nil
65 }
66 )
67
68 nc, err := nats.Connect(nats.DefaultURL)
69 if err != nil {
70 t.Fatal(err)
71 }
72 defer nc.Close()
73
74 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) {
75 if err := nc.Publish(msg.Reply, msg.Data); err != nil {
76 t.Fatal(err)
77 }
78 })
79 if err != nil {
80 t.Fatal(err)
81 }
82 defer sub.Unsubscribe()
83
84 publisher := natstransport.NewPublisher(
85 nc,
86 "natstransport.test",
87 encode,
88 decode,
89 natstransport.PublisherBefore(func(ctx context.Context, msg *nats.Msg) context.Context {
90 msg.Data = []byte(strings.ToUpper(string(testdata)))
91 return ctx
92 }),
93 )
94
95 res, err := publisher.Endpoint()(context.Background(), struct{}{})
96 if err != nil {
97 t.Fatal(err)
98 }
99
100 response, ok := res.(TestResponse)
101 if !ok {
102 t.Fatal("response should be TestResponse")
103 }
104 if want, have := strings.ToUpper(testdata), response.String; want != have {
105 t.Errorf("want %q, have %q", want, have)
106 }
107
108 }
109
110 func TestPublisherAfter(t *testing.T) {
111 var (
112 testdata = "testdata"
113 encode = func(context.Context, *nats.Msg, interface{}) error { return nil }
114 decode = func(_ context.Context, msg *nats.Msg) (interface{}, error) {
115 return TestResponse{string(msg.Data), ""}, nil
116 }
117 )
118
119 nc, err := nats.Connect(nats.DefaultURL)
120 if err != nil {
121 t.Fatal(err)
122 }
123 defer nc.Close()
124
125 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) {
126 if err := nc.Publish(msg.Reply, []byte(testdata)); err != nil {
127 t.Fatal(err)
128 }
129 })
130 if err != nil {
131 t.Fatal(err)
132 }
133 defer sub.Unsubscribe()
134
135 publisher := natstransport.NewPublisher(
136 nc,
137 "natstransport.test",
138 encode,
139 decode,
140 natstransport.PublisherAfter(func(ctx context.Context, msg *nats.Msg) context.Context {
141 msg.Data = []byte(strings.ToUpper(string(msg.Data)))
142 return ctx
143 }),
144 )
145
146 res, err := publisher.Endpoint()(context.Background(), struct{}{})
147 if err != nil {
148 t.Fatal(err)
149 }
150
151 response, ok := res.(TestResponse)
152 if !ok {
153 t.Fatal("response should be TestResponse")
154 }
155 if want, have := strings.ToUpper(testdata), response.String; want != have {
156 t.Errorf("want %q, have %q", want, have)
157 }
158
159 }
160
161 func TestPublisherTimeout(t *testing.T) {
162 var (
163 encode = func(context.Context, *nats.Msg, interface{}) error { return nil }
164 decode = func(_ context.Context, msg *nats.Msg) (interface{}, error) {
165 return TestResponse{string(msg.Data), ""}, nil
166 }
167 )
168
169 nc, err := nats.Connect(nats.DefaultURL)
170 if err != nil {
171 t.Fatal(err)
172 }
173 defer nc.Close()
174
175 ch := make(chan struct{})
176 defer close(ch)
177
178 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) {
179 <-ch
180 })
181 if err != nil {
182 t.Fatal(err)
183 }
184 defer sub.Unsubscribe()
185
186 publisher := natstransport.NewPublisher(
187 nc,
188 "natstransport.test",
189 encode,
190 decode,
191 natstransport.PublisherTimeout(time.Second),
192 )
193
194 _, err = publisher.Endpoint()(context.Background(), struct{}{})
195 if err != context.DeadlineExceeded {
196 t.Errorf("want %s, have %s", context.DeadlineExceeded, err)
197
198 }
199
200 }
201
202 func TestEncodeJSONRequest(t *testing.T) {
203 var data string
204
205 nc, err := nats.Connect(nats.DefaultURL)
206 if err != nil {
207 t.Fatal(err)
208 }
209 defer nc.Close()
210
211 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) {
212 data = string(msg.Data)
213
214 if err := nc.Publish(msg.Reply, []byte("")); err != nil {
215 t.Fatal(err)
216 }
217 })
218 if err != nil {
219 t.Fatal(err)
220 }
221 defer sub.Unsubscribe()
222
223 publisher := natstransport.NewPublisher(
224 nc,
225 "natstransport.test",
226 natstransport.EncodeJSONRequest,
227 func(context.Context, *nats.Msg) (interface{}, error) { return nil, nil },
228 ).Endpoint()
229
230 for _, test := range []struct {
231 value interface{}
232 body string
233 }{
234 {nil, "null"},
235 {12, "12"},
236 {1.2, "1.2"},
237 {true, "true"},
238 {"test", "\"test\""},
239 {struct{ Foo string `json:"foo"` }{"foo"}, "{\"foo\":\"foo\"}"},
240 } {
241 if _, err := publisher(context.Background(), test.value); err != nil {
242 t.Fatal(err)
243 continue
244 }
245
246 if data != test.body {
247 t.Errorf("%v: actual %#v, expected %#v", test.value, data, test.body)
248 }
249 }
250
251 }
0 package nats
1
2 import (
3 "context"
4
5 "github.com/nats-io/go-nats"
6 )
7
8 // RequestFunc may take information from a publisher request and put it into a
9 // request context. In Subscribers, RequestFuncs are executed prior to invoking the
10 // endpoint.
11 type RequestFunc func(context.Context, *nats.Msg) context.Context
12
13 // SubscriberResponseFunc may take information from a request context and use it to
14 // manipulate a Publisher. SubscriberResponseFuncs are only executed in
15 // subscribers, after invoking the endpoint but prior to publishing a reply.
16 type SubscriberResponseFunc func(context.Context, *nats.Conn) context.Context
17
18 // PublisherResponseFunc may take information from an NATS request and make the
19 // response available for consumption. ClientResponseFuncs are only executed in
20 // clients, after a request has been made, but prior to it being decoded.
21 type PublisherResponseFunc func(context.Context, *nats.Msg) context.Context
0 package nats
1
2 import (
3 "context"
4 "encoding/json"
5
6 "github.com/go-kit/kit/endpoint"
7 "github.com/go-kit/kit/log"
8
9 "github.com/nats-io/go-nats"
10 )
11
12 // Subscriber wraps an endpoint and provides nats.MsgHandler.
13 type Subscriber struct {
14 e endpoint.Endpoint
15 dec DecodeRequestFunc
16 enc EncodeResponseFunc
17 before []RequestFunc
18 after []SubscriberResponseFunc
19 errorEncoder ErrorEncoder
20 logger log.Logger
21 }
22
23 // NewSubscriber constructs a new subscriber, which provides nats.MsgHandler and wraps
24 // the provided endpoint.
25 func NewSubscriber(
26 e endpoint.Endpoint,
27 dec DecodeRequestFunc,
28 enc EncodeResponseFunc,
29 options ...SubscriberOption,
30 ) *Subscriber {
31 s := &Subscriber{
32 e: e,
33 dec: dec,
34 enc: enc,
35 errorEncoder: DefaultErrorEncoder,
36 logger: log.NewNopLogger(),
37 }
38 for _, option := range options {
39 option(s)
40 }
41 return s
42 }
43
44 // SubscriberOption sets an optional parameter for subscribers.
45 type SubscriberOption func(*Subscriber)
46
47 // SubscriberBefore functions are executed on the publisher request object before the
48 // request is decoded.
49 func SubscriberBefore(before ...RequestFunc) SubscriberOption {
50 return func(s *Subscriber) { s.before = append(s.before, before...) }
51 }
52
53 // SubscriberAfter functions are executed on the subscriber reply after the
54 // endpoint is invoked, but before anything is published to the reply.
55 func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption {
56 return func(s *Subscriber) { s.after = append(s.after, after...) }
57 }
58
59 // SubscriberErrorEncoder is used to encode errors to the subscriber reply
60 // whenever they're encountered in the processing of a request. Clients can
61 // use this to provide custom error formatting. By default,
62 // errors will be published with the DefaultErrorEncoder.
63 func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption {
64 return func(s *Subscriber) { s.errorEncoder = ee }
65 }
66
67 // SubscriberErrorLogger is used to log non-terminal errors. By default, no errors
68 // are logged. This is intended as a diagnostic measure. Finer-grained control
69 // of error handling, including logging in more detail, should be performed in a
70 // custom SubscriberErrorEncoder which has access to the context.
71 func SubscriberErrorLogger(logger log.Logger) SubscriberOption {
72 return func(s *Subscriber) { s.logger = logger }
73 }
74
75 // ServeMsg provides nats.MsgHandler.
76 func (s Subscriber) ServeMsg(nc *nats.Conn) func(msg *nats.Msg) {
77 return func(msg *nats.Msg) {
78 ctx, cancel := context.WithCancel(context.Background())
79 defer cancel()
80
81 for _, f := range s.before {
82 ctx = f(ctx, msg)
83 }
84
85 request, err := s.dec(ctx, msg)
86 if err != nil {
87 s.logger.Log("err", err)
88 if msg.Reply == "" {
89 return
90 }
91 s.errorEncoder(ctx, err, msg.Reply, nc)
92 return
93 }
94
95 response, err := s.e(ctx, request)
96 if err != nil {
97 s.logger.Log("err", err)
98 if msg.Reply == "" {
99 return
100 }
101 s.errorEncoder(ctx, err, msg.Reply, nc)
102 return
103 }
104
105 for _, f := range s.after {
106 ctx = f(ctx, nc)
107 }
108
109 if msg.Reply == "" {
110 return
111 }
112
113 if err := s.enc(ctx, msg.Reply, nc, response); err != nil {
114 s.logger.Log("err", err)
115 s.errorEncoder(ctx, err, msg.Reply, nc)
116 return
117 }
118 }
119 }
120
121 // ErrorEncoder is responsible for encoding an error to the subscriber reply.
122 // Users are encouraged to use custom ErrorEncoders to encode errors to
123 // their replies, and will likely want to pass and check for their own error
124 // types.
125 type ErrorEncoder func(ctx context.Context, err error, reply string, nc *nats.Conn)
126
127 // NopRequestDecoder is a DecodeRequestFunc that can be used for requests that do not
128 // need to be decoded, and simply returns nil, nil.
129 func NopRequestDecoder(_ context.Context, _ *nats.Msg) (interface{}, error) {
130 return nil, nil
131 }
132
133 // EncodeJSONResponse is a EncodeResponseFunc that serializes the response as a
134 // JSON object to the subscriber reply. Many JSON-over services can use it as
135 // a sensible default.
136 func EncodeJSONResponse(_ context.Context, reply string, nc *nats.Conn, response interface{}) error {
137 b, err := json.Marshal(response)
138 if err != nil {
139 return err
140 }
141
142 return nc.Publish(reply, b)
143 }
144
145 // DefaultErrorEncoder writes the error to the subscriber reply.
146 func DefaultErrorEncoder(_ context.Context, err error, reply string, nc *nats.Conn) {
147 logger := log.NewNopLogger()
148
149 type Response struct {
150 Error string `json:"err"`
151 }
152
153 var response Response
154
155 response.Error = err.Error()
156
157 b, err := json.Marshal(response)
158 if err != nil {
159 logger.Log("err", err)
160 return
161 }
162
163 if err := nc.Publish(reply, b); err != nil {
164 logger.Log("err", err)
165 }
166 }
0 package nats_test
1
2 import (
3 "testing"
4 "context"
5 "errors"
6 "time"
7 "sync"
8 "strings"
9 "encoding/json"
10
11 "github.com/nats-io/go-nats"
12 "github.com/nats-io/gnatsd/server"
13
14 natstransport "github.com/go-kit/kit/transport/nats"
15 "github.com/go-kit/kit/endpoint"
16 )
17
18 type TestResponse struct {
19 String string `json:"str"`
20 Error string `json:"err"`
21 }
22
23 func init() {
24 opts := server.Options{Host: "localhost", Port: 4222}
25 natsServer := server.New(&opts)
26
27 go func() {
28 natsServer.Start()
29 }()
30
31 if ok := natsServer.ReadyForConnections(2 * time.Second); !ok {
32 panic("Failed start of NATS")
33 }
34 }
35
36 func TestSubscriberBadDecode(t *testing.T) {
37 nc, err := nats.Connect(nats.DefaultURL)
38 if err != nil {
39 t.Fatal(err)
40 }
41 defer nc.Close()
42
43 handler := natstransport.NewSubscriber(
44 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
45 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, errors.New("dang") },
46 func(context.Context, string, *nats.Conn, interface{}) error { return nil },
47 )
48
49 resp := testRequest(t, nc, handler)
50
51 if want, have := "dang", resp.Error; want != have {
52 t.Errorf("want %s, have %s", want, have)
53 }
54
55 }
56
57 func TestSubscriberBadEndpoint(t *testing.T) {
58 nc, err := nats.Connect(nats.DefaultURL)
59 if err != nil {
60 t.Fatal(err)
61 }
62 defer nc.Close()
63
64 handler := natstransport.NewSubscriber(
65 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errors.New("dang") },
66 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
67 func(context.Context, string, *nats.Conn, interface{}) error { return nil },
68 )
69
70 resp := testRequest(t, nc, handler)
71
72 if want, have := "dang", resp.Error; want != have {
73 t.Errorf("want %s, have %s", want, have)
74 }
75 }
76
77 func TestSubscriberBadEncode(t *testing.T) {
78 nc, err := nats.Connect(nats.DefaultURL)
79 if err != nil {
80 t.Fatal(err)
81 }
82 defer nc.Close()
83
84 handler := natstransport.NewSubscriber(
85 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
86 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
87 func(context.Context, string, *nats.Conn, interface{}) error { return errors.New("dang") },
88 )
89
90 resp := testRequest(t, nc, handler)
91
92 if want, have := "dang", resp.Error; want != have {
93 t.Errorf("want %s, have %s", want, have)
94 }
95 }
96
97 func TestSubscriberErrorEncoder(t *testing.T) {
98 nc, err := nats.Connect(nats.DefaultURL)
99 if err != nil {
100 t.Fatal(err)
101 }
102 defer nc.Close()
103
104 errTeapot := errors.New("teapot")
105 code := func(err error) error {
106 if err == errTeapot {
107 return err
108 }
109 return errors.New("dang")
110 }
111 handler := natstransport.NewSubscriber(
112 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errTeapot },
113 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
114 func(context.Context, string, *nats.Conn, interface{}) error { return nil },
115 natstransport.SubscriberErrorEncoder(func(_ context.Context, err error, reply string, nc *nats.Conn) {
116 var r TestResponse
117 r.Error = code(err).Error()
118
119 b, err := json.Marshal(r)
120 if err != nil {
121 t.Fatal(err)
122 }
123
124 if err := nc.Publish(reply, b); err != nil {
125 t.Fatal(err)
126 }
127 }),
128 )
129
130 resp := testRequest(t, nc, handler)
131
132 if want, have := errTeapot.Error(), resp.Error; want != have {
133 t.Errorf("want %s, have %s", want, have)
134 }
135 }
136
137 func TestSubscriberHappySubject(t *testing.T) {
138 step, response := testSubscriber(t)
139 step()
140 r := <-response
141
142 var resp TestResponse
143 err := json.Unmarshal(r.Data, &resp)
144 if err != nil {
145 t.Fatal(err)
146 }
147
148 if want, have := "", resp.Error; want != have {
149 t.Errorf("want %s, have %s (%s)", want, have, r.Data)
150 }
151 }
152
153 func TestMultipleSubscriberBefore(t *testing.T) {
154 nc, err := nats.Connect(nats.DefaultURL)
155 if err != nil {
156 t.Fatal(err)
157 }
158 defer nc.Close()
159
160 var (
161 response = struct{ Body string }{"go eat a fly ugly\n"}
162 wg sync.WaitGroup
163 done = make(chan struct{})
164 )
165 handler := natstransport.NewSubscriber(
166 endpoint.Nop,
167 func(context.Context, *nats.Msg) (interface{}, error) {
168 return struct{}{}, nil
169 },
170 func(_ context.Context, reply string, nc *nats.Conn, _ interface{}) error {
171 b, err := json.Marshal(response)
172 if err != nil {
173 return err
174 }
175
176 return nc.Publish(reply, b)
177 },
178 natstransport.SubscriberBefore(func(ctx context.Context, _ *nats.Msg) context.Context {
179 ctx = context.WithValue(ctx, "one", 1)
180
181 return ctx
182 }),
183 natstransport.SubscriberBefore(func(ctx context.Context, _ *nats.Msg) context.Context {
184 if _, ok := ctx.Value("one").(int); !ok {
185 t.Error("Value was not set properly when multiple ServerBefores are used")
186 }
187
188 close(done)
189 return ctx
190 }),
191 )
192
193 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(nc))
194 if err != nil {
195 t.Fatal(err)
196 }
197 defer sub.Unsubscribe()
198
199 wg.Add(1)
200 go func() {
201 defer wg.Done()
202 _, err := nc.Request("natstransport.test", []byte("test data"), 2*time.Second)
203 if err != nil {
204 t.Fatal(err)
205 }
206 }()
207
208 select {
209 case <-done:
210 case <-time.After(time.Second):
211 t.Fatal("timeout waiting for finalizer")
212 }
213
214 wg.Wait()
215 }
216
217 func TestMultipleSubscriberAfter(t *testing.T) {
218 nc, err := nats.Connect(nats.DefaultURL)
219 if err != nil {
220 t.Fatal(err)
221 }
222 defer nc.Close()
223
224 var (
225 response = struct{ Body string }{"go eat a fly ugly\n"}
226 wg sync.WaitGroup
227 done = make(chan struct{})
228 )
229 handler := natstransport.NewSubscriber(
230 endpoint.Nop,
231 func(context.Context, *nats.Msg) (interface{}, error) {
232 return struct{}{}, nil
233 },
234 func(_ context.Context, reply string, nc *nats.Conn, _ interface{}) error {
235 b, err := json.Marshal(response)
236 if err != nil {
237 return err
238 }
239
240 return nc.Publish(reply, b)
241 },
242 natstransport.SubscriberAfter(func(ctx context.Context, nc *nats.Conn) context.Context {
243 ctx = context.WithValue(ctx, "one", 1)
244
245 return ctx
246 }),
247 natstransport.SubscriberAfter(func(ctx context.Context, nc *nats.Conn) context.Context {
248 if _, ok := ctx.Value("one").(int); !ok {
249 t.Error("Value was not set properly when multiple ServerAfters are used")
250 }
251
252 close(done)
253 return ctx
254 }),
255 )
256
257 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(nc))
258 if err != nil {
259 t.Fatal(err)
260 }
261 defer sub.Unsubscribe()
262
263 wg.Add(1)
264 go func() {
265 defer wg.Done()
266 _, err := nc.Request("natstransport.test", []byte("test data"), 2*time.Second)
267 if err != nil {
268 t.Fatal(err)
269 }
270 }()
271
272 select {
273 case <-done:
274 case <-time.After(time.Second):
275 t.Fatal("timeout waiting for finalizer")
276 }
277
278 wg.Wait()
279 }
280
281 func TestEncodeJSONResponse(t *testing.T) {
282 nc, err := nats.Connect(nats.DefaultURL)
283 if err != nil {
284 t.Fatal(err)
285 }
286 defer nc.Close()
287
288 handler := natstransport.NewSubscriber(
289 func(context.Context, interface{}) (interface{}, error) { return struct{ Foo string `json:"foo"` }{"bar"}, nil },
290 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
291 natstransport.EncodeJSONResponse,
292 )
293
294 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(nc))
295 if err != nil {
296 t.Fatal(err)
297 }
298 defer sub.Unsubscribe()
299
300 r, err := nc.Request("natstransport.test", []byte("test data"), 2*time.Second)
301 if err != nil {
302 t.Fatal(err)
303 }
304
305 if want, have := `{"foo":"bar"}`, strings.TrimSpace(string(r.Data)); want != have {
306 t.Errorf("Body: want %s, have %s", want, have)
307 }
308 }
309
310 type responseError struct {
311 msg string
312 }
313
314 func (m responseError) Error() string {
315 return m.msg
316 }
317
318 func TestErrorEncoder(t *testing.T) {
319 nc, err := nats.Connect(nats.DefaultURL)
320 if err != nil {
321 t.Fatal(err)
322 }
323 defer nc.Close()
324
325 errResp := struct{ Error string `json:"err"` }{"oh no"}
326 handler := natstransport.NewSubscriber(
327 func(context.Context, interface{}) (interface{}, error) {
328 return nil, responseError{msg: errResp.Error}
329 },
330 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
331 natstransport.EncodeJSONResponse,
332 )
333
334 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(nc))
335 if err != nil {
336 t.Fatal(err)
337 }
338 defer sub.Unsubscribe()
339
340 r, err := nc.Request("natstransport.test", []byte("test data"), 2*time.Second)
341 if err != nil {
342 t.Fatal(err)
343 }
344
345 b, err := json.Marshal(errResp)
346 if err != nil {
347 t.Fatal(err)
348 }
349 if string(b) != string(r.Data) {
350 t.Errorf("ErrorEncoder: got: %q, expected: %q", r.Data, b)
351 }
352 }
353
354 type noContentResponse struct{}
355
356 func TestEncodeNoContent(t *testing.T) {
357 nc, err := nats.Connect(nats.DefaultURL)
358 if err != nil {
359 t.Fatal(err)
360 }
361 defer nc.Close()
362
363 handler := natstransport.NewSubscriber(
364 func(context.Context, interface{}) (interface{}, error) { return noContentResponse{}, nil },
365 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
366 natstransport.EncodeJSONResponse,
367 )
368
369 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(nc))
370 if err != nil {
371 t.Fatal(err)
372 }
373 defer sub.Unsubscribe()
374
375 r, err := nc.Request("natstransport.test", []byte("test data"), 2*time.Second)
376 if err != nil {
377 t.Fatal(err)
378 }
379
380 if want, have := `{}`, strings.TrimSpace(string(r.Data)); want != have {
381 t.Errorf("Body: want %s, have %s", want, have)
382 }
383 }
384
385 func TestNoOpRequestDecoder(t *testing.T) {
386 nc, err := nats.Connect(nats.DefaultURL)
387 if err != nil {
388 t.Fatal(err)
389 }
390 defer nc.Close()
391
392 handler := natstransport.NewSubscriber(
393 func(ctx context.Context, request interface{}) (interface{}, error) {
394 if request != nil {
395 t.Error("Expected nil request in endpoint when using NopRequestDecoder")
396 }
397 return nil, nil
398 },
399 natstransport.NopRequestDecoder,
400 natstransport.EncodeJSONResponse,
401 )
402
403 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(nc))
404 if err != nil {
405 t.Fatal(err)
406 }
407 defer sub.Unsubscribe()
408
409 r, err := nc.Request("natstransport.test", []byte("test data"), 2*time.Second)
410 if err != nil {
411 t.Fatal(err)
412 }
413
414 if want, have := `null`, strings.TrimSpace(string(r.Data)); want != have {
415 t.Errorf("Body: want %s, have %s", want, have)
416 }
417 }
418
419 func testSubscriber(t *testing.T) (step func(), resp <-chan *nats.Msg) {
420 var (
421 stepch = make(chan bool)
422 endpoint = func(context.Context, interface{}) (interface{}, error) { <-stepch; return struct{}{}, nil }
423 response = make(chan *nats.Msg)
424 handler = natstransport.NewSubscriber(
425 endpoint,
426 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
427 natstransport.EncodeJSONResponse,
428 natstransport.SubscriberBefore(func(ctx context.Context, msg *nats.Msg) context.Context { return ctx }),
429 natstransport.SubscriberAfter(func(ctx context.Context, nc *nats.Conn) context.Context { return ctx }),
430 )
431 )
432
433 go func() {
434 nc, err := nats.Connect(nats.DefaultURL)
435 if err != nil {
436 t.Fatal(err)
437 }
438 defer nc.Close()
439
440 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(nc))
441 if err != nil {
442 t.Fatal(err)
443 }
444 defer sub.Unsubscribe()
445
446 r, err := nc.Request("natstransport.test", []byte("test data"), 2*time.Second)
447 if err != nil {
448 t.Fatal(err)
449 }
450
451 response <- r
452 }()
453
454 return func() { stepch <- true }, response
455 }
456
457 func testRequest(t *testing.T, nc *nats.Conn, handler *natstransport.Subscriber) TestResponse {
458 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(nc))
459 if err != nil {
460 t.Fatal(err)
461 }
462 defer sub.Unsubscribe()
463
464 r, err := nc.Request("natstransport.test", []byte("test data"), 2*time.Second)
465 if err != nil {
466 t.Fatal(err)
467 }
468
469 var resp TestResponse
470 err = json.Unmarshal(r.Data, &resp)
471 if err != nil {
472 t.Fatal(err)
473 }
474
475 return resp
476 }