Codebase list golang-github-go-kit-kit / 07d89a2
Use given context in NATS endpoint (#876) Previous implementation used a new background context which prevented context cancellation. Nathan Smith authored 3 years ago Peter Bourgon committed 3 years ago
2 changed file(s) with 39 addition(s) and 1 deletion(s). Raw diff Collapse all Expand all
6363 // Endpoint returns a usable endpoint that invokes the remote endpoint.
6464 func (p Publisher) Endpoint() endpoint.Endpoint {
6565 return func(ctx context.Context, request interface{}) (interface{}, error) {
66 ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
66 ctx, cancel := context.WithTimeout(ctx, p.timeout)
6767 defer cancel()
6868
6969 msg := nats.Msg{Subject: p.subject}
182182 _, err = publisher.Endpoint()(context.Background(), struct{}{})
183183 if err != context.DeadlineExceeded {
184184 t.Errorf("want %s, have %s", context.DeadlineExceeded, err)
185 }
186 }
187
188 func TestPublisherCancellation(t *testing.T) {
189 var (
190 testdata = "testdata"
191 encode = func(context.Context, *nats.Msg, interface{}) error { return nil }
192 decode = func(_ context.Context, msg *nats.Msg) (interface{}, error) {
193 return TestResponse{string(msg.Data), ""}, nil
194 }
195 )
196
197 nc := newNatsConn(t)
198 defer nc.Close()
199
200 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) {
201 if err := nc.Publish(msg.Reply, []byte(testdata)); err != nil {
202 t.Fatal(err)
203 }
204 })
205 if err != nil {
206 t.Fatal(err)
207 }
208 defer sub.Unsubscribe()
209
210 publisher := natstransport.NewPublisher(
211 nc,
212 "natstransport.test",
213 encode,
214 decode,
215 )
216
217 ctx, cancel := context.WithCancel(context.Background())
218 cancel()
219
220 _, err = publisher.Endpoint()(ctx, struct{}{})
221 if err != context.Canceled {
222 t.Errorf("want %s, have %s", context.Canceled, err)
185223 }
186224 }
187225