Codebase list golang-github-go-kit-kit / 5dbf809
Merge pull request #236 from basvanbeek/child-span Zipkin Updates Peter Bourgon 7 years ago
13 changed file(s) with 508 addition(s) and 165 deletion(s). Raw diff Collapse all Expand all
3535 // of glog. So, we define a new flag set, to keep those domains distinct.
3636 fs := flag.NewFlagSet("", flag.ExitOnError)
3737 var (
38 debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server")
39 httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server")
40 grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server")
41 netrpcAddr = fs.String("netrpc.addr", ":8003", "Address for net/rpc server")
42 thriftAddr = fs.String("thrift.addr", ":8004", "Address for Thrift server")
43 thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
44 thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered")
45 thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing")
46 zipkinHostPort = fs.String("zipkin.host.port", "my.service.domain:12345", "Zipkin host:port")
47 zipkinServiceName = fs.String("zipkin.service.name", "addsvc", "Zipkin service name")
48 zipkinCollectorAddr = fs.String("zipkin.collector.addr", "", "Zipkin Scribe collector address (empty will log spans)")
49 zipkinCollectorTimeout = fs.Duration("zipkin.collector.timeout", time.Second, "Zipkin collector timeout")
50 zipkinCollectorBatchSize = fs.Int("zipkin.collector.batch.size", 100, "Zipkin collector batch size")
51 zipkinCollectorBatchInterval = fs.Duration("zipkin.collector.batch.interval", time.Second, "Zipkin collector batch interval")
38 debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server")
39 httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server")
40 grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server")
41 netrpcAddr = fs.String("netrpc.addr", ":8003", "Address for net/rpc server")
42 thriftAddr = fs.String("thrift.addr", ":8004", "Address for Thrift server")
43 thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
44 thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered")
45 thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing")
46 zipkinHostPort = fs.String("zipkin.host.port", "my.service.domain:12345", "Zipkin host:port")
47 zipkinServiceName = fs.String("zipkin.service.name", "addsvc", "Zipkin service name")
48 zipkinCollectorAddr = fs.String("zipkin.collector.addr", "", "Zipkin Kafka collector address (empty will log spans)")
5249 )
5350 flag.Usage = fs.Usage // only show our flags
5451 if err := fs.Parse(os.Args[1:]); err != nil {
8784 collector = loggingCollector{zipkinLogger} // TODO(pb)
8885 if *zipkinCollectorAddr != "" {
8986 var err error
90 if collector, err = zipkin.NewScribeCollector(
91 *zipkinCollectorAddr,
92 *zipkinCollectorTimeout,
93 zipkin.ScribeBatchSize(*zipkinCollectorBatchSize),
94 zipkin.ScribeBatchInterval(*zipkinCollectorBatchInterval),
95 zipkin.ScribeLogger(zipkinLogger),
87 if collector, err = zipkin.NewKafkaCollector(
88 []string{*zipkinCollectorAddr},
89 zipkin.KafkaLogger(zipkinLogger),
9690 ); err != nil {
9791 zipkinLogger.Log("err", err)
9892 os.Exit(1)
256250 return nil
257251 }
258252
253 func (c loggingCollector) ShouldSample(*zipkin.Span) bool { return true }
254
259255 func (c loggingCollector) Close() error { return nil }
1313 hot spots, and diagnosing errors. All microservice infrastructures will
1414 benefit from request tracing; sufficiently large infrastructures will require
1515 it.
16
17 ## Test Setup
18
19 Setting up [Zipkin] is not an easy thing to do. It will also demand quite some
20 resources. To help you get started with development and testing we've made a
21 [VirtualBox] image available through [Vagrant] (*The box will require about 6GB
22 internal memory*).
23
24 First make sure you've installed [Vagrant] on your machine. Then you can
25 download and run the [Vagrant] image like this from the command line:
26
27 ```
28 # create a new directory to store your vagrant configuration and image files
29 mkdir zipkin
30 cd zipkin
31 vagrant init bas303/zipkin
32 vagrant up --provider virtualbox
33 ```
34
35 [Zipkin]: http://zipkin.io/
36 [VirtualBox]: https://www.virtualbox.org/
37 [Vagrant]: https://www.vagrantup.com/
38
39 You probably need to adjust the `Vagrantfile` configuration to meet your
40 networking needs. The file itself is documented so should not be hard to get
41 configured. After the change you can reload your box with the updated settings
42 like this:
43
44 ```
45 vagrant reload
46 ```
47
48 As mentioned the box is quite heavy and may take a few minutes to fully boot up.
49 To get into the box connect through ssh and use `vagrant` for both username and
50 password.
51
52 The following services have been set-up to run:
53 - Apache ZooKeeper (port: 2181)
54 - Apache Kafka (port: 9092)
55 - MySQL Server 5.5 (port: 3306)
56 - Zipkin Collector (Kafka, MySQL)
57 - Zipkin Query (MySQL)
58 - Zipkin Web (port: 8080, 9990)
59
60 To inspect if everything booted up properly check the log files in these
61 directories:
62 ```
63 /var/log/zookeeper
64 /var/log/kafka
65 /var/log/mysql
66 /var/log/zipkin
67 ```
68
69 The individual services can be managed with the `service` command:
70 - zookeeper
71 - kafka
72 - mysql
73 - collector
74 - query
75 - web
76
77 ## Usage
78
79 Wrap a server- or client-side [endpoint][] so that it emits traces to a Zipkin
80 collector.
81
82 [endpoint]: http://godoc.org/github.com/go-kit/kit/endpoint#Endpoint
83
84 ```go
85 func main() {
86 var (
87 myHost = "instance01.addsvc.internal.net"
88 myMethod = "ADD"
89 scribeHost = "scribe.internal.net"
90 timeout = 50 * time.Millisecond
91 batchSize = 100
92 batchInterval = 5 * time.Second
93 )
94 spanFunc := zipkin.NewSpanFunc(myHost, myMethod)
95 collector, _ := zipkin.NewScribeCollector(scribeHost, timeout, batchSize, batchInterval)
96
97 // Server-side
98 var server endpoint.Endpoint
99 server = makeEndpoint() // for your service
100 server = zipkin.AnnotateServer(spanFunc, collector)(server)
101 go serveViaHTTP(server)
102
103 // Client-side
104 before := httptransport.ClientBefore(zipkin.ToRequest(spanFunc))
105 var client endpoint.Endpoint
106 client = httptransport.NewClient(addr, codec, factory, before)
107 client = zipkin.AnnotateClient(spanFunc, collector)(client)
108 }
109 ```
0 # Zipkin
1
2 ## Development and Testing Set-up
3
4 Setting up [Zipkin] is not an easy thing to do. It will also demand quite some
5 resources. To help you get started with development and testing we've made a
6 docker-compose file available for running a full Zipkin stack. See the
7 `kit/tracing/zipkin/_docker` subdirectory.
8
9 You will need [docker-compose] 1.6.0+ and [docker-engine] 1.10.0+.
10
11 If running on Linux `HOSTNAME` can be set to `localhost`. If running on Mac OS X
12 or Windows you probably need to set the hostname environment variable to the
13 hostname of the VM running the docker containers.
14
15 ```sh
16 cd tracing/zipkin/_docker
17 HOSTNAME=localhost docker-compose -f docker-compose-zipkin.yml up
18 ```
19
20 [Zipkin]: http://zipkin.io/
21 [docker-compose]: https://docs.docker.com/compose/
22 [docker-engine]: https://docs.docker.com/engine/
23
24 As mentioned the [Zipkin] stack is quite heavy and may take a few minutes to
25 fully initialize.
26
27 The following services have been set-up to run:
28 - Apache Cassandra (port: 9160 (thrift), 9042 (native))
29 - Apache ZooKeeper (port: 2181)
30 - Apache Kafka (port: 9092)
31 - Zipkin Collector
32 - Zipkin Query
33 - Zipkin Web (port: 8080, 9990)
34
35
36 ## Middleware Usage
37
38 Wrap a server- or client-side [endpoint][] so that it emits traces to a Zipkin
39 collector. Make sure the host given to `MakeNewSpanFunc` resolves to an IP. If
40 not your span will silently fail!
41
42 [endpoint]: http://godoc.org/github.com/go-kit/kit/endpoint#Endpoint
43
44 If needing to create child spans in methods or calling another service from your
45 service method, it is highly recommended to request a context parameter so you
46 can transfer the needed metadata for traces across service boundaries.
47
48 It is also wise to always return error parameters with your service method
49 calls, even if your service method implementations will not throw errors
50 themselves. The error return parameter can be wired to pass the potential
51 transport errors when consuming your service API in a networked environment.
52
53 ```go
54 func main() {
55 var (
56 // myHost MUST resolve to an IP or your span will not show up in Zipkin.
57 myHost = "instance01.addsvc.internal.net:8000"
58 myService = "AddService"
59 myMethod = "Add"
60 url = myHost + "/add/"
61 kafkaHost = []string{"kafka.internal.net:9092"}
62 )
63
64 ctx := context.Background()
65
66 // Set Up Zipkin Collector and Span factory
67 spanFunc := zipkin.MakeNewSpanFunc(myHost, myService, myMethod)
68 collector, _ := zipkin.NewKafkaCollector(kafkaHost)
69
70 // Server-side Wiring
71 var server endpoint.Endpoint
72 server = makeEndpoint() // for your service
73 // wrap endpoint with Zipkin tracing middleware
74 server = zipkin.AnnotateServer(spanFunc, collector)(server)
75
76 http.Handle(
77 "/add/",
78 httptransport.NewServer(
79 ctx,
80 server,
81 decodeRequestFunc,
82 encodeResponseFunc,
83 httptransport.ServerBefore(
84 zipkin.ToContext(spanFunc),
85 ),
86 ),
87 )
88 ...
89
90 // Client-side
91 var client endpoint.Endpoint
92 client = httptransport.NewClient(
93 "GET",
94 URL,
95 encodeRequestFunc,
96 decodeResponseFunc,
97 httptransport.ClientBefore(zipkin.ToRequest(spanFunc)),
98 ).Endpoint()
99 client = zipkin.AnnotateClient(spanFunc, collector)(client)
100
101 ctx, cancel := context.WithTimeout(ctx, myTimeout)
102 defer cancel()
103
104 reply, err := client(ctx, param1, param2)
105 // do something with the response/error
106 ...
107 }
108 ```
109
110 ## Annotating Remote Resources
111
112 Next to the above shown examples of wiring server-side and client-side tracing
113 middlewares, you can also span resources called from your service methods.
114
115 To do this, the service method needs to include a context parameter. From your
116 endpoint wrapper you can inject the endpoint context which will hold the parent
117 span already created by the server-side middleware. If the resource is a remote
118 database you can use the `zipkin.ServerAddr` spanOption to identify the remote
119 host:port and the display name of this resource.
120
121 ```go
122 type MyService struct {
123 // add a Zipkin Collector to your service implementation's properties.
124 Collector zipkin.Collector
125 }
126
127 // Example of the endpoint.Endpoint to service method wrapper, injecting the
128 // context provided by the transport server.
129 func makeComplexEndpoint() endpoint.Endpoint {
130 return func(ctx context.Context, request interface{}) (interface{}, error) {
131 req := request.(ComplexRequest)
132 v, err := svc.Complex(ctx, req.A, req.B)
133 return ComplexResponse{V: v, Err: err}, nil
134 }
135 }
136
137 // Complex is an example method of our service, displaying the tracing of a
138 // remote database resource.
139 func (s *MyService) Complex(ctx context.Context, A someType, B otherType) (returnType, error) {
140 // we've parsed the incoming parameters and now we need to query the database.
141 // we wish to include this action into our trace.
142 span, collect := zipkin.NewChildSpan(
143 ctx,
144 s.Collector,
145 "complexQuery",
146 zipkin.ServerAddr(
147 "mysql01.internal.net:3306",
148 "MySQL",
149 ),
150 )
151 // you probably want to binary annotate your query
152 span.AnnotateBinary("query", "SELECT ... FROM ... WHERE ... ORDER BY ..."),
153 // annotate the start of the query
154 span.Annotate("complexQuery:start")
155 // do the query and handle resultset
156 ...
157 // annotate we are done with the query
158 span.Annotate("complexQuery:end")
159 // maybe binary annotate some items returned by the resultset
160 ...
161 // when done with all annotations, collect the span
162 collect()
163 ...
164 }
165 ```
0 # This file uses the version 2 docker-compose file format, described here:
1 # https://docs.docker.com/compose/compose-file/#version-2
2 #
3 # It runs the zipkin-cassandra, zipkin-collector, zipkin-query, zipkin-web, and
4 # zookeeper-exhibitor containers.
5 #
6 # On linux you probably want to start this composition like this:
7 #
8 # HOSTNAME=localhost docker-compose -f docker-compose-zipkin.yml up
9 #
10 # On OS X you will probably start like this:
11 #
12 # HOSTNAME=default docker-compose -f docker-compose-zipkin.yml up
13
14 version: '2'
15 services:
16 cassandra:
17 image: openzipkin/zipkin-cassandra:1.39.4
18 network_mode: host
19
20 zookeeper:
21 image: mbabineau/zookeeper-exhibitor:latest
22 network_mode: host
23 environment:
24 HOSTNAME: ${HOSTNAME}
25
26 kafka:
27 image: wurstmeister/kafka
28 network_mode: host
29 environment:
30 KAFKA_CREATE_TOPICS: "zipkin:1:1"
31 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 60000
32 KAFKA_ADVERTISED_PORT: 9092
33 KAFKA_ADVERTISED_HOST_NAME: ${HOSTNAME}
34 KAFKA_ZOOKEEPER_CONNECT: ${HOSTNAME}:2181
35 depends_on:
36 - zookeeper
37
38 collector:
39 image: openzipkin/zipkin-collector:1.39.4
40 network_mode: host
41 environment:
42 STORAGE_TYPE: cassandra
43 TRANSPORT_TYPE: kafka
44 CASSANDRA_CONTACT_POINTS: ${HOSTNAME}
45 KAFKA_ZOOKEEPER: ${HOSTNAME}:2181
46 METADATA_BROKER_LIST: ${HOSTNAME}:9092
47 depends_on:
48 - cassandra
49 - kafka
50
51 query:
52 image: openzipkin/zipkin-query:1.39.4
53 network_mode: host
54 environment:
55 STORAGE_TYPE: cassandra
56 TRANSPORT_TYPE: kafka
57 CASSANDRA_CONTACT_POINTS: ${HOSTNAME}
58 KAFKA_ZOOKEEPER: ${HOSTNAME}:2181
59 METADATA_BROKER_LIST: ${HOSTNAME}:9092
60 depends_on:
61 - cassandra
62 - kafka
63
64 web:
65 image: openzipkin/zipkin-web:1.39.4
66 network_mode: host
67 environment:
68 TRANSPORT_TYPE: kafka
69 KAFKA_ZOOKEEPER: ${HOSTNAME}:2181
70 METADATA_BROKER_LIST: ${HOSTNAME}:9092
71 QUERY_PORT_9411_TCP_ADDR: ${HOSTNAME}
72 ROOTURL: http://${HOSTNAME}:8080
73 depends_on:
74 - cassandra
75 - kafka
55 // remote endpoints.
66 type Collector interface {
77 Collect(*Span) error
8 ShouldSample(*Span) bool
89 Close() error
910 }
1011
1314
1415 // Collect implements Collector.
1516 func (NopCollector) Collect(*Span) error { return nil }
17
18 // ShouldSample implements Collector.
19 func (n NopCollector) ShouldSample(span *Span) bool { return false }
1620
1721 // Close implements Collector.
1822 func (NopCollector) Close() error { return nil }
2428 func (c MultiCollector) Collect(s *Span) error {
2529 return c.aggregateErrors(func(coll Collector) error { return coll.Collect(s) })
2630 }
31
32 // ShouldSample implements Collector.
33 func (c MultiCollector) ShouldSample(s *Span) bool { return false }
2734
2835 // Close implements Collector.
2936 func (c MultiCollector) Close() error {
3131 }
3232 return nil
3333 }
34
35 func (c *stubCollector) ShouldSample(*zipkin.Span) bool { return true }
3436
3537 func (c *stubCollector) Close() error {
3638 c.closed = true
8383
8484 // Collect implements Collector.
8585 func (c *KafkaCollector) Collect(s *Span) error {
86 if c.shouldSample(s.traceID) {
86 if c.ShouldSample(s) || s.debug {
8787 c.producer.Input() <- &sarama.ProducerMessage{
8888 Topic: c.topic,
8989 Key: nil,
9191 }
9292 }
9393 return nil
94 }
95
96 // ShouldSample implements Collector.
97 func (c *KafkaCollector) ShouldSample(s *Span) bool {
98 if !s.sampled && s.runSampler {
99 s.runSampler = false
100 s.sampled = c.shouldSample(s.TraceID())
101 }
102 return s.sampled
94103 }
95104
96105 // Close implements Collector.
0 package zipkin
0 package zipkin_test
11
2 import "testing"
2 import (
3 "testing"
4
5 "github.com/go-kit/kit/tracing/zipkin"
6 )
37
48 func TestSampleRate(t *testing.T) {
59 type triple struct {
2327 triple{999, 0, 0.99}: true,
2428 triple{9999, 0, 0.99}: false,
2529 } {
26 sampler := SampleRate(input.rate, input.salt)
30 sampler := zipkin.SampleRate(input.rate, input.salt)
2731 if have := sampler(input.id); want != have {
2832 t.Errorf("%#+v: want %v, have %v", input, want, have)
2933 }
6969
7070 // Collect implements Collector.
7171 func (c *ScribeCollector) Collect(s *Span) error {
72 c.spanc <- s
72 if c.ShouldSample(s) || s.debug {
73 c.spanc <- s
74 }
7375 return nil // accepted
76 }
77
78 // ShouldSample implements Collector.
79 func (c *ScribeCollector) ShouldSample(s *Span) bool {
80 if !s.sampled && s.runSampler {
81 s.runSampler = false
82 s.sampled = c.shouldSample(s.TraceID())
83 }
84 return s.sampled
7485 }
7586
7687 // Close implements Collector.
8596 for {
8697 select {
8798 case span := <-c.spanc:
88 if !c.shouldSample(span.traceID) {
89 continue
90 }
9199 c.batch = append(c.batch, &scribe.LogEntry{
92100 Category: c.category,
93101 Message: scribeSerialize(span),
3232 spanID = int64(456)
3333 parentSpanID = int64(0)
3434 value = "foo"
35 duration = 42 * time.Millisecond
3635 )
3736
3837 span := zipkin.NewSpan("1.2.3.4:1234", serviceName, methodName, traceID, spanID, parentSpanID)
39 span.AnnotateDuration("foo", 42*time.Millisecond)
38 span.Annotate("foo")
4039 if err := c.Collect(span); err != nil {
4140 t.Errorf("error during collection: %v", err)
4241 }
7877 gotAnnotation := gotSpan.GetAnnotations()[0]
7978 if want, have := value, gotAnnotation.GetValue(); want != have {
8079 t.Errorf("want %q, have %q", want, have)
81 }
82 if want, have := duration, time.Duration(gotAnnotation.GetDuration())*time.Microsecond; want != have {
83 t.Errorf("want %s, have %s", want, have)
8480 }
8581 }
8682
77 "strconv"
88 "time"
99
10 "golang.org/x/net/context"
11
1012 "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/zipkincore"
1113 )
1214
2426
2527 annotations []annotation
2628 binaryAnnotations []binaryAnnotation
29
30 debug bool
31 sampled bool
32 runSampler bool
2733 }
2834
2935 // NewSpan returns a new Span, which can be annotated and collected by a
3642 traceID: traceID,
3743 spanID: spanID,
3844 parentSpanID: parentSpanID,
45 runSampler: true,
3946 }
4047 }
4148
94101 // It may be zero.
95102 func (s *Span) ParentSpanID() int64 { return s.parentSpanID }
96103
104 // Sample forces sampling of this span.
105 func (s *Span) Sample() {
106 s.sampled = true
107 }
108
109 // SetDebug forces debug mode on this span.
110 func (s *Span) SetDebug() {
111 s.debug = true
112 }
113
97114 // Annotate annotates the span with the given value.
98115 func (s *Span) Annotate(value string) {
99 s.AnnotateDuration(value, 0)
116 s.annotations = append(s.annotations, annotation{
117 timestamp: time.Now(),
118 value: value,
119 host: s.host,
120 })
100121 }
101122
102123 // AnnotateBinary annotates the span with a key and a value that will be []byte
187208 }
188209
189210 // AnnotateString annotates the span with a key and a string value.
211 // Deprecated: use AnnotateBinary instead.
190212 func (s *Span) AnnotateString(key, value string) {
191213 s.binaryAnnotations = append(s.binaryAnnotations, binaryAnnotation{
192214 key: key,
196218 })
197219 }
198220
199 // AnnotateDuration annotates the span with the given value and duration.
200 func (s *Span) AnnotateDuration(value string, duration time.Duration) {
201 s.annotations = append(s.annotations, annotation{
202 timestamp: time.Now(),
203 value: value,
204 duration: duration,
205 host: s.host,
206 })
221 // SpanOption sets an optional parameter for Spans.
222 type SpanOption func(s *Span)
223
224 // ServerAddr will create a ServerAddr annotation with its own zipkin Endpoint
225 // when used with NewChildSpan. This is typically used when the NewChildSpan is
226 // used to annotate non Zipkin aware resources like databases and caches.
227 func ServerAddr(hostport, serviceName string) SpanOption {
228 return func(s *Span) {
229 e := makeEndpoint(hostport, serviceName)
230 if e != nil {
231 host := s.host
232 s.host = e // set temporary Endpoint
233 s.AnnotateBinary(ServerAddress, true) // use
234 s.host = host // reset
235 }
236 }
237 }
238
239 // Host will update the default zipkin Endpoint of the Span it is used with.
240 func Host(hostport, serviceName string) SpanOption {
241 return func(s *Span) {
242 e := makeEndpoint(hostport, serviceName)
243 if e != nil {
244 s.host = e // update
245 }
246 }
247 }
248
249 // Debug will set the Span to debug mode forcing Samplers to pass the Span.
250 func Debug(debug bool) SpanOption {
251 return func(s *Span) {
252 s.debug = debug
253 }
254 }
255
256 // CollectFunc will collect the span created with NewChildSpan.
257 type CollectFunc func()
258
259 // NewChildSpan returns a new child Span of a parent Span extracted from the
260 // passed context. It can be used to annotate resources like databases, caches,
261 // etc. and treat them as if they are a regular service. For tracing client
262 // endpoints use AnnotateClient instead.
263 func NewChildSpan(ctx context.Context, collector Collector, methodName string, options ...SpanOption) (*Span, CollectFunc) {
264 span, ok := FromContext(ctx)
265 if !ok {
266 return nil, func() {}
267 }
268 childSpan := &Span{
269 host: span.host,
270 methodName: methodName,
271 traceID: span.traceID,
272 spanID: newID(),
273 parentSpanID: span.spanID,
274 }
275 childSpan.Annotate(ClientSend)
276 for _, option := range options {
277 option(childSpan)
278 }
279 collectFunc := func() {
280 if childSpan != nil {
281 childSpan.Annotate(ClientReceive)
282 collector.Collect(childSpan)
283 childSpan = nil
284 }
285 }
286 return childSpan, collectFunc
287 }
288
289 // IsSampled returns if the span is set to be sampled.
290 func (s *Span) IsSampled() bool {
291 return s.sampled
207292 }
208293
209294 // Encode creates a Thrift Span from the gokit Span.
214299 TraceId: s.traceID,
215300 Name: s.methodName,
216301 Id: s.spanID,
217 Debug: true, // TODO
302 Debug: s.debug,
218303 }
219304
220305 if s.parentSpanID != 0 {
228313 Timestamp: a.timestamp.UnixNano() / 1e3,
229314 Value: a.value,
230315 Host: a.host,
231 }
232
233 if a.duration > 0 {
234 zs.Annotations[i].Duration = new(int32)
235 *(zs.Annotations[i].Duration) = int32(a.duration / time.Microsecond)
236316 }
237317 }
238318
252332 type annotation struct {
253333 timestamp time.Time
254334 value string
255 duration time.Duration // optional
256335 host *zipkincore.Endpoint
257336 }
258337
99
1010 "github.com/go-kit/kit/endpoint"
1111 "github.com/go-kit/kit/log"
12 "github.com/go-kit/kit/transport/grpc"
1312 )
1413
1514 // In Zipkin, "spans are considered to start and stop with the client." The
3130 traceIDHTTPHeader = "X-B3-TraceId"
3231 spanIDHTTPHeader = "X-B3-SpanId"
3332 parentSpanIDHTTPHeader = "X-B3-ParentSpanId"
33 sampledHTTPHeader = "X-B3-Sampled"
34
3435 // gRPC keys are always lowercase
3536 traceIDGRPCKey = "x-b3-traceid"
3637 spanIDGRPCKey = "x-b3-spanid"
3738 parentSpanIDGRPCKey = "x-b3-parentspanid"
39 sampledGRPCKey = "x-b3-sampled"
3840
3941 // ClientSend is the annotation value used to mark a client sending a
4042 // request to a server.
5153 // ClientReceive is the annotation value used to mark a client's receipt
5254 // of a completed request from a server.
5355 ClientReceive = "cr"
56
57 // ServerAddress allows to annotate the server endpoint in case the server
58 // side trace is not instrumented as with resources like caches and
59 // databases.
60 ServerAddress = "sa"
61
62 // ClientAddress allows to annotate the client origin in case the client was
63 // forwarded by a proxy which does not instrument itself.
64 ClientAddress = "ca"
5465 )
5566
5667 // AnnotateServer returns a server.Middleware that extracts a span from the
6273 return func(ctx context.Context, request interface{}) (interface{}, error) {
6374 span, ok := FromContext(ctx)
6475 if !ok {
65 span = newSpan(newID(), newID(), 0)
76 traceID := newID()
77 span = newSpan(traceID, traceID, 0)
6678 ctx = context.WithValue(ctx, SpanContextKey, span)
6779 }
80 c.ShouldSample(span)
6881 span.Annotate(ServerReceive)
6982 defer func() { span.Annotate(ServerSend); c.Collect(span) }()
7083 return next(ctx, request)
8497 parentSpan, ok := FromContext(ctx)
8598 if ok {
8699 clientSpan = newSpan(parentSpan.TraceID(), newID(), parentSpan.SpanID())
100 clientSpan.runSampler = false
101 clientSpan.sampled = c.ShouldSample(parentSpan)
87102 } else {
88 clientSpan = newSpan(newID(), newID(), 0)
103 // Abnormal operation. Traces should always start server side.
104 // We create a root span but annotate with a warning.
105 traceID := newID()
106 clientSpan = newSpan(traceID, traceID, 0)
107 c.ShouldSample(clientSpan)
108 clientSpan.AnnotateBinary("warning", "missing server side trace")
89109 }
90110 ctx = context.WithValue(ctx, SpanContextKey, clientSpan) // set
91111 defer func() { ctx = context.WithValue(ctx, SpanContextKey, parentSpan) }() // reset
102122 // Before stack. The logger is used to report errors.
103123 func ToContext(newSpan NewSpanFunc, logger log.Logger) func(ctx context.Context, r *http.Request) context.Context {
104124 return func(ctx context.Context, r *http.Request) context.Context {
105 return context.WithValue(ctx, SpanContextKey, fromHTTP(newSpan, r, logger))
125 span := fromHTTP(newSpan, r, logger)
126 if span == nil {
127 return ctx
128 }
129 return context.WithValue(ctx, SpanContextKey, span)
106130 }
107131 }
108132
112136 // Before stack. The logger is used to report errors.
113137 func ToGRPCContext(newSpan NewSpanFunc, logger log.Logger) func(ctx context.Context, md *metadata.MD) context.Context {
114138 return func(ctx context.Context, md *metadata.MD) context.Context {
115 return context.WithValue(ctx, SpanContextKey, fromGRPC(newSpan, *md, logger))
139 span := fromGRPC(newSpan, *md, logger)
140 if span == nil {
141 return ctx
142 }
143 return context.WithValue(ctx, SpanContextKey, span)
116144 }
117145 }
118146
125153 return func(ctx context.Context, r *http.Request) context.Context {
126154 span, ok := FromContext(ctx)
127155 if !ok {
128 span = newSpan(newID(), newID(), 0)
156 return ctx
129157 }
130158 if id := span.TraceID(); id > 0 {
131159 r.Header.Set(traceIDHTTPHeader, strconv.FormatInt(id, 16))
135163 }
136164 if id := span.ParentSpanID(); id > 0 {
137165 r.Header.Set(parentSpanIDHTTPHeader, strconv.FormatInt(id, 16))
166 }
167 if span.IsSampled() {
168 r.Header.Set(sampledHTTPHeader, "1")
169 } else {
170 r.Header.Set(sampledHTTPHeader, "0")
138171 }
139172 return ctx
140173 }
149182 return func(ctx context.Context, md *metadata.MD) context.Context {
150183 span, ok := FromContext(ctx)
151184 if !ok {
152 span = newSpan(newID(), newID(), 0)
185 return ctx
153186 }
154187 if id := span.TraceID(); id > 0 {
155 key, value := grpc.EncodeKeyValue(traceIDGRPCKey, strconv.FormatInt(id, 16))
156 (*md)[key] = append((*md)[key], value)
188 (*md)[traceIDGRPCKey] = append((*md)[traceIDGRPCKey], strconv.FormatInt(id, 16))
157189 }
158190 if id := span.SpanID(); id > 0 {
159 key, value := grpc.EncodeKeyValue(spanIDGRPCKey, strconv.FormatInt(id, 16))
160 (*md)[key] = append((*md)[key], value)
191 (*md)[spanIDGRPCKey] = append((*md)[spanIDGRPCKey], strconv.FormatInt(id, 16))
161192 }
162193 if id := span.ParentSpanID(); id > 0 {
163 key, value := grpc.EncodeKeyValue(parentSpanIDGRPCKey, strconv.FormatInt(id, 16))
164 (*md)[key] = append((*md)[key], value)
194 (*md)[parentSpanIDGRPCKey] = append((*md)[parentSpanIDGRPCKey], strconv.FormatInt(id, 16))
195 }
196 if span.IsSampled() {
197 (*md)[sampledGRPCKey] = append((*md)[sampledGRPCKey], "1")
198 } else {
199 (*md)[sampledGRPCKey] = append((*md)[sampledGRPCKey], "0")
165200 }
166201 return ctx
167202 }
170205 func fromHTTP(newSpan NewSpanFunc, r *http.Request, logger log.Logger) *Span {
171206 traceIDStr := r.Header.Get(traceIDHTTPHeader)
172207 if traceIDStr == "" {
173 return newSpan(newID(), newID(), 0) // normal; just make a new one
208 return nil
174209 }
175210 traceID, err := strconv.ParseInt(traceIDStr, 16, 64)
176211 if err != nil {
177 logger.Log(traceIDHTTPHeader, traceIDStr, "err", err)
178 return newSpan(newID(), newID(), 0)
212 logger.Log("msg", "invalid trace id found, ignoring trace", "err", err)
213 return nil
179214 }
180215 spanIDStr := r.Header.Get(spanIDHTTPHeader)
181216 if spanIDStr == "" {
196231 logger.Log(parentSpanIDHTTPHeader, parentSpanIDStr, "err", err) // abnormal
197232 parentSpanID = 0 // the only way to deal with it
198233 }
199 return newSpan(traceID, spanID, parentSpanID)
234 span := newSpan(traceID, spanID, parentSpanID)
235 switch r.Header.Get(sampledHTTPHeader) {
236 case "0":
237 span.runSampler = false
238 span.sampled = false
239 case "1":
240 span.runSampler = false
241 span.sampled = true
242 default:
243 // we don't know if the upstream trace was sampled. use our sampler
244 span.runSampler = true
245 }
246 return span
200247 }
201248
202249 func fromGRPC(newSpan NewSpanFunc, md metadata.MD, logger log.Logger) *Span {
203250 traceIDSlc := md[traceIDGRPCKey]
204251 pos := len(traceIDSlc) - 1
205252 if pos < 0 {
206 return newSpan(newID(), newID(), 0) // normal; just make a new one
253 return nil
207254 }
208255 traceID, err := strconv.ParseInt(traceIDSlc[pos], 16, 64)
209256 if err != nil {
210 logger.Log(traceIDHTTPHeader, traceIDSlc, "err", err)
211 return newSpan(newID(), newID(), 0)
257 logger.Log("msg", "invalid trace id found, ignoring trace", "err", err)
258 return nil
212259 }
213260 spanIDSlc := md[spanIDGRPCKey]
214261 pos = len(spanIDSlc) - 1
239286 logger.Log(parentSpanIDHTTPHeader, parentSpanIDSlc, "err", err) // abnormal
240287 parentSpanID = 0 // the only way to deal with it
241288 }
242 return newSpan(traceID, spanID, parentSpanID)
289 span := newSpan(traceID, spanID, parentSpanID)
290 var sampledHdr string
291 sampledSlc := md[sampledGRPCKey]
292 pos = len(sampledSlc) - 1
293 if pos >= 0 {
294 sampledHdr = sampledSlc[pos]
295 }
296 switch sampledHdr {
297 case "0":
298 span.runSampler = false
299 span.sampled = false
300 case "1":
301 span.runSampler = false
302 span.sampled = true
303 default:
304 // we don't know if the upstream trace was sampled. use our sampler
305 span.runSampler = true
306 }
307 return span
243308 }
244309
245310 // FromContext extracts an existing Zipkin span if it is stored in the provided
2525 traceID int64 = 12
2626 spanID int64 = 34
2727 parentSpanID int64 = 56
28 sampled = "1"
2829 )
2930
3031 r, _ := http.NewRequest("GET", "https://best.horse", nil)
3132 r.Header.Set("X-B3-TraceId", strconv.FormatInt(traceID, 16))
3233 r.Header.Set("X-B3-SpanId", strconv.FormatInt(spanID, 16))
3334 r.Header.Set("X-B3-ParentSpanId", strconv.FormatInt(parentSpanID, 16))
35 r.Header.Set("X-B3-Sampled", sampled)
3436
3537 newSpan := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName)
3638 toContext := zipkin.ToContext(newSpan, log.NewLogfmtLogger(ioutil.Discard))
5860 t.Errorf("%s: want %d, have %d", name, want, have)
5961 }
6062 }
63 if want, have := true, span.IsSampled(); want != have {
64 t.Errorf("IsSampled: want %v, have %v", want, have)
65 }
6166 }
6267
6368 func TestFromContext(t *testing.T) {
7075 parentSpanID int64 = 58
7176 )
7277
78 newSpan := zipkin.NewSpan(hostport, serviceName, methodName, traceID, spanID, parentSpanID)
79 newSpan.Sample()
7380 ctx := context.WithValue(
7481 context.Background(),
7582 zipkin.SpanContextKey,
76 zipkin.NewSpan(hostport, serviceName, methodName, traceID, spanID, parentSpanID),
83 newSpan,
7784 )
7885
7986 span, ok := zipkin.FromContext(ctx)
96103 t.Errorf("%s: want %d, have %d", name, want, have)
97104 }
98105 }
106 if want, have := true, span.IsSampled(); want != have {
107 t.Errorf("IsSampled: want %v, have %v", want, have)
108 }
99109 }
100110
101111 func TestToGRPCContext(t *testing.T) {
112122 "x-b3-traceid": []string{strconv.FormatInt(traceID, 16)},
113123 "x-b3-spanid": []string{strconv.FormatInt(spanID, 16)},
114124 "x-b3-parentspanid": []string{strconv.FormatInt(parentSpanID, 16)},
125 "x-b3-sampled": []string{"1"},
115126 }
116127
117128 newSpan := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName)
140151 t.Errorf("%s: want %d, have %d", name, want, have)
141152 }
142153 }
154 if want, have := true, span.IsSampled(); want != have {
155 t.Errorf("IsSampled: want %v, have %v", want, have)
156 }
143157 }
144158
145159 func TestToRequest(t *testing.T) {
150164 traceID int64 = 20
151165 spanID int64 = 40
152166 parentSpanID int64 = 90
167 sampled = "1"
153168 )
154169
155170 newSpan := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName)
156171 span := newSpan(traceID, spanID, parentSpanID)
172 span.Sample()
157173 ctx := context.WithValue(context.Background(), zipkin.SpanContextKey, span)
158174 r, _ := http.NewRequest("GET", "https://best.horse", nil)
159175 ctx = zipkin.ToRequest(newSpan)(ctx, r)
167183 t.Errorf("%s: want %q, have %q", header, want, have)
168184 }
169185 }
186 if want, have := sampled, r.Header.Get("X-B3-Sampled"); want != have {
187 t.Errorf("X-B3-Sampled: want %q, have %q", want, have)
188 }
170189 }
171190
172191 func TestToGRPCRequest(t *testing.T) {
177196 traceID int64 = 20
178197 spanID int64 = 40
179198 parentSpanID int64 = 90
199 sampled = "1"
180200 )
181201
182202 newSpan := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName)
183203 span := newSpan(traceID, spanID, parentSpanID)
204 span.Sample()
184205 ctx := context.WithValue(context.Background(), zipkin.SpanContextKey, span)
185206 md := &metadata.MD{}
186207 ctx = zipkin.ToGRPCRequest(newSpan)(ctx, md)
194215 t.Errorf("%s: want %q, have %q", header, want, have)
195216 }
196217 }
218 if want, have := sampled, (*md)["x-b3-sampled"][0]; want != have {
219 t.Errorf("x-b3-sampled: want %q, have %q", want, have)
220 }
221
197222 }
198223
199224 func TestAnnotateServer(t *testing.T) {
247272 return nil
248273 }
249274
275 func (c *countingCollector) ShouldSample(s *zipkin.Span) bool {
276 return true
277 }
278
250279 func (c *countingCollector) Close() error {
251280 return nil
252281 }