Codebase list golang-github-go-kit-kit / 72f9852
Fix race condition in NATS transport tests (#705) * Make sure connections are really closed after tests * Check for subscriptions instead of connections Justin Nuß authored 5 years ago Peter Bourgon committed 5 years ago
2 changed file(s) with 80 addition(s) and 91 deletion(s). Raw diff Collapse all Expand all
00 package nats_test
11
22 import (
3 "context"
4 "strings"
35 "testing"
4 "context"
56 "time"
6 "strings"
7
7
8 natstransport "github.com/go-kit/kit/transport/nats"
89 "github.com/nats-io/go-nats"
9 natstransport "github.com/go-kit/kit/transport/nats"
1010 )
1111
1212 func TestPublisher(t *testing.T) {
1818 }
1919 )
2020
21 nc, err := nats.Connect(nats.DefaultURL)
22 if err != nil {
23 t.Fatal(err)
24 }
21 nc := newNatsConn(t)
2522 defer nc.Close()
2623
2724 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) {
6562 }
6663 )
6764
68 nc, err := nats.Connect(nats.DefaultURL)
69 if err != nil {
70 t.Fatal(err)
71 }
65 nc := newNatsConn(t)
7266 defer nc.Close()
7367
7468 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) {
116110 }
117111 )
118112
119 nc, err := nats.Connect(nats.DefaultURL)
120 if err != nil {
121 t.Fatal(err)
122 }
113 nc := newNatsConn(t)
123114 defer nc.Close()
124115
125116 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) {
166157 }
167158 )
168159
169 nc, err := nats.Connect(nats.DefaultURL)
170 if err != nil {
171 t.Fatal(err)
172 }
160 nc := newNatsConn(t)
173161 defer nc.Close()
174162
175163 ch := make(chan struct{})
194182 _, err = publisher.Endpoint()(context.Background(), struct{}{})
195183 if err != context.DeadlineExceeded {
196184 t.Errorf("want %s, have %s", context.DeadlineExceeded, err)
197
198 }
199
185 }
200186 }
201187
202188 func TestEncodeJSONRequest(t *testing.T) {
203189 var data string
204190
205 nc, err := nats.Connect(nats.DefaultURL)
206 if err != nil {
207 t.Fatal(err)
208 }
191 nc := newNatsConn(t)
209192 defer nc.Close()
210193
211194 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) {
236219 {1.2, "1.2"},
237220 {true, "true"},
238221 {"test", "\"test\""},
239 {struct{ Foo string `json:"foo"` }{"foo"}, "{\"foo\":\"foo\"}"},
222 {struct {
223 Foo string `json:"foo"`
224 }{"foo"}, "{\"foo\":\"foo\"}"},
240225 } {
241226 if _, err := publisher(context.Background(), test.value); err != nil {
242227 t.Fatal(err)
00 package nats_test
11
22 import (
3 "context"
4 "encoding/json"
5 "errors"
6 "strings"
7 "sync"
38 "testing"
4 "context"
5 "errors"
69 "time"
7 "sync"
8 "strings"
9 "encoding/json"
10
10
11 "github.com/nats-io/gnatsd/server"
1112 "github.com/nats-io/go-nats"
12 "github.com/nats-io/gnatsd/server"
13
13
14 "github.com/go-kit/kit/endpoint"
1415 natstransport "github.com/go-kit/kit/transport/nats"
15 "github.com/go-kit/kit/endpoint"
1616 )
1717
1818 type TestResponse struct {
2020 Error string `json:"err"`
2121 }
2222
23 var natsServer *server.Server
24
2325 func init() {
24 opts := server.Options{Host: "localhost", Port: 4222}
25 natsServer := server.New(&opts)
26 natsServer = server.New(&server.Options{
27 Host: "localhost",
28 Port: 4222,
29 })
2630
2731 go func() {
2832 natsServer.Start()
3337 }
3438 }
3539
40 func newNatsConn(t *testing.T) *nats.Conn {
41 // Subscriptions and connections are closed asynchronously, so it's possible
42 // that there's still a subscription from an old connection that must be closed
43 // before the current test can be run.
44 for tries := 20; tries > 0; tries-- {
45 if natsServer.NumSubscriptions() == 0 {
46 break
47 }
48
49 time.Sleep(5 * time.Millisecond)
50 }
51
52 if n := natsServer.NumSubscriptions(); n > 0 {
53 t.Fatalf("found %d active subscriptions on the server", n)
54 }
55
56 nc, err := nats.Connect("nats://"+natsServer.Addr().String(), nats.Name(t.Name()))
57 if err != nil {
58 t.Fatalf("failed to connect to gnatsd server: %s", err)
59 }
60
61 return nc
62 }
63
3664 func TestSubscriberBadDecode(t *testing.T) {
37 nc, err := nats.Connect(nats.DefaultURL)
38 if err != nil {
39 t.Fatal(err)
40 }
65 nc := newNatsConn(t)
4166 defer nc.Close()
4267
4368 handler := natstransport.NewSubscriber(
5580 }
5681
5782 func TestSubscriberBadEndpoint(t *testing.T) {
58 nc, err := nats.Connect(nats.DefaultURL)
59 if err != nil {
60 t.Fatal(err)
61 }
83 nc := newNatsConn(t)
6284 defer nc.Close()
6385
6486 handler := natstransport.NewSubscriber(
7597 }
7698
7799 func TestSubscriberBadEncode(t *testing.T) {
78 nc, err := nats.Connect(nats.DefaultURL)
79 if err != nil {
80 t.Fatal(err)
81 }
100 nc := newNatsConn(t)
82101 defer nc.Close()
83102
84103 handler := natstransport.NewSubscriber(
95114 }
96115
97116 func TestSubscriberErrorEncoder(t *testing.T) {
98 nc, err := nats.Connect(nats.DefaultURL)
99 if err != nil {
100 t.Fatal(err)
101 }
117 nc := newNatsConn(t)
102118 defer nc.Close()
103119
104120 errTeapot := errors.New("teapot")
151167 }
152168
153169 func TestMultipleSubscriberBefore(t *testing.T) {
154 nc, err := nats.Connect(nats.DefaultURL)
155 if err != nil {
156 t.Fatal(err)
157 }
170 nc := newNatsConn(t)
158171 defer nc.Close()
159172
160173 var (
215228 }
216229
217230 func TestMultipleSubscriberAfter(t *testing.T) {
218 nc, err := nats.Connect(nats.DefaultURL)
219 if err != nil {
220 t.Fatal(err)
221 }
231 nc := newNatsConn(t)
222232 defer nc.Close()
223233
224234 var (
279289 }
280290
281291 func TestEncodeJSONResponse(t *testing.T) {
282 nc, err := nats.Connect(nats.DefaultURL)
283 if err != nil {
284 t.Fatal(err)
285 }
286 defer nc.Close()
287
288 handler := natstransport.NewSubscriber(
289 func(context.Context, interface{}) (interface{}, error) { return struct{ Foo string `json:"foo"` }{"bar"}, nil },
292 nc := newNatsConn(t)
293 defer nc.Close()
294
295 handler := natstransport.NewSubscriber(
296 func(context.Context, interface{}) (interface{}, error) {
297 return struct {
298 Foo string `json:"foo"`
299 }{"bar"}, nil
300 },
290301 func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil },
291302 natstransport.EncodeJSONResponse,
292303 )
316327 }
317328
318329 func TestErrorEncoder(t *testing.T) {
319 nc, err := nats.Connect(nats.DefaultURL)
320 if err != nil {
321 t.Fatal(err)
322 }
323 defer nc.Close()
324
325 errResp := struct{ Error string `json:"err"` }{"oh no"}
330 nc := newNatsConn(t)
331 defer nc.Close()
332
333 errResp := struct {
334 Error string `json:"err"`
335 }{"oh no"}
326336 handler := natstransport.NewSubscriber(
327337 func(context.Context, interface{}) (interface{}, error) {
328338 return nil, responseError{msg: errResp.Error}
354364 type noContentResponse struct{}
355365
356366 func TestEncodeNoContent(t *testing.T) {
357 nc, err := nats.Connect(nats.DefaultURL)
358 if err != nil {
359 t.Fatal(err)
360 }
367 nc := newNatsConn(t)
361368 defer nc.Close()
362369
363370 handler := natstransport.NewSubscriber(
383390 }
384391
385392 func TestNoOpRequestDecoder(t *testing.T) {
386 nc, err := nats.Connect(nats.DefaultURL)
387 if err != nil {
388 t.Fatal(err)
389 }
393 nc := newNatsConn(t)
390394 defer nc.Close()
391395
392396 handler := natstransport.NewSubscriber(
419423 func testSubscriber(t *testing.T) (step func(), resp <-chan *nats.Msg) {
420424 var (
421425 stepch = make(chan bool)
422 endpoint = func(context.Context, interface{}) (interface{}, error) { <-stepch; return struct{}{}, nil }
426 endpoint = func(context.Context, interface{}) (interface{}, error) {
427 <-stepch
428 return struct{}{}, nil
429 }
423430 response = make(chan *nats.Msg)
424431 handler = natstransport.NewSubscriber(
425432 endpoint,
431438 )
432439
433440 go func() {
434 nc, err := nats.Connect(nats.DefaultURL)
435 if err != nil {
436 t.Fatal(err)
437 }
441 nc := newNatsConn(t)
438442 defer nc.Close()
439443
440444 sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(nc))