Codebase list golang-github-go-kit-kit / 62b26ab
adds NopResponse to AMQP transport (#850) Matthew Fung authored 3 years ago Peter Bourgon committed 3 years ago
4 changed file(s) with 214 addition(s) and 61 deletion(s). Raw diff Collapse all Expand all
1414 // Publisher wraps an AMQP channel and queue, and provides a method that
1515 // implements endpoint.Endpoint.
1616 type Publisher struct {
17 ch Channel
18 q *amqp.Queue
19 enc EncodeRequestFunc
20 dec DecodeResponseFunc
21 before []RequestFunc
22 after []PublisherResponseFunc
23 timeout time.Duration
17 ch Channel
18 q *amqp.Queue
19 enc EncodeRequestFunc
20 dec DecodeResponseFunc
21 before []RequestFunc
22 after []PublisherResponseFunc
23 deliverer Deliverer
24 timeout time.Duration
2425 }
2526
2627 // NewPublisher constructs a usable Publisher for a single remote method.
3233 options ...PublisherOption,
3334 ) *Publisher {
3435 p := &Publisher{
35 ch: ch,
36 q: q,
37 enc: enc,
38 dec: dec,
39 timeout: 10 * time.Second,
36 ch: ch,
37 q: q,
38 enc: enc,
39 dec: dec,
40 deliverer: DefaultDeliverer,
41 timeout: 10 * time.Second,
4042 }
4143 for _, option := range options {
4244 option(p)
5860 // of the response and adding onto the context prior to decoding.
5961 func PublisherAfter(after ...PublisherResponseFunc) PublisherOption {
6062 return func(p *Publisher) { p.after = append(p.after, after...) }
63 }
64
65 // PublisherDeliverer sets the deliverer function that the Publisher invokes.
66 func PublisherDeliverer(deliverer Deliverer) PublisherOption {
67 return func(p *Publisher) { p.deliverer = deliverer }
6168 }
6269
6370 // PublisherTimeout sets the available timeout for an AMQP request.
8592 ctx = f(ctx, &pub, nil)
8693 }
8794
88 deliv, err := p.publishAndConsumeFirstMatchingResponse(ctx, &pub)
95 deliv, err := p.deliverer(ctx, p, &pub)
8996 if err != nil {
9097 return nil, err
9198 }
102109 }
103110 }
104111
105 // publishAndConsumeFirstMatchingResponse publishes the specified Publishing
112 // Deliverer is invoked by the Publisher to publish the specified Publishing, and to
113 // retrieve the appropriate response Delivery object.
114 type Deliverer func(
115 context.Context,
116 Publisher,
117 *amqp.Publishing,
118 ) (*amqp.Delivery, error)
119
120 // DefaultDeliverer is a deliverer that publishes the specified Publishing
106121 // and returns the first Delivery object with the matching correlationId.
107122 // If the context times out while waiting for a reply, an error will be returned.
108 func (p Publisher) publishAndConsumeFirstMatchingResponse(
123 func DefaultDeliverer(
109124 ctx context.Context,
125 p Publisher,
110126 pub *amqp.Publishing,
111127 ) (*amqp.Delivery, error) {
112128 err := p.ch.Publish(
150166 }
151167
152168 }
169
170 // SendAndForgetDeliverer delivers the supplied publishing and
171 // returns a nil response.
172 // When using this deliverer please ensure that the supplied DecodeResponseFunc and
173 // PublisherResponseFunc are able to handle nil-type responses.
174 func SendAndForgetDeliverer(
175 ctx context.Context,
176 p Publisher,
177 pub *amqp.Publishing,
178 ) (*amqp.Delivery, error) {
179 err := p.ch.Publish(
180 getPublishExchange(ctx),
181 getPublishKey(ctx),
182 false, //mandatory
183 false, //immediate
184 *pub,
185 )
186 return nil, err
187 }
223223 t.Errorf("want %s, have %s", want, have)
224224 }
225225 }
226
227 // TestSendAndForgetPublisher tests that the SendAndForgetDeliverer is working
228 func TestSendAndForgetPublisher(t *testing.T) {
229 ch := &mockChannel{
230 f: nullFunc,
231 c: make(chan amqp.Publishing, 1),
232 deliveries: []amqp.Delivery{}, // no reply from mock subscriber
233 }
234 q := &amqp.Queue{Name: "some queue"}
235
236 pub := amqptransport.NewPublisher(
237 ch,
238 q,
239 func(context.Context, *amqp.Publishing, interface{}) error { return nil },
240 func(context.Context, *amqp.Delivery) (response interface{}, err error) {
241 return struct{}{}, nil
242 },
243 amqptransport.PublisherDeliverer(amqptransport.SendAndForgetDeliverer),
244 amqptransport.PublisherTimeout(50*time.Millisecond),
245 )
246
247 var err error
248 errChan := make(chan error, 1)
249 finishChan := make(chan bool, 1)
250 go func() {
251 _, err := pub.Endpoint()(context.Background(), struct{}{})
252 if err != nil {
253 errChan <- err
254 } else {
255 finishChan <- true
256 }
257
258 }()
259
260 select {
261 case <-finishChan:
262 break
263 case err = <-errChan:
264 t.Errorf("unexpected error %s", err)
265 case <-time.After(100 * time.Millisecond):
266 t.Fatal("timed out waiting for result")
267 }
268
269 }
1111
1212 // Subscriber wraps an endpoint and provides a handler for AMQP Delivery messages.
1313 type Subscriber struct {
14 e endpoint.Endpoint
15 dec DecodeRequestFunc
16 enc EncodeResponseFunc
17 before []RequestFunc
18 after []SubscriberResponseFunc
19 errorEncoder ErrorEncoder
20 logger log.Logger
14 e endpoint.Endpoint
15 dec DecodeRequestFunc
16 enc EncodeResponseFunc
17 before []RequestFunc
18 after []SubscriberResponseFunc
19 responsePublisher ResponsePublisher
20 errorEncoder ErrorEncoder
21 logger log.Logger
2122 }
2223
2324 // NewSubscriber constructs a new subscriber, which provides a handler
2930 options ...SubscriberOption,
3031 ) *Subscriber {
3132 s := &Subscriber{
32 e: e,
33 dec: dec,
34 enc: enc,
35 errorEncoder: DefaultErrorEncoder,
36 logger: log.NewNopLogger(),
33 e: e,
34 dec: dec,
35 enc: enc,
36 responsePublisher: DefaultResponsePublisher,
37 errorEncoder: DefaultErrorEncoder,
38 logger: log.NewNopLogger(),
3739 }
3840 for _, option := range options {
3941 option(s)
5456 // endpoint is invoked, but before anything is published to the reply.
5557 func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption {
5658 return func(s *Subscriber) { s.after = append(s.after, after...) }
59 }
60
61 // SubscriberResponsePublisher is used by the subscriber to deliver response
62 // objects to the original sender.
63 // By default, the DefaultResponsePublisher is used.
64 func SubscriberResponsePublisher(rp ResponsePublisher) SubscriberOption {
65 return func(s *Subscriber) { s.responsePublisher = rp }
5766 }
5867
5968 // SubscriberErrorEncoder is used to encode errors to the subscriber reply
110119 return
111120 }
112121
113 if err := s.publishResponse(ctx, deliv, ch, &pub); err != nil {
114 s.logger.Log("err", err)
115 s.errorEncoder(ctx, err, deliv, ch, &pub)
116 return
117 }
118 }
119
120 }
121
122 func (s Subscriber) publishResponse(
122 if err := s.responsePublisher(ctx, deliv, ch, &pub); err != nil {
123 s.logger.Log("err", err)
124 s.errorEncoder(ctx, err, deliv, ch, &pub)
125 return
126 }
127 }
128
129 }
130
131 // EncodeJSONResponse marshals the response as JSON as part of the
132 // payload of the AMQP Publishing object.
133 func EncodeJSONResponse(
134 ctx context.Context,
135 pub *amqp.Publishing,
136 response interface{},
137 ) error {
138 b, err := json.Marshal(response)
139 if err != nil {
140 return err
141 }
142 pub.Body = b
143 return nil
144 }
145
146 // EncodeNopResponse is a response function that does nothing.
147 func EncodeNopResponse(
148 ctx context.Context,
149 pub *amqp.Publishing,
150 response interface{},
151 ) error {
152 return nil
153 }
154
155 // ResponsePublisher functions are executed by the subscriber to
156 // publish response object to the original sender.
157 // Please note that the word "publisher" does not refer
158 // to the publisher of pub/sub.
159 // Rather, publisher is merely a function that publishes, or sends responses.
160 type ResponsePublisher func(
161 context.Context,
162 *amqp.Delivery,
163 Channel,
164 *amqp.Publishing,
165 ) error
166
167 // DefaultResponsePublisher extracts the reply exchange and reply key
168 // from the request, and sends the response object to that destination.
169 func DefaultResponsePublisher(
123170 ctx context.Context,
124171 deliv *amqp.Delivery,
125172 ch Channel,
144191 )
145192 }
146193
147 // EncodeJSONResponse marshals the response as JSON as part of the
148 // payload of the AMQP Publishing object.
149 func EncodeJSONResponse(
150 ctx context.Context,
151 pub *amqp.Publishing,
152 response interface{},
153 ) error {
154 b, err := json.Marshal(response)
155 if err != nil {
156 return err
157 }
158 pub.Body = b
159 return nil
160 }
161
162 // EncodeNopResponse is a response function that does nothing.
163 func EncodeNopResponse(
164 ctx context.Context,
165 pub *amqp.Publishing,
166 response interface{},
194 // NopResponsePublisher does not deliver a response to the original sender.
195 // This response publisher is used when the user wants the subscriber to
196 // receive and forget.
197 func NopResponsePublisher(
198 ctx context.Context,
199 deliv *amqp.Delivery,
200 ch Channel,
201 pub *amqp.Publishing,
167202 ) error {
168203 return nil
169204 }
1111 )
1212
1313 var (
14 typeAssertionError = errors.New("type assertion error")
14 errTypeAssertion = errors.New("type assertion error")
1515 )
1616
1717 // mockChannel is a mock of *amqp.Channel.
204204 }
205205 res, ok := response.(testRes)
206206 if !ok {
207 t.Error(typeAssertionError)
207 t.Error(errTypeAssertion)
208208 }
209209
210210 if want, have := obj.Squadron, res.Squadron; want != have {
212212 }
213213 if want, have := names[obj.Squadron], res.Name; want != have {
214214 t.Errorf("want %s, have %s", want, have)
215 }
216 }
217
218 // TestNopResponseSubscriber checks if setting responsePublisher to
219 // NopResponsePublisher works properly by disabling response.
220 func TestNopResponseSubscriber(t *testing.T) {
221 cid := "correlation"
222 replyTo := "sender"
223 obj := testReq{
224 Squadron: 436,
225 }
226 b, err := json.Marshal(obj)
227 if err != nil {
228 t.Fatal(err)
229 }
230
231 sub := amqptransport.NewSubscriber(
232 testEndpoint,
233 testReqDecoder,
234 amqptransport.EncodeJSONResponse,
235 amqptransport.SubscriberResponsePublisher(amqptransport.NopResponsePublisher),
236 amqptransport.SubscriberErrorEncoder(amqptransport.ReplyErrorEncoder),
237 )
238
239 checkReplyToFunc := func(exchange, key string, mandatory, immediate bool) {}
240
241 outputChan := make(chan amqp.Publishing, 1)
242 ch := &mockChannel{f: checkReplyToFunc, c: outputChan}
243 sub.ServeDelivery(ch)(&amqp.Delivery{
244 CorrelationId: cid,
245 ReplyTo: replyTo,
246 Body: b,
247 })
248
249 select {
250 case <-outputChan:
251 t.Fatal("Subscriber with NopResponsePublisher replied.")
252 case <-time.After(100 * time.Millisecond):
253 break
215254 }
216255 }
217256
293332 amqptransport.EncodeJSONResponse,
294333 amqptransport.SubscriberErrorEncoder(amqptransport.ReplyErrorEncoder),
295334 )
296 checkReplyToFunc := func(exch, k string, mandatory, immediate bool) { return }
335 checkReplyToFunc := func(exch, k string, mandatory, immediate bool) {}
297336 outputChan := make(chan amqp.Publishing, 1)
298337 ch := &mockChannel{f: checkReplyToFunc, c: outputChan}
299338 sub.ServeDelivery(ch)(&amqp.Delivery{})
343382 func testEndpoint(_ context.Context, request interface{}) (interface{}, error) {
344383 req, ok := request.(testReq)
345384 if !ok {
346 return nil, typeAssertionError
385 return nil, errTypeAssertion
347386 }
348387 name, prs := names[req.Squadron]
349388 if !prs {