Add FinalizerFunc to NATS transport (#790) (#790)
Kirill Parasotchenko authored 5 years ago
Peter Bourgon committed 5 years ago
17 | 17 | before []RequestFunc |
18 | 18 | after []SubscriberResponseFunc |
19 | 19 | errorEncoder ErrorEncoder |
20 | finalizer []SubscriberFinalizerFunc | |
20 | 21 | logger log.Logger |
21 | 22 | } |
22 | 23 | |
72 | 73 | return func(s *Subscriber) { s.logger = logger } |
73 | 74 | } |
74 | 75 | |
76 | // SubscriberFinalizer is executed at the end of every request from a publisher through NATS. | |
77 | // By default, no finalizer is registered. | |
78 | func SubscriberFinalizer(f ...SubscriberFinalizerFunc) SubscriberOption { | |
79 | return func(s *Subscriber) { s.finalizer = f } | |
80 | } | |
81 | ||
75 | 82 | // ServeMsg provides nats.MsgHandler. |
76 | 83 | func (s Subscriber) ServeMsg(nc *nats.Conn) func(msg *nats.Msg) { |
77 | 84 | return func(msg *nats.Msg) { |
78 | 85 | ctx, cancel := context.WithCancel(context.Background()) |
79 | 86 | defer cancel() |
87 | ||
88 | if len(s.finalizer) > 0 { | |
89 | defer func() { | |
90 | for _, f := range s.finalizer { | |
91 | f(ctx, msg) | |
92 | } | |
93 | }() | |
94 | } | |
80 | 95 | |
81 | 96 | for _, f := range s.before { |
82 | 97 | ctx = f(ctx, msg) |
124 | 139 | // types. |
125 | 140 | type ErrorEncoder func(ctx context.Context, err error, reply string, nc *nats.Conn) |
126 | 141 | |
142 | // ServerFinalizerFunc can be used to perform work at the end of an request | |
143 | // from a publisher, after the response has been written to the publisher. The principal | |
144 | // intended use is for request logging. | |
145 | type SubscriberFinalizerFunc func(ctx context.Context, msg *nats.Msg) | |
146 | ||
127 | 147 | // NopRequestDecoder is a DecodeRequestFunc that can be used for requests that do not |
128 | 148 | // need to be decoded, and simply returns nil, nil. |
129 | 149 | func NopRequestDecoder(_ context.Context, _ *nats.Msg) (interface{}, error) { |
288 | 288 | wg.Wait() |
289 | 289 | } |
290 | 290 | |
291 | func TestSubscriberFinalizerFunc(t *testing.T) { | |
292 | nc := newNatsConn(t) | |
293 | defer nc.Close() | |
294 | ||
295 | var ( | |
296 | response = struct{ Body string }{"go eat a fly ugly\n"} | |
297 | wg sync.WaitGroup | |
298 | done = make(chan struct{}) | |
299 | ) | |
300 | handler := natstransport.NewSubscriber( | |
301 | endpoint.Nop, | |
302 | func(context.Context, *nats.Msg) (interface{}, error) { | |
303 | return struct{}{}, nil | |
304 | }, | |
305 | func(_ context.Context, reply string, nc *nats.Conn, _ interface{}) error { | |
306 | b, err := json.Marshal(response) | |
307 | if err != nil { | |
308 | return err | |
309 | } | |
310 | ||
311 | return nc.Publish(reply, b) | |
312 | }, | |
313 | natstransport.SubscriberFinalizer(func(ctx context.Context, _ *nats.Msg) { | |
314 | close(done) | |
315 | }), | |
316 | ) | |
317 | ||
318 | sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(nc)) | |
319 | if err != nil { | |
320 | t.Fatal(err) | |
321 | } | |
322 | defer sub.Unsubscribe() | |
323 | ||
324 | wg.Add(1) | |
325 | go func() { | |
326 | defer wg.Done() | |
327 | _, err := nc.Request("natstransport.test", []byte("test data"), 2*time.Second) | |
328 | if err != nil { | |
329 | t.Fatal(err) | |
330 | } | |
331 | }() | |
332 | ||
333 | select { | |
334 | case <-done: | |
335 | case <-time.After(time.Second): | |
336 | t.Fatal("timeout waiting for finalizer") | |
337 | } | |
338 | ||
339 | wg.Wait() | |
340 | } | |
341 | ||
291 | 342 | func TestEncodeJSONResponse(t *testing.T) { |
292 | 343 | nc := newNatsConn(t) |
293 | 344 | defer nc.Close() |