Merge pull request #198 from basvanbeek/master
tracing/zipkin updates (new kafka collector)
Peter Bourgon
8 years ago
16 | 16 | // received from an endpoint. |
17 | 17 | var ErrBadCast = errors.New("bad cast") |
18 | 18 | |
19 | // ContextCanceled indicates the request context was canceled. | |
19 | // ErrContextCanceled indicates the request context was canceled. | |
20 | 20 | var ErrContextCanceled = errors.New("context canceled") |
21 | 21 | |
22 | 22 | // Chain is a helper function for composing middlewares. Requests will |
11 | 11 | "golang.org/x/net/context" |
12 | 12 | ) |
13 | 13 | |
14 | // Client is a wrapper arround the etcd client. | |
14 | // Client is a wrapper around the etcd client. | |
15 | 15 | type Client interface { |
16 | 16 | // GetEntries queries the given prefix in etcd and returns a set of entries. |
17 | 17 | GetEntries(prefix string) ([]string, error) |
32 | 32 | return nil, err |
33 | 33 | } |
34 | 34 | |
35 | // intial node retrieval and cache fill | |
35 | // initial node retrieval and cache fill | |
36 | 36 | instances, eventc, err := p.client.GetEntries(p.path) |
37 | 37 | if err != nil { |
38 | 38 | logger.Log("path", p.path, "msg", "failed to retrieve entries", "err", err) |
13 | 13 | hot spots, and diagnosing errors. All microservice infrastructures will |
14 | 14 | benefit from request tracing; sufficiently large infrastructures will require |
15 | 15 | it. |
16 | ||
17 | ## Test Setup | |
18 | ||
19 | Setting up [Zipkin] is not an easy thing to do, and it demands considerable | |
20 | resources. To help you get started with development and testing we've made a | |
21 | [VirtualBox] image available through [Vagrant] (*the box requires about 6GB | |
22 | of RAM*). | |
23 | ||
24 | First make sure you've installed [Vagrant] on your machine. Then you can | |
25 | download and run the [Vagrant] image like this from the command line: | |
26 | ||
27 | ``` | |
28 | # create a new directory to store your vagrant configuration and image files | |
29 | mkdir zipkin | |
30 | cd zipkin | |
31 | vagrant init bas303/zipkin | |
32 | vagrant up --provider virtualbox | |
33 | ``` | |
34 | ||
35 | [Zipkin]: http://zipkin.io/ | |
36 | [VirtualBox]: https://www.virtualbox.org/ | |
37 | [Vagrant]: https://www.vagrantup.com/ | |
38 | ||
39 | You probably need to adjust the `Vagrantfile` configuration to meet your | |
40 | networking needs. The file itself is documented, so it should not be hard to | |
41 | configure. After the change you can reload your box with the updated settings | |
42 | like this: | |
43 | ||
44 | ``` | |
45 | vagrant reload | |
46 | ``` | |
47 | ||
48 | As mentioned, the box is quite heavy and may take a few minutes to fully boot | |
49 | up. To get into the box, connect through SSH and use `vagrant` for both username | |
50 | and password. | |
51 | ||
52 | The following services have been set up to run: | |
53 | - Apache ZooKeeper (port: 2181) | |
54 | - Apache Kafka (port: 9092) | |
55 | - MySQL Server 5.5 (port: 3306) | |
56 | - Zipkin Collector (Kafka, MySQL) | |
57 | - Zipkin Query (MySQL) | |
58 | - Zipkin Web (port: 8080, 9990) | |
59 | ||
60 | To check whether everything booted up properly, inspect the log files in these | |
61 | directories: | |
62 | ``` | |
63 | /var/log/zookeeper | |
64 | /var/log/kafka | |
65 | /var/log/mysql | |
66 | /var/log/zipkin | |
67 | ``` | |
68 | ||
69 | The individual services can be managed with the `service` command: | |
70 | - zookeeper | |
71 | - kafka | |
72 | - mysql | |
73 | - collector | |
74 | - query | |
75 | - web | |
16 | 76 | |
17 | 77 | ## Usage |
18 | 78 |
0 | 0 | package zipkin |
1 | 1 | |
2 | import ( | |
3 | "encoding/base64" | |
4 | "errors" | |
5 | "fmt" | |
6 | "math" | |
7 | "math/rand" | |
8 | "net" | |
9 | "strings" | |
10 | "time" | |
11 | ||
12 | "github.com/apache/thrift/lib/go/thrift" | |
13 | ||
14 | "github.com/go-kit/kit/log" | |
15 | "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/scribe" | |
16 | ) | |
2 | import "strings" | |
17 | 3 | |
18 | 4 | // Collector represents a Zipkin trace collector, which is probably a set of |
19 | 5 | // remote endpoints. |
20 | 6 | type Collector interface { |
21 | 7 | Collect(*Span) error |
22 | } | |
23 | ||
24 | // ScribeCollector implements Collector by forwarding spans to a Scribe | |
25 | // service, in batches. | |
26 | type ScribeCollector struct { | |
27 | client scribe.Scribe | |
28 | factory func() (scribe.Scribe, error) | |
29 | spanc chan *Span | |
30 | sendc chan struct{} | |
31 | batch []*scribe.LogEntry | |
32 | nextSend time.Time | |
33 | batchInterval time.Duration | |
34 | batchSize int | |
35 | sampleRate float64 | |
36 | sampleSalt int64 | |
37 | logger log.Logger | |
38 | } | |
39 | ||
40 | // NewScribeCollector returns a new Scribe-backed Collector. addr should be a | |
41 | // TCP endpoint of the form "host:port". timeout is passed to the Thrift dial | |
42 | // function NewTSocketFromAddrTimeout. batchSize and batchInterval control the | |
43 | // maximum size and interval of a batch of spans; as soon as either limit is | |
44 | // reached, the batch is sent. The logger is used to log errors, such as batch | |
45 | // send failures; users should provide an appropriate context, if desired. | |
46 | func NewScribeCollector(addr string, timeout time.Duration, options ...ScribeOption) (Collector, error) { | |
47 | factory := scribeClientFactory(addr, timeout) | |
48 | client, err := factory() | |
49 | if err != nil { | |
50 | return nil, err | |
51 | } | |
52 | defaultBatchInterval := time.Second | |
53 | c := &ScribeCollector{ | |
54 | client: client, | |
55 | factory: factory, | |
56 | spanc: make(chan *Span), | |
57 | sendc: make(chan struct{}), | |
58 | batch: []*scribe.LogEntry{}, | |
59 | nextSend: time.Now().Add(defaultBatchInterval), | |
60 | batchInterval: defaultBatchInterval, | |
61 | batchSize: 100, | |
62 | sampleRate: 1.0, | |
63 | sampleSalt: rand.Int63(), | |
64 | logger: log.NewNopLogger(), | |
65 | } | |
66 | for _, option := range options { | |
67 | option(c) | |
68 | } | |
69 | go c.loop() | |
70 | return c, nil | |
71 | } | |
72 | ||
73 | // Collect implements Collector. | |
74 | func (c *ScribeCollector) Collect(s *Span) error { | |
75 | c.spanc <- s | |
76 | return nil // accepted | |
77 | } | |
78 | ||
79 | func (c *ScribeCollector) loop() { | |
80 | tickc := time.Tick(c.batchInterval / 10) | |
81 | ||
82 | for { | |
83 | select { | |
84 | case span := <-c.spanc: | |
85 | if !shouldSample(span.traceID, c.sampleSalt, c.sampleRate) { | |
86 | continue | |
87 | } | |
88 | c.batch = append(c.batch, &scribe.LogEntry{ | |
89 | Category: "zipkin", // TODO parameterize? | |
90 | Message: serialize(span), | |
91 | }) | |
92 | if len(c.batch) >= c.batchSize { | |
93 | go c.sendNow() | |
94 | } | |
95 | ||
96 | case <-tickc: | |
97 | if time.Now().After(c.nextSend) { | |
98 | go c.sendNow() | |
99 | } | |
100 | ||
101 | case <-c.sendc: | |
102 | c.nextSend = time.Now().Add(c.batchInterval) | |
103 | if err := c.send(c.batch); err != nil { | |
104 | c.logger.Log("err", err.Error()) | |
105 | } | |
106 | c.batch = c.batch[:0] | |
107 | } | |
108 | } | |
109 | } | |
110 | ||
111 | func (c *ScribeCollector) sendNow() { | |
112 | c.sendc <- struct{}{} | |
113 | } | |
114 | ||
115 | func (c *ScribeCollector) send(batch []*scribe.LogEntry) error { | |
116 | if c.client == nil { | |
117 | var err error | |
118 | if c.client, err = c.factory(); err != nil { | |
119 | return fmt.Errorf("during reconnect: %v", err) | |
120 | } | |
121 | } | |
122 | if rc, err := c.client.Log(c.batch); err != nil { | |
123 | c.client = nil | |
124 | return fmt.Errorf("during Log: %v", err) | |
125 | } else if rc != scribe.ResultCode_OK { | |
126 | // probably transient error; don't reset client | |
127 | return fmt.Errorf("remote returned %s", rc) | |
128 | } | |
129 | return nil | |
130 | } | |
131 | ||
132 | // ScribeOption sets a parameter for the ScribeCollector. | |
133 | type ScribeOption func(s *ScribeCollector) | |
134 | ||
135 | // ScribeBatchSize sets the maximum batch size, after which a collect will be | |
136 | // triggered. The default batch size is 100 traces. | |
137 | func ScribeBatchSize(n int) ScribeOption { | |
138 | return func(s *ScribeCollector) { s.batchSize = n } | |
139 | } | |
140 | ||
141 | // ScribeBatchInterval sets the maximum duration we will buffer traces before | |
142 | // emitting them to the collector. The default batch interval is 1 second. | |
143 | func ScribeBatchInterval(d time.Duration) ScribeOption { | |
144 | return func(s *ScribeCollector) { s.batchInterval = d } | |
145 | } | |
146 | ||
147 | // ScribeSampleRate sets the sample rate used to determine if a trace will be | |
148 | // sent to the collector. By default, the sample rate is 1.0, i.e. all traces | |
149 | // are sent. | |
150 | func ScribeSampleRate(f float64) ScribeOption { | |
151 | return func(s *ScribeCollector) { s.sampleRate = f } | |
152 | } | |
153 | ||
154 | // ScribeLogger sets the logger used to report errors in the collection | |
155 | // process. By default, a no-op logger is used, i.e. no errors are logged | |
156 | // anywhere. It's important to set this option in a production service. | |
157 | func ScribeLogger(logger log.Logger) ScribeOption { | |
158 | return func(s *ScribeCollector) { s.logger = logger } | |
159 | } | |
160 | ||
161 | func scribeClientFactory(addr string, timeout time.Duration) func() (scribe.Scribe, error) { | |
162 | return func() (scribe.Scribe, error) { | |
163 | a, err := net.ResolveTCPAddr("tcp", addr) | |
164 | if err != nil { | |
165 | return nil, err | |
166 | } | |
167 | socket := thrift.NewTSocketFromAddrTimeout(a, timeout) | |
168 | transport := thrift.NewTFramedTransport(socket) | |
169 | if err := transport.Open(); err != nil { | |
170 | socket.Close() | |
171 | return nil, err | |
172 | } | |
173 | proto := thrift.NewTBinaryProtocolTransport(transport) | |
174 | client := scribe.NewScribeClientProtocol(transport, proto, proto) | |
175 | return client, nil | |
176 | } | |
177 | } | |
178 | ||
179 | func serialize(s *Span) string { | |
180 | t := thrift.NewTMemoryBuffer() | |
181 | p := thrift.NewTBinaryProtocolTransport(t) | |
182 | if err := s.Encode().Write(p); err != nil { | |
183 | panic(err) | |
184 | } | |
185 | return base64.StdEncoding.EncodeToString(t.Buffer.Bytes()) | |
186 | } | |
187 | ||
188 | func shouldSample(id int64, salt int64, rate float64) bool { | |
189 | if rate <= 0 { | |
190 | return false | |
191 | } | |
192 | if rate >= 1.0 { | |
193 | return true | |
194 | } | |
195 | return int64(math.Abs(float64(id^salt)))%10000 < int64(rate*10000) | |
8 | Close() error | |
196 | 9 | } |
197 | 10 | |
198 | 11 | // NopCollector implements Collector but performs no work. |
201 | 14 | // Collect implements Collector. |
202 | 15 | func (NopCollector) Collect(*Span) error { return nil } |
203 | 16 | |
17 | // Close implements Collector. | |
18 | func (NopCollector) Close() error { return nil } | |
19 | ||
204 | 20 | // MultiCollector implements Collector by sending spans to all collectors. |
205 | 21 | type MultiCollector []Collector |
206 | 22 | |
207 | 23 | // Collect implements Collector. |
208 | 24 | func (c MultiCollector) Collect(s *Span) error { |
25 | return c.aggregateErrors(func(coll Collector) error { return coll.Collect(s) }) | |
26 | } | |
27 | ||
28 | // Close implements Collector. | |
29 | func (c MultiCollector) Close() error { | |
30 | return c.aggregateErrors(func(coll Collector) error { return coll.Close() }) | |
31 | } | |
32 | ||
33 | func (c MultiCollector) aggregateErrors(f func(c Collector) error) error { | |
34 | var e *collectionError | |
35 | for i, collector := range c { | |
36 | if err := f(collector); err != nil { | |
37 | if e == nil { | |
38 | e = &collectionError{ | |
39 | errs: make([]error, len(c)), | |
40 | } | |
41 | } | |
42 | e.errs[i] = err | |
43 | } | |
44 | } | |
45 | return e | |
46 | } | |
47 | ||
48 | // CollectionError represents an array of errors returned by one or more | |
49 | // failed Collector methods. | |
50 | type CollectionError interface { | |
51 | Error() string | |
52 | GetErrors() []error | |
53 | } | |
54 | ||
55 | type collectionError struct { | |
56 | errs []error | |
57 | } | |
58 | ||
59 | func (c *collectionError) Error() string { | |
209 | 60 | errs := []string{} |
210 | for _, collector := range c { | |
211 | if err := collector.Collect(s); err != nil { | |
61 | for _, err := range c.errs { | |
62 | if err != nil { | |
212 | 63 | errs = append(errs, err.Error()) |
213 | 64 | } |
214 | 65 | } |
215 | if len(errs) > 0 { | |
216 | return errors.New(strings.Join(errs, "; ")) | |
217 | } | |
218 | return nil | |
66 | return strings.Join(errs, "; ") | |
219 | 67 | } |
68 | ||
69 | // GetErrors implements CollectionError. | |
70 | func (c *collectionError) GetErrors() []error { | |
71 | return c.errs | |
72 | } |
0 | package zipkin | |
1 | ||
2 | import "testing" | |
3 | ||
4 | func TestShouldSample(t *testing.T) { | |
5 | type triple struct { | |
6 | id, salt int64 | |
7 | rate float64 | |
8 | } | |
9 | for input, want := range map[triple]bool{ | |
10 | triple{123, 456, 1.0}: true, | |
11 | triple{123, 456, 999}: true, | |
12 | triple{123, 456, 0.0}: false, | |
13 | triple{123, 456, -42}: false, | |
14 | triple{1229998, 0, 0.01}: false, | |
15 | triple{1229999, 0, 0.01}: false, | |
16 | triple{1230000, 0, 0.01}: true, | |
17 | triple{1230001, 0, 0.01}: true, | |
18 | triple{1230098, 0, 0.01}: true, | |
19 | triple{1230099, 0, 0.01}: true, | |
20 | triple{1230100, 0, 0.01}: false, | |
21 | triple{1230101, 0, 0.01}: false, | |
22 | triple{1, 9999999, 0.01}: false, | |
23 | triple{999, 0, 0.99}: true, | |
24 | triple{9999, 0, 0.99}: false, | |
25 | } { | |
26 | if have := shouldSample(input.id, input.salt, input.rate); want != have { | |
27 | t.Errorf("%#+v: want %v, have %v", input, want, have) | |
28 | } | |
29 | } | |
30 | } |
0 | 0 | package zipkin_test |
1 | 1 | |
2 | 2 | import ( |
3 | "encoding/base64" | |
4 | 3 | "fmt" |
5 | "math/rand" | |
6 | "net" | |
7 | "sync" | |
8 | 4 | "testing" |
9 | "time" | |
10 | ||
11 | "github.com/apache/thrift/lib/go/thrift" | |
12 | 5 | |
13 | 6 | "github.com/go-kit/kit/tracing/zipkin" |
14 | "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/scribe" | |
15 | "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/zipkincore" | |
16 | 7 | ) |
17 | 8 | |
18 | func TestScribeCollector(t *testing.T) { | |
19 | server := newScribeServer(t) | |
9 | var s = zipkin.NewSpan("203.0.113.10:1234", "service1", "avg", 123, 456, 0) | |
20 | 10 | |
21 | timeout := time.Second | |
22 | batchInterval := time.Millisecond | |
23 | c, err := zipkin.NewScribeCollector(server.addr(), timeout, zipkin.ScribeBatchSize(0), zipkin.ScribeBatchInterval(batchInterval)) | |
24 | if err != nil { | |
25 | t.Fatal(err) | |
11 | func TestNopCollector(t *testing.T) { | |
12 | c := zipkin.NopCollector{} | |
13 | if err := c.Collect(s); err != nil { | |
14 | t.Error(err) | |
26 | 15 | } |
27 | ||
28 | var ( | |
29 | serviceName = "service" | |
30 | methodName = "method" | |
31 | traceID = int64(123) | |
32 | spanID = int64(456) | |
33 | parentSpanID = int64(0) | |
34 | value = "foo" | |
35 | duration = 42 * time.Millisecond | |
36 | ) | |
37 | ||
38 | span := zipkin.NewSpan("1.2.3.4:1234", serviceName, methodName, traceID, spanID, parentSpanID) | |
39 | span.AnnotateDuration("foo", 42*time.Millisecond) | |
40 | if err := c.Collect(span); err != nil { | |
41 | t.Errorf("error during collection: %v", err) | |
42 | } | |
43 | ||
44 | // Need to yield to the select loop to accept the send request, and then | |
45 | // yield again to the send operation to write to the socket. I think the | |
46 | // best way to do that is just give it some time. | |
47 | ||
48 | deadline := time.Now().Add(1 * time.Second) | |
49 | for { | |
50 | if time.Now().After(deadline) { | |
51 | t.Fatalf("never received a span") | |
52 | } | |
53 | if want, have := 1, len(server.spans()); want != have { | |
54 | time.Sleep(time.Millisecond) | |
55 | continue | |
56 | } | |
57 | break | |
58 | } | |
59 | ||
60 | gotSpan := server.spans()[0] | |
61 | if want, have := methodName, gotSpan.GetName(); want != have { | |
62 | t.Errorf("want %q, have %q", want, have) | |
63 | } | |
64 | if want, have := traceID, gotSpan.GetTraceId(); want != have { | |
65 | t.Errorf("want %d, have %d", want, have) | |
66 | } | |
67 | if want, have := spanID, gotSpan.GetId(); want != have { | |
68 | t.Errorf("want %d, have %d", want, have) | |
69 | } | |
70 | if want, have := parentSpanID, gotSpan.GetParentId(); want != have { | |
71 | t.Errorf("want %d, have %d", want, have) | |
72 | } | |
73 | ||
74 | if want, have := 1, len(gotSpan.GetAnnotations()); want != have { | |
75 | t.Fatalf("want %d, have %d", want, have) | |
76 | } | |
77 | ||
78 | gotAnnotation := gotSpan.GetAnnotations()[0] | |
79 | if want, have := value, gotAnnotation.GetValue(); want != have { | |
80 | t.Errorf("want %q, have %q", want, have) | |
81 | } | |
82 | if want, have := duration, time.Duration(gotAnnotation.GetDuration())*time.Microsecond; want != have { | |
83 | t.Errorf("want %s, have %s", want, have) | |
16 | if err := c.Close(); err != nil { | |
17 | t.Error(err) | |
84 | 18 | } |
85 | 19 | } |
86 | 20 | |
87 | type scribeServer struct { | |
88 | t *testing.T | |
89 | transport *thrift.TServerSocket | |
90 | address string | |
91 | server *thrift.TSimpleServer | |
92 | handler *scribeHandler | |
21 | type stubCollector struct { | |
22 | errid int | |
23 | collected bool | |
24 | closed bool | |
93 | 25 | } |
94 | 26 | |
95 | func newScribeServer(t *testing.T) *scribeServer { | |
96 | protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() | |
97 | transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory()) | |
27 | func (c *stubCollector) Collect(*zipkin.Span) error { | |
28 | c.collected = true | |
29 | if c.errid != 0 { | |
30 | return fmt.Errorf("error %d", c.errid) | |
31 | } | |
32 | return nil | |
33 | } | |
98 | 34 | |
99 | var port int | |
100 | var transport *thrift.TServerSocket | |
101 | var err error | |
102 | for i := 0; i < 10; i++ { | |
103 | port = 10000 + rand.Intn(10000) | |
104 | transport, err = thrift.NewTServerSocket(fmt.Sprintf(":%d", port)) | |
105 | if err != nil { | |
106 | t.Logf("port %d: %v", port, err) | |
107 | continue | |
108 | } | |
109 | break | |
35 | func (c *stubCollector) Close() error { | |
36 | c.closed = true | |
37 | if c.errid != 0 { | |
38 | return fmt.Errorf("error %d", c.errid) | |
110 | 39 | } |
111 | if err != nil { | |
112 | t.Fatal(err) | |
40 | return nil | |
41 | } | |
42 | ||
43 | func TestMultiCollector(t *testing.T) { | |
44 | cs := zipkin.MultiCollector{ | |
45 | &stubCollector{errid: 1}, | |
46 | &stubCollector{}, | |
47 | &stubCollector{errid: 2}, | |
48 | } | |
49 | err := cs.Collect(s) | |
50 | if err == nil { | |
51 | t.Fatal("wanted error, got none") | |
52 | } | |
53 | if want, have := "error 1; error 2", err.Error(); want != have { | |
54 | t.Errorf("want %q, have %q", want, have) | |
55 | } | |
56 | collectionError := err.(zipkin.CollectionError).GetErrors() | |
57 | if want, have := 3, len(collectionError); want != have { | |
58 | t.Fatalf("want %d, have %d", want, have) | |
59 | } | |
60 | if want, have := cs[0].Collect(s).Error(), collectionError[0].Error(); want != have { | |
61 | t.Errorf("want %q, have %q", want, have) | |
62 | } | |
63 | if want, have := cs[1].Collect(s), collectionError[1]; want != have { | |
64 | t.Errorf("want %q, have %q", want, have) | |
65 | } | |
66 | if want, have := cs[2].Collect(s).Error(), collectionError[2].Error(); want != have { | |
67 | t.Errorf("want %q, have %q", want, have) | |
113 | 68 | } |
114 | 69 | |
115 | handler := newScribeHandler(t) | |
116 | server := thrift.NewTSimpleServer4( | |
117 | scribe.NewScribeProcessor(handler), | |
118 | transport, | |
119 | transportFactory, | |
120 | protocolFactory, | |
121 | ) | |
122 | ||
123 | go server.Serve() | |
124 | ||
125 | deadline := time.Now().Add(time.Second) | |
126 | for !canConnect(port) { | |
127 | if time.Now().After(deadline) { | |
128 | t.Fatal("server never started") | |
70 | for _, c := range cs { | |
71 | if !c.(*stubCollector).collected { | |
72 | t.Error("collect not called") | |
129 | 73 | } |
130 | time.Sleep(time.Millisecond) | |
131 | } | |
132 | ||
133 | return &scribeServer{ | |
134 | transport: transport, | |
135 | address: fmt.Sprintf("127.0.0.1:%d", port), | |
136 | handler: handler, | |
137 | 74 | } |
138 | 75 | } |
139 | 76 | |
140 | func (s *scribeServer) addr() string { | |
141 | return s.address | |
77 | func TestMultiCollectorClose(t *testing.T) { | |
78 | cs := zipkin.MultiCollector{ | |
79 | &stubCollector{errid: 1}, | |
80 | &stubCollector{}, | |
81 | &stubCollector{errid: 2}, | |
82 | } | |
83 | err := cs.Close() | |
84 | if err == nil { | |
85 | t.Fatal("wanted error, got none") | |
86 | } | |
87 | if want, have := "error 1; error 2", err.Error(); want != have { | |
88 | t.Errorf("want %q, have %q", want, have) | |
89 | } | |
90 | ||
91 | for _, c := range cs { | |
92 | if !c.(*stubCollector).closed { | |
93 | t.Error("close not called") | |
94 | } | |
95 | } | |
142 | 96 | } |
143 | ||
144 | func (s *scribeServer) spans() []*zipkincore.Span { | |
145 | return s.handler.spans() | |
146 | } | |
147 | ||
148 | type scribeHandler struct { | |
149 | t *testing.T | |
150 | sync.RWMutex | |
151 | entries []*scribe.LogEntry | |
152 | } | |
153 | ||
154 | func newScribeHandler(t *testing.T) *scribeHandler { | |
155 | return &scribeHandler{t: t} | |
156 | } | |
157 | ||
158 | func (h *scribeHandler) Log(messages []*scribe.LogEntry) (scribe.ResultCode, error) { | |
159 | h.Lock() | |
160 | defer h.Unlock() | |
161 | for _, m := range messages { | |
162 | h.entries = append(h.entries, m) | |
163 | } | |
164 | return scribe.ResultCode_OK, nil | |
165 | } | |
166 | ||
167 | func (h *scribeHandler) spans() []*zipkincore.Span { | |
168 | h.RLock() | |
169 | defer h.RUnlock() | |
170 | spans := []*zipkincore.Span{} | |
171 | for _, m := range h.entries { | |
172 | decoded, err := base64.StdEncoding.DecodeString(m.GetMessage()) | |
173 | if err != nil { | |
174 | h.t.Error(err) | |
175 | continue | |
176 | } | |
177 | buffer := thrift.NewTMemoryBuffer() | |
178 | if _, err := buffer.Write(decoded); err != nil { | |
179 | h.t.Error(err) | |
180 | continue | |
181 | } | |
182 | transport := thrift.NewTBinaryProtocolTransport(buffer) | |
183 | zs := &zipkincore.Span{} | |
184 | if err := zs.Read(transport); err != nil { | |
185 | h.t.Error(err) | |
186 | continue | |
187 | } | |
188 | spans = append(spans, zs) | |
189 | } | |
190 | return spans | |
191 | } | |
192 | ||
193 | func canConnect(port int) bool { | |
194 | c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", port)) | |
195 | if err != nil { | |
196 | return false | |
197 | } | |
198 | c.Close() | |
199 | return true | |
200 | } |
0 | package zipkin | |
1 | ||
2 | import ( | |
3 | "math/rand" | |
4 | ||
5 | "github.com/apache/thrift/lib/go/thrift" | |
6 | "gopkg.in/Shopify/sarama.v1" | |
7 | ||
8 | "github.com/go-kit/kit/log" | |
9 | ) | |
10 | ||
11 | // defaultKafkaTopic sets the standard Kafka topic our Collector will publish | |
12 | // on. The default topic for zipkin-receiver-kafka is "zipkin", see: | |
13 | // https://github.com/openzipkin/zipkin/tree/master/zipkin-receiver-kafka | |
14 | const defaultKafkaTopic = "zipkin" | |
15 | ||
16 | // KafkaCollector implements Collector by publishing spans to a Kafka | |
17 | // broker. | |
18 | type KafkaCollector struct { | |
19 | producer sarama.AsyncProducer | |
20 | logger log.Logger | |
21 | topic string | |
22 | shouldSample Sampler | |
23 | } | |
24 | ||
25 | // KafkaOption sets a parameter for the KafkaCollector. | |
26 | type KafkaOption func(c *KafkaCollector) | |
27 | ||
28 | // KafkaLogger sets the logger used to report errors in the collection | |
29 | // process. By default, a no-op logger is used, i.e. no errors are logged | |
30 | // anywhere. It's important to set this option. | |
31 | func KafkaLogger(logger log.Logger) KafkaOption { | |
32 | return func(c *KafkaCollector) { c.logger = logger } | |
33 | } | |
34 | ||
35 | // KafkaProducer sets the producer used to produce to Kafka. | |
36 | func KafkaProducer(p sarama.AsyncProducer) KafkaOption { | |
37 | return func(c *KafkaCollector) { c.producer = p } | |
38 | } | |
39 | ||
40 | // KafkaTopic sets the kafka topic to attach the collector producer on. | |
41 | func KafkaTopic(t string) KafkaOption { | |
42 | return func(c *KafkaCollector) { c.topic = t } | |
43 | } | |
44 | ||
45 | // KafkaSampleRate sets the Sampler used to determine if a trace will be sent | |
46 | // to the collector. By default, a sampler with a sample rate of 1.0 is used, | |
47 | // i.e. all traces are sent. | |
48 | func KafkaSampleRate(sr Sampler) KafkaOption { | |
49 | return func(c *KafkaCollector) { c.shouldSample = sr } | |
50 | } | |
51 | ||
52 | // NewKafkaCollector returns a new Kafka-backed Collector. addrs should be a | |
53 | // slice of TCP endpoints of the form "host:port". | |
54 | func NewKafkaCollector(addrs []string, options ...KafkaOption) (Collector, error) { | |
55 | c := &KafkaCollector{ | |
56 | logger: log.NewNopLogger(), | |
57 | topic: defaultKafkaTopic, | |
58 | shouldSample: SampleRate(1.0, rand.Int63()), | |
59 | } | |
60 | ||
61 | for _, option := range options { | |
62 | option(c) | |
63 | } | |
64 | ||
65 | if c.producer == nil { | |
66 | p, err := sarama.NewAsyncProducer(addrs, nil) | |
67 | if err != nil { | |
68 | return nil, err | |
69 | } | |
70 | c.producer = p | |
71 | } | |
72 | ||
73 | go c.logErrors() | |
74 | ||
75 | return c, nil | |
76 | } | |
77 | ||
78 | func (c *KafkaCollector) logErrors() { | |
79 | for pe := range c.producer.Errors() { | |
80 | c.logger.Log("msg", pe.Msg, "err", pe.Err, "result", "failed to produce msg") | |
81 | } | |
82 | } | |
83 | ||
84 | // Collect implements Collector. | |
85 | func (c *KafkaCollector) Collect(s *Span) error { | |
86 | if c.shouldSample(s.traceID) { | |
87 | c.producer.Input() <- &sarama.ProducerMessage{ | |
88 | Topic: c.topic, | |
89 | Key: nil, | |
90 | Value: sarama.ByteEncoder(kafkaSerialize(s)), | |
91 | } | |
92 | } | |
93 | return nil | |
94 | } | |
95 | ||
96 | // Close implements Collector. | |
97 | func (c *KafkaCollector) Close() error { | |
98 | return c.producer.Close() | |
99 | } | |
100 | ||
101 | func kafkaSerialize(s *Span) []byte { | |
102 | t := thrift.NewTMemoryBuffer() | |
103 | p := thrift.NewTBinaryProtocolTransport(t) | |
104 | if err := s.Encode().Write(p); err != nil { | |
105 | panic(err) | |
106 | } | |
107 | return t.Buffer.Bytes() | |
108 | } |
0 | package zipkin_test | |
1 | ||
2 | import ( | |
3 | "errors" | |
4 | "testing" | |
5 | "time" | |
6 | ||
7 | "github.com/apache/thrift/lib/go/thrift" | |
8 | "gopkg.in/Shopify/sarama.v1" | |
9 | ||
10 | "github.com/go-kit/kit/log" | |
11 | "github.com/go-kit/kit/tracing/zipkin" | |
12 | "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/zipkincore" | |
13 | ) | |
14 | ||
15 | type stubProducer struct { | |
16 | in chan *sarama.ProducerMessage | |
17 | err chan *sarama.ProducerError | |
18 | kdown bool | |
19 | closed bool | |
20 | } | |
21 | ||
22 | func (p *stubProducer) AsyncClose() {} | |
23 | func (p *stubProducer) Close() error { | |
24 | if p.kdown { | |
25 | return errors.New("Kafka is down") | |
26 | } | |
27 | p.closed = true | |
28 | return nil | |
29 | } | |
30 | func (p *stubProducer) Input() chan<- *sarama.ProducerMessage { return p.in } | |
31 | func (p *stubProducer) Successes() <-chan *sarama.ProducerMessage { return nil } | |
32 | func (p *stubProducer) Errors() <-chan *sarama.ProducerError { return p.err } | |
33 | ||
34 | func newStubProducer(kdown bool) *stubProducer { | |
35 | return &stubProducer{ | |
36 | make(chan *sarama.ProducerMessage), | |
37 | make(chan *sarama.ProducerError), | |
38 | kdown, | |
39 | false, | |
40 | } | |
41 | } | |
42 | ||
43 | var spans = []*zipkin.Span{ | |
44 | zipkin.NewSpan("203.0.113.10:1234", "service1", "avg", 123, 456, 0), | |
45 | zipkin.NewSpan("203.0.113.10:1234", "service2", "sum", 123, 789, 456), | |
46 | zipkin.NewSpan("203.0.113.10:1234", "service2", "div", 123, 101112, 456), | |
47 | } | |
48 | ||
49 | func TestKafkaProduce(t *testing.T) { | |
50 | p := newStubProducer(false) | |
51 | c, err := zipkin.NewKafkaCollector( | |
52 | []string{"192.0.2.10:9092"}, zipkin.KafkaProducer(p), | |
53 | ) | |
54 | if err != nil { | |
55 | t.Fatal(err) | |
56 | } | |
57 | ||
58 | for _, want := range spans { | |
59 | m := collectSpan(t, c, p, want) | |
60 | testMetadata(t, m) | |
61 | got := deserializeSpan(t, m.Value) | |
62 | testEqual(t, want, got) | |
63 | } | |
64 | } | |
65 | ||
66 | func TestKafkaClose(t *testing.T) { | |
67 | p := newStubProducer(false) | |
68 | c, err := zipkin.NewKafkaCollector( | |
69 | []string{"192.0.2.10:9092"}, zipkin.KafkaProducer(p), | |
70 | ) | |
71 | if err != nil { | |
72 | t.Fatal(err) | |
73 | } | |
74 | if err = c.Close(); err != nil { | |
75 | t.Fatal(err) | |
76 | } | |
77 | if !p.closed { | |
78 | t.Fatal("producer not closed") | |
79 | } | |
80 | } | |
81 | ||
82 | func TestKafkaCloseError(t *testing.T) { | |
83 | p := newStubProducer(true) | |
84 | c, err := zipkin.NewKafkaCollector( | |
85 | []string{"192.0.2.10:9092"}, zipkin.KafkaProducer(p), | |
86 | ) | |
87 | if err != nil { | |
88 | t.Fatal(err) | |
89 | } | |
90 | if err = c.Close(); err == nil { | |
91 | t.Error("no error on close") | |
92 | } | |
93 | } | |
94 | ||
95 | func TestKafkaErrors(t *testing.T) { | |
96 | p := newStubProducer(true) | |
97 | errs := make(chan []interface{}, len(spans)) | |
98 | lg := log.Logger(log.LoggerFunc(func(keyvals ...interface{}) error { | |
99 | for i := 0; i < len(keyvals); i += 2 { | |
100 | if keyvals[i] == "result" && keyvals[i+1] == "failed to produce msg" { | |
101 | errs <- keyvals | |
102 | } | |
103 | } | |
104 | return nil | |
105 | })) | |
106 | c, err := zipkin.NewKafkaCollector( | |
107 | []string{"192.0.2.10:9092"}, | |
108 | zipkin.KafkaProducer(p), | |
109 | zipkin.KafkaLogger(lg), | |
110 | ) | |
111 | if err != nil { | |
112 | t.Fatal(err) | |
113 | } | |
114 | for _, want := range spans { | |
115 | _ = collectSpan(t, c, p, want) | |
116 | } | |
117 | ||
118 | for i := 0; i < len(spans); i++ { | |
119 | select { | |
120 | case <-errs: | |
121 | case <-time.After(100 * time.Millisecond): | |
122 | t.Fatalf("errors not logged. got %d, wanted %d", i, len(spans)) | |
123 | } | |
124 | } | |
125 | } | |
126 | ||
127 | func collectSpan(t *testing.T, c zipkin.Collector, p *stubProducer, s *zipkin.Span) *sarama.ProducerMessage { | |
128 | var m *sarama.ProducerMessage | |
129 | rcvd := make(chan bool, 1) | |
130 | go func() { | |
131 | select { | |
132 | case m = <-p.in: | |
133 | rcvd <- true | |
134 | if p.kdown { | |
135 | p.err <- &sarama.ProducerError{m, errors.New("kafka is down")} | |
136 | } | |
137 | case <-time.After(100 * time.Millisecond): | |
138 | rcvd <- false | |
139 | } | |
140 | }() | |
141 | ||
142 | if err := c.Collect(s); err != nil { | |
143 | t.Errorf("error during collection: %v", err) | |
144 | } | |
145 | if !<-rcvd { | |
146 | t.Fatal("span message was not produced") | |
147 | } | |
148 | return m | |
149 | } | |
150 | ||
151 | func testMetadata(t *testing.T, m *sarama.ProducerMessage) { | |
152 | if m.Topic != "zipkin" { | |
153 | t.Errorf("produced to topic %q, want %q", m.Topic, "zipkin") | |
154 | } | |
155 | if m.Key != nil { | |
156 | t.Errorf("produced with key %q, want nil", m.Key) | |
157 | } | |
158 | } | |
159 | ||
160 | func deserializeSpan(t *testing.T, e sarama.Encoder) *zipkincore.Span { | |
161 | bytes, err := e.Encode() | |
162 | if err != nil { | |
163 | t.Errorf("error in encoding: %v", err) | |
164 | } | |
165 | s := zipkincore.NewSpan() | |
166 | mb := thrift.NewTMemoryBufferLen(len(bytes)) | |
167 | mb.Write(bytes) | |
168 | mb.Flush() | |
169 | pt := thrift.NewTBinaryProtocolTransport(mb) | |
170 | err = s.Read(pt) | |
171 | if err != nil { | |
172 | t.Errorf("error in decoding: %v", err) | |
173 | } | |
174 | return s | |
175 | } | |
176 | ||
177 | func testEqual(t *testing.T, want *zipkin.Span, got *zipkincore.Span) { | |
178 | if got.TraceId != want.TraceID() { | |
179 | t.Errorf("trace_id %d, want %d", got.TraceId, want.TraceID()) | |
180 | } | |
181 | if got.Id != want.SpanID() { | |
182 | t.Errorf("id %d, want %d", got.Id, want.SpanID()) | |
183 | } | |
184 | if got.ParentId == nil { | |
185 | if want.ParentSpanID() != 0 { | |
186 | t.Errorf("parent_id %d, want %d", got.ParentId, want.ParentSpanID()) | |
187 | } | |
188 | } else if *got.ParentId != want.ParentSpanID() { | |
189 | t.Errorf("parent_id %d, want %d", got.ParentId, want.ParentSpanID()) | |
190 | } | |
191 | } |
0 | package zipkin | |
1 | ||
2 | import "math" | |
3 | ||
// Sampler functions return if a Zipkin span should be sampled, based on its
// traceID.
type Sampler func(id int64) bool

// SampleRate returns a sampler function using a particular sample rate and a
// sample salt to identify if a Zipkin span based on its spanID should be
// collected.
//
// A rate <= 0 (or NaN) never samples and a rate >= 1 always samples. For
// rates in between, the id is XORed with salt and the magnitude of the
// result, modulo 10000, is compared against rate*10000 (truncated).
func SampleRate(rate float64, salt int64) Sampler {
	if rate <= 0 || math.IsNaN(rate) {
		return func(_ int64) bool {
			return false
		}
	}
	if rate >= 1.0 {
		return func(_ int64) bool {
			return true
		}
	}
	// Invariant across calls, so compute it once.
	threshold := uint64(rate * 10000)
	return func(id int64) bool {
		// Stay in integer arithmetic: a float64 round-trip drops the low
		// bits of values above 2^53, which skews the modulo.
		v := id ^ salt
		magnitude := uint64(v)
		if v < 0 {
			// Unsigned negation yields |v|, including for math.MinInt64.
			magnitude = -magnitude
		}
		return magnitude%10000 < threshold
	}
}
0 | package zipkin | |
1 | ||
2 | import "testing" | |
3 | ||
4 | func TestSampleRate(t *testing.T) { | |
5 | type triple struct { | |
6 | id, salt int64 | |
7 | rate float64 | |
8 | } | |
9 | for input, want := range map[triple]bool{ | |
10 | triple{123, 456, 1.0}: true, | |
11 | triple{123, 456, 999}: true, | |
12 | triple{123, 456, 0.0}: false, | |
13 | triple{123, 456, -42}: false, | |
14 | triple{1229998, 0, 0.01}: false, | |
15 | triple{1229999, 0, 0.01}: false, | |
16 | triple{1230000, 0, 0.01}: true, | |
17 | triple{1230001, 0, 0.01}: true, | |
18 | triple{1230098, 0, 0.01}: true, | |
19 | triple{1230099, 0, 0.01}: true, | |
20 | triple{1230100, 0, 0.01}: false, | |
21 | triple{1230101, 0, 0.01}: false, | |
22 | triple{1, 9999999, 0.01}: false, | |
23 | triple{999, 0, 0.99}: true, | |
24 | triple{9999, 0, 0.99}: false, | |
25 | } { | |
26 | sampler := SampleRate(input.rate, input.salt) | |
27 | if have := sampler(input.id); want != have { | |
28 | t.Errorf("%#+v: want %v, have %v", input, want, have) | |
29 | } | |
30 | } | |
31 | } |
0 | package zipkin | |
1 | ||
2 | import ( | |
3 | "encoding/base64" | |
4 | "fmt" | |
5 | "math/rand" | |
6 | "net" | |
7 | "time" | |
8 | ||
9 | "github.com/apache/thrift/lib/go/thrift" | |
10 | ||
11 | "github.com/go-kit/kit/log" | |
12 | "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/scribe" | |
13 | ) | |
14 | ||
const (
	// defaultScribeCategory is the Scribe category used unless overridden.
	defaultScribeCategory = "zipkin"

	// defaultBatchInterval is the default batch interval, in seconds.
	defaultBatchInterval = 1
)
19 | ||
20 | // ScribeCollector implements Collector by forwarding spans to a Scribe | |
21 | // service, in batches. | |
22 | type ScribeCollector struct { | |
23 | client scribe.Scribe | |
24 | factory func() (scribe.Scribe, error) | |
25 | spanc chan *Span | |
26 | sendc chan struct{} | |
27 | batch []*scribe.LogEntry | |
28 | nextSend time.Time | |
29 | batchInterval time.Duration | |
30 | batchSize int | |
31 | shouldSample Sampler | |
32 | logger log.Logger | |
33 | category string | |
34 | quit chan struct{} | |
35 | } | |
36 | ||
37 | // NewScribeCollector returns a new Scribe-backed Collector. addr should be a | |
38 | // TCP endpoint of the form "host:port". timeout is passed to the Thrift dial | |
39 | // function NewTSocketFromAddrTimeout. batchSize and batchInterval control the | |
40 | // maximum size and interval of a batch of spans; as soon as either limit is | |
41 | // reached, the batch is sent. The logger is used to log errors, such as batch | |
42 | // send failures; users should provide an appropriate context, if desired. | |
43 | func NewScribeCollector(addr string, timeout time.Duration, options ...ScribeOption) (Collector, error) { | |
44 | factory := scribeClientFactory(addr, timeout) | |
45 | client, err := factory() | |
46 | if err != nil { | |
47 | return nil, err | |
48 | } | |
49 | c := &ScribeCollector{ | |
50 | client: client, | |
51 | factory: factory, | |
52 | spanc: make(chan *Span), | |
53 | sendc: make(chan struct{}), | |
54 | batch: []*scribe.LogEntry{}, | |
55 | batchInterval: defaultBatchInterval * time.Second, | |
56 | batchSize: 100, | |
57 | shouldSample: SampleRate(1.0, rand.Int63()), | |
58 | logger: log.NewNopLogger(), | |
59 | category: defaultScribeCategory, | |
60 | quit: make(chan struct{}), | |
61 | } | |
62 | for _, option := range options { | |
63 | option(c) | |
64 | } | |
65 | c.nextSend = time.Now().Add(c.batchInterval) | |
66 | go c.loop() | |
67 | return c, nil | |
68 | } | |
69 | ||
70 | // Collect implements Collector. | |
71 | func (c *ScribeCollector) Collect(s *Span) error { | |
72 | c.spanc <- s | |
73 | return nil // accepted | |
74 | } | |
75 | ||
76 | // Close implements Collector. | |
77 | func (c *ScribeCollector) Close() error { | |
78 | close(c.quit) | |
79 | return nil | |
80 | } | |
81 | ||
82 | func (c *ScribeCollector) loop() { | |
83 | tickc := time.Tick(c.batchInterval / 10) | |
84 | ||
85 | for { | |
86 | select { | |
87 | case span := <-c.spanc: | |
88 | if !c.shouldSample(span.traceID) { | |
89 | continue | |
90 | } | |
91 | c.batch = append(c.batch, &scribe.LogEntry{ | |
92 | Category: c.category, | |
93 | Message: scribeSerialize(span), | |
94 | }) | |
95 | if len(c.batch) >= c.batchSize { | |
96 | go c.sendNow() | |
97 | } | |
98 | ||
99 | case <-tickc: | |
100 | if time.Now().After(c.nextSend) { | |
101 | go c.sendNow() | |
102 | } | |
103 | ||
104 | case <-c.sendc: | |
105 | c.nextSend = time.Now().Add(c.batchInterval) | |
106 | if err := c.send(c.batch); err != nil { | |
107 | c.logger.Log("err", err.Error()) | |
108 | } | |
109 | c.batch = c.batch[:0] | |
110 | case <-c.quit: | |
111 | return | |
112 | } | |
113 | } | |
114 | } | |
115 | ||
116 | func (c *ScribeCollector) sendNow() { | |
117 | c.sendc <- struct{}{} | |
118 | } | |
119 | ||
120 | func (c *ScribeCollector) send(batch []*scribe.LogEntry) error { | |
121 | if c.client == nil { | |
122 | var err error | |
123 | if c.client, err = c.factory(); err != nil { | |
124 | return fmt.Errorf("during reconnect: %v", err) | |
125 | } | |
126 | } | |
127 | if rc, err := c.client.Log(c.batch); err != nil { | |
128 | c.client = nil | |
129 | return fmt.Errorf("during Log: %v", err) | |
130 | } else if rc != scribe.ResultCode_OK { | |
131 | // probably transient error; don't reset client | |
132 | return fmt.Errorf("remote returned %s", rc) | |
133 | } | |
134 | return nil | |
135 | } | |
136 | ||
// ScribeOption sets a parameter for the ScribeCollector.
type ScribeOption func(s *ScribeCollector)
139 | ||
140 | // ScribeBatchSize sets the maximum batch size, after which a collect will be | |
141 | // triggered. The default batch size is 100 traces. | |
142 | func ScribeBatchSize(n int) ScribeOption { | |
143 | return func(s *ScribeCollector) { s.batchSize = n } | |
144 | } | |
145 | ||
146 | // ScribeBatchInterval sets the maximum duration we will buffer traces before | |
147 | // emitting them to the collector. The default batch interval is 1 second. | |
148 | func ScribeBatchInterval(d time.Duration) ScribeOption { | |
149 | return func(s *ScribeCollector) { s.batchInterval = d } | |
150 | } | |
151 | ||
152 | // ScribeSampleRate sets the sample rate used to determine if a trace will be | |
153 | // sent to the collector. By default, the sample rate is 1.0, i.e. all traces | |
154 | // are sent. | |
155 | func ScribeSampleRate(sr Sampler) ScribeOption { | |
156 | return func(s *ScribeCollector) { s.shouldSample = sr } | |
157 | } | |
158 | ||
159 | // ScribeLogger sets the logger used to report errors in the collection | |
160 | // process. By default, a no-op logger is used, i.e. no errors are logged | |
161 | // anywhere. It's important to set this option in a production service. | |
162 | func ScribeLogger(logger log.Logger) ScribeOption { | |
163 | return func(s *ScribeCollector) { s.logger = logger } | |
164 | } | |
165 | ||
166 | // ScribeCategory sets the Scribe category used to transmit the spans. | |
167 | func ScribeCategory(category string) ScribeOption { | |
168 | return func(s *ScribeCollector) { s.category = category } | |
169 | } | |
170 | ||
171 | func scribeClientFactory(addr string, timeout time.Duration) func() (scribe.Scribe, error) { | |
172 | return func() (scribe.Scribe, error) { | |
173 | a, err := net.ResolveTCPAddr("tcp", addr) | |
174 | if err != nil { | |
175 | return nil, err | |
176 | } | |
177 | socket := thrift.NewTSocketFromAddrTimeout(a, timeout) | |
178 | transport := thrift.NewTFramedTransport(socket) | |
179 | if err := transport.Open(); err != nil { | |
180 | socket.Close() | |
181 | return nil, err | |
182 | } | |
183 | proto := thrift.NewTBinaryProtocolTransport(transport) | |
184 | client := scribe.NewScribeClientProtocol(transport, proto, proto) | |
185 | return client, nil | |
186 | } | |
187 | } | |
188 | ||
189 | func scribeSerialize(s *Span) string { | |
190 | t := thrift.NewTMemoryBuffer() | |
191 | p := thrift.NewTBinaryProtocolTransport(t) | |
192 | if err := s.Encode().Write(p); err != nil { | |
193 | panic(err) | |
194 | } | |
195 | return base64.StdEncoding.EncodeToString(t.Buffer.Bytes()) | |
196 | } |
0 | package zipkin_test | |
1 | ||
2 | import ( | |
3 | "encoding/base64" | |
4 | "fmt" | |
5 | "math/rand" | |
6 | "net" | |
7 | "sync" | |
8 | "testing" | |
9 | "time" | |
10 | ||
11 | "github.com/apache/thrift/lib/go/thrift" | |
12 | ||
13 | "github.com/go-kit/kit/tracing/zipkin" | |
14 | "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/scribe" | |
15 | "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/zipkincore" | |
16 | ) | |
17 | ||
18 | func TestScribeCollector(t *testing.T) { | |
19 | server := newScribeServer(t) | |
20 | ||
21 | timeout := time.Second | |
22 | batchInterval := time.Millisecond | |
23 | c, err := zipkin.NewScribeCollector(server.addr(), timeout, zipkin.ScribeBatchSize(0), zipkin.ScribeBatchInterval(batchInterval)) | |
24 | if err != nil { | |
25 | t.Fatal(err) | |
26 | } | |
27 | ||
28 | var ( | |
29 | serviceName = "service" | |
30 | methodName = "method" | |
31 | traceID = int64(123) | |
32 | spanID = int64(456) | |
33 | parentSpanID = int64(0) | |
34 | value = "foo" | |
35 | duration = 42 * time.Millisecond | |
36 | ) | |
37 | ||
38 | span := zipkin.NewSpan("1.2.3.4:1234", serviceName, methodName, traceID, spanID, parentSpanID) | |
39 | span.AnnotateDuration("foo", 42*time.Millisecond) | |
40 | if err := c.Collect(span); err != nil { | |
41 | t.Errorf("error during collection: %v", err) | |
42 | } | |
43 | ||
44 | // Need to yield to the select loop to accept the send request, and then | |
45 | // yield again to the send operation to write to the socket. I think the | |
46 | // best way to do that is just give it some time. | |
47 | ||
48 | deadline := time.Now().Add(1 * time.Second) | |
49 | for { | |
50 | if time.Now().After(deadline) { | |
51 | t.Fatalf("never received a span") | |
52 | } | |
53 | if want, have := 1, len(server.spans()); want != have { | |
54 | time.Sleep(time.Millisecond) | |
55 | continue | |
56 | } | |
57 | break | |
58 | } | |
59 | ||
60 | gotSpan := server.spans()[0] | |
61 | if want, have := methodName, gotSpan.GetName(); want != have { | |
62 | t.Errorf("want %q, have %q", want, have) | |
63 | } | |
64 | if want, have := traceID, gotSpan.GetTraceId(); want != have { | |
65 | t.Errorf("want %d, have %d", want, have) | |
66 | } | |
67 | if want, have := spanID, gotSpan.GetId(); want != have { | |
68 | t.Errorf("want %d, have %d", want, have) | |
69 | } | |
70 | if want, have := parentSpanID, gotSpan.GetParentId(); want != have { | |
71 | t.Errorf("want %d, have %d", want, have) | |
72 | } | |
73 | ||
74 | if want, have := 1, len(gotSpan.GetAnnotations()); want != have { | |
75 | t.Fatalf("want %d, have %d", want, have) | |
76 | } | |
77 | ||
78 | gotAnnotation := gotSpan.GetAnnotations()[0] | |
79 | if want, have := value, gotAnnotation.GetValue(); want != have { | |
80 | t.Errorf("want %q, have %q", want, have) | |
81 | } | |
82 | if want, have := duration, time.Duration(gotAnnotation.GetDuration())*time.Microsecond; want != have { | |
83 | t.Errorf("want %s, have %s", want, have) | |
84 | } | |
85 | } | |
86 | ||
87 | type scribeServer struct { | |
88 | t *testing.T | |
89 | transport *thrift.TServerSocket | |
90 | address string | |
91 | server *thrift.TSimpleServer | |
92 | handler *scribeHandler | |
93 | } | |
94 | ||
95 | func newScribeServer(t *testing.T) *scribeServer { | |
96 | protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() | |
97 | transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory()) | |
98 | ||
99 | var port int | |
100 | var transport *thrift.TServerSocket | |
101 | var err error | |
102 | for i := 0; i < 10; i++ { | |
103 | port = 10000 + rand.Intn(10000) | |
104 | transport, err = thrift.NewTServerSocket(fmt.Sprintf(":%d", port)) | |
105 | if err != nil { | |
106 | t.Logf("port %d: %v", port, err) | |
107 | continue | |
108 | } | |
109 | break | |
110 | } | |
111 | if err != nil { | |
112 | t.Fatal(err) | |
113 | } | |
114 | ||
115 | handler := newScribeHandler(t) | |
116 | server := thrift.NewTSimpleServer4( | |
117 | scribe.NewScribeProcessor(handler), | |
118 | transport, | |
119 | transportFactory, | |
120 | protocolFactory, | |
121 | ) | |
122 | ||
123 | go server.Serve() | |
124 | ||
125 | deadline := time.Now().Add(time.Second) | |
126 | for !canConnect(port) { | |
127 | if time.Now().After(deadline) { | |
128 | t.Fatal("server never started") | |
129 | } | |
130 | time.Sleep(time.Millisecond) | |
131 | } | |
132 | ||
133 | return &scribeServer{ | |
134 | transport: transport, | |
135 | address: fmt.Sprintf("127.0.0.1:%d", port), | |
136 | handler: handler, | |
137 | } | |
138 | } | |
139 | ||
// addr returns the server's address string, as stored when the server was
// created.
func (s *scribeServer) addr() string {
	return s.address
}
143 | ||
// spans returns the spans decoded from everything the handler has received
// so far.
func (s *scribeServer) spans() []*zipkincore.Span {
	return s.handler.spans()
}
147 | ||
148 | type scribeHandler struct { | |
149 | t *testing.T | |
150 | sync.RWMutex | |
151 | entries []*scribe.LogEntry | |
152 | } | |
153 | ||
154 | func newScribeHandler(t *testing.T) *scribeHandler { | |
155 | return &scribeHandler{t: t} | |
156 | } | |
157 | ||
158 | func (h *scribeHandler) Log(messages []*scribe.LogEntry) (scribe.ResultCode, error) { | |
159 | h.Lock() | |
160 | defer h.Unlock() | |
161 | for _, m := range messages { | |
162 | h.entries = append(h.entries, m) | |
163 | } | |
164 | return scribe.ResultCode_OK, nil | |
165 | } | |
166 | ||
167 | func (h *scribeHandler) spans() []*zipkincore.Span { | |
168 | h.RLock() | |
169 | defer h.RUnlock() | |
170 | spans := []*zipkincore.Span{} | |
171 | for _, m := range h.entries { | |
172 | decoded, err := base64.StdEncoding.DecodeString(m.GetMessage()) | |
173 | if err != nil { | |
174 | h.t.Error(err) | |
175 | continue | |
176 | } | |
177 | buffer := thrift.NewTMemoryBuffer() | |
178 | if _, err := buffer.Write(decoded); err != nil { | |
179 | h.t.Error(err) | |
180 | continue | |
181 | } | |
182 | transport := thrift.NewTBinaryProtocolTransport(buffer) | |
183 | zs := &zipkincore.Span{} | |
184 | if err := zs.Read(transport); err != nil { | |
185 | h.t.Error(err) | |
186 | continue | |
187 | } | |
188 | spans = append(spans, zs) | |
189 | } | |
190 | return spans | |
191 | } | |
192 | ||
// canConnect reports whether a TCP connection to 127.0.0.1 on the given port
// can be established. A successful connection is closed again right away.
func canConnect(port int) bool {
	conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", port))
	if err == nil {
		conn.Close()
	}
	return err == nil
}
1 | 1 | |
2 | 2 | import ( |
3 | 3 | "encoding/binary" |
4 | "fmt" | |
5 | "math" | |
4 | 6 | "net" |
5 | 7 | "strconv" |
6 | 8 | "time" |
7 | 9 | |
8 | 10 | "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/zipkincore" |
9 | ) | |
10 | ||
11 | var ( | |
12 | // SpanContextKey represents the Span in the request context. | |
13 | SpanContextKey = "Zipkin-Span" | |
14 | 11 | ) |
15 | 12 | |
16 | 13 | // A Span is a named collection of annotations. It represents meaningful |
50 | 47 | if err != nil { |
51 | 48 | return nil |
52 | 49 | } |
50 | portInt, err := strconv.ParseInt(port, 10, 16) | |
51 | if err != nil { | |
52 | return nil | |
53 | } | |
53 | 54 | addrs, err := net.LookupIP(host) |
54 | 55 | if err != nil { |
55 | 56 | return nil |
56 | 57 | } |
57 | if len(addrs) <= 0 { | |
58 | return nil | |
59 | } | |
60 | portInt, err := strconv.ParseInt(port, 10, 16) | |
61 | if err != nil { | |
58 | // we need the first IPv4 address. | |
59 | var addr net.IP | |
60 | for i := range addrs { | |
61 | addr = addrs[i].To4() | |
62 | if addr != nil { | |
63 | break | |
64 | } | |
65 | } | |
66 | if addr == nil { | |
67 | // none of the returned addresses is IPv4. | |
62 | 68 | return nil |
63 | 69 | } |
64 | 70 | endpoint := zipkincore.NewEndpoint() |
65 | binary.LittleEndian.PutUint32(addrs[0], (uint32)(endpoint.Ipv4)) | |
71 | endpoint.Ipv4 = (int32)(binary.BigEndian.Uint32(addr)) | |
66 | 72 | endpoint.Port = int16(portInt) |
67 | 73 | endpoint.ServiceName = serviceName |
68 | 74 | return endpoint |
93 | 99 | s.AnnotateDuration(value, 0) |
94 | 100 | } |
95 | 101 | |
96 | // AnnotateBinary annotates the span with a key and a byte value. | |
97 | func (s *Span) AnnotateBinary(key string, value []byte) { | |
102 | // AnnotateBinary annotates the span with a key and a value that will be []byte | |
103 | // encoded. | |
104 | func (s *Span) AnnotateBinary(key string, value interface{}) { | |
105 | var a zipkincore.AnnotationType | |
106 | var b []byte | |
107 | // We are not using zipkincore.AnnotationType_I16 for types that could fit | |
108 | // as reporting on it seems to be broken on the zipkin web interface | |
109 | // (however, we can properly extract the number from zipkin storage | |
110 | // directly). int64 has issues with negative numbers but seems ok for | |
111 | // positive numbers needing more than 32 bit. | |
112 | switch v := value.(type) { | |
113 | case bool: | |
114 | a = zipkincore.AnnotationType_BOOL | |
115 | b = []byte("\x00") | |
116 | if v { | |
117 | b = []byte("\x01") | |
118 | } | |
119 | case []byte: | |
120 | a = zipkincore.AnnotationType_BYTES | |
121 | b = v | |
122 | case byte: | |
123 | a = zipkincore.AnnotationType_I32 | |
124 | b = make([]byte, 4) | |
125 | binary.BigEndian.PutUint32(b, uint32(v)) | |
126 | case int8: | |
127 | a = zipkincore.AnnotationType_I32 | |
128 | b = make([]byte, 4) | |
129 | binary.BigEndian.PutUint32(b, uint32(v)) | |
130 | case int16: | |
131 | a = zipkincore.AnnotationType_I32 | |
132 | b = make([]byte, 4) | |
133 | binary.BigEndian.PutUint32(b, uint32(v)) | |
134 | case uint16: | |
135 | a = zipkincore.AnnotationType_I32 | |
136 | b = make([]byte, 4) | |
137 | binary.BigEndian.PutUint32(b, uint32(v)) | |
138 | case int32: | |
139 | a = zipkincore.AnnotationType_I32 | |
140 | b = make([]byte, 4) | |
141 | binary.BigEndian.PutUint32(b, uint32(v)) | |
142 | case uint32: | |
143 | a = zipkincore.AnnotationType_I32 | |
144 | b = make([]byte, 4) | |
145 | binary.BigEndian.PutUint32(b, uint32(v)) | |
146 | case int64: | |
147 | a = zipkincore.AnnotationType_I64 | |
148 | b = make([]byte, 8) | |
149 | binary.BigEndian.PutUint64(b, uint64(v)) | |
150 | case int: | |
151 | a = zipkincore.AnnotationType_I32 | |
152 | b = make([]byte, 8) | |
153 | binary.BigEndian.PutUint32(b, uint32(v)) | |
154 | case uint: | |
155 | a = zipkincore.AnnotationType_I32 | |
156 | b = make([]byte, 8) | |
157 | binary.BigEndian.PutUint32(b, uint32(v)) | |
158 | case uint64: | |
159 | a = zipkincore.AnnotationType_I64 | |
160 | b = make([]byte, 8) | |
161 | binary.BigEndian.PutUint64(b, uint64(v)) | |
162 | case float32: | |
163 | a = zipkincore.AnnotationType_DOUBLE | |
164 | b = make([]byte, 8) | |
165 | bits := math.Float64bits(float64(v)) | |
166 | binary.BigEndian.PutUint64(b, bits) | |
167 | case float64: | |
168 | a = zipkincore.AnnotationType_DOUBLE | |
169 | b = make([]byte, 8) | |
170 | bits := math.Float64bits(v) | |
171 | binary.BigEndian.PutUint64(b, bits) | |
172 | case string: | |
173 | a = zipkincore.AnnotationType_STRING | |
174 | b = []byte(v) | |
175 | default: | |
176 | // we have no handler for type's value, but let's get a string | |
177 | // representation of it. | |
178 | a = zipkincore.AnnotationType_STRING | |
179 | b = []byte(fmt.Sprintf("%+v", value)) | |
180 | } | |
98 | 181 | s.binaryAnnotations = append(s.binaryAnnotations, binaryAnnotation{ |
99 | 182 | key: key, |
100 | value: value, | |
101 | annotationType: zipkincore.AnnotationType_BYTES, | |
183 | value: b, | |
184 | annotationType: a, | |
102 | 185 | host: s.host, |
103 | 186 | }) |
104 | 187 | } |
24 | 24 | // • https://gist.github.com/yoavaa/3478d3a0df666f21a98c |
25 | 25 | |
26 | 26 | const ( |
27 | // SpanContextKey holds the key used to store Zipkin spans in the context. | |
28 | SpanContextKey = "Zipkin-Span" | |
29 | ||
27 | 30 | // https://github.com/racker/tryfer#headers |
28 | 31 | traceIDHTTPHeader = "X-B3-TraceId" |
29 | 32 | spanIDHTTPHeader = "X-B3-SpanId" |
208 | 208 | } |
209 | 209 | return nil |
210 | 210 | } |
211 | ||
212 | func (c *countingCollector) Close() error { | |
213 | return nil | |
214 | } |
34 | 34 | options ...ServerOption, |
35 | 35 | ) *Server { |
36 | 36 | s := &Server{ |
37 | ctx: ctx, | |
38 | e: e, | |
39 | dec: dec, | |
40 | enc: enc, | |
37 | ctx: ctx, | |
38 | e: e, | |
39 | dec: dec, | |
40 | enc: enc, | |
41 | 41 | logger: log.NewNopLogger(), |
42 | 42 | } |
43 | 43 | for _, option := range options { |
118 | 118 | b := make([]byte, len(testbody)) |
119 | 119 | _, err = response.Body.Read(b) |
120 | 120 | if want, have := io.EOF, err; have != want { |
121 | t.Fatal("want %q, have %q", want, have) | |
121 | t.Fatalf("want %q, have %q", want, have) | |
122 | 122 | } |
123 | 123 | if want, have := testbody, string(b); want != have { |
124 | 124 | t.Errorf("want %q, have %q", want, have) |