updated readme for tracing and tracing/zipkin
added NewChildSpan() for annotating clients to tracing unaware services
addsvc update to use Kafka Collector instead of the deprecated Scribe collector
removed AnnotateDuration as this type of annotation is now deprecated.
added ability to create a "SA" annotation for use with annotating resources like databases and caches.
X-B3-Sampled implementation
added docker-compose-zipkin.yml file for setting up development zipkin environment
Bas van Beek
8 years ago
35 | 35 | // of glog. So, we define a new flag set, to keep those domains distinct. |
36 | 36 | fs := flag.NewFlagSet("", flag.ExitOnError) |
37 | 37 | var ( |
38 | debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server") | |
39 | httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server") | |
40 | grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server") | |
41 | netrpcAddr = fs.String("netrpc.addr", ":8003", "Address for net/rpc server") | |
42 | thriftAddr = fs.String("thrift.addr", ":8004", "Address for Thrift server") | |
43 | thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson") | |
44 | thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered") | |
45 | thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing") | |
46 | zipkinHostPort = fs.String("zipkin.host.port", "my.service.domain:12345", "Zipkin host:port") | |
47 | zipkinServiceName = fs.String("zipkin.service.name", "addsvc", "Zipkin service name") | |
48 | zipkinCollectorAddr = fs.String("zipkin.collector.addr", "", "Zipkin Scribe collector address (empty will log spans)") | |
49 | zipkinCollectorTimeout = fs.Duration("zipkin.collector.timeout", time.Second, "Zipkin collector timeout") | |
50 | zipkinCollectorBatchSize = fs.Int("zipkin.collector.batch.size", 100, "Zipkin collector batch size") | |
51 | zipkinCollectorBatchInterval = fs.Duration("zipkin.collector.batch.interval", time.Second, "Zipkin collector batch interval") | |
38 | debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server") | |
39 | httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server") | |
40 | grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server") | |
41 | netrpcAddr = fs.String("netrpc.addr", ":8003", "Address for net/rpc server") | |
42 | thriftAddr = fs.String("thrift.addr", ":8004", "Address for Thrift server") | |
43 | thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson") | |
44 | thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered") | |
45 | thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing") | |
46 | zipkinHostPort = fs.String("zipkin.host.port", "my.service.domain:12345", "Zipkin host:port") | |
47 | zipkinServiceName = fs.String("zipkin.service.name", "addsvc", "Zipkin service name") | |
48 | zipkinCollectorAddr = fs.String("zipkin.collector.addr", "", "Zipkin Kafka collector address (empty will log spans)") | |
52 | 49 | ) |
53 | 50 | flag.Usage = fs.Usage // only show our flags |
54 | 51 | if err := fs.Parse(os.Args[1:]); err != nil { |
87 | 84 | collector = loggingCollector{zipkinLogger} // TODO(pb) |
88 | 85 | if *zipkinCollectorAddr != "" { |
89 | 86 | var err error |
90 | if collector, err = zipkin.NewScribeCollector( | |
91 | *zipkinCollectorAddr, | |
92 | *zipkinCollectorTimeout, | |
93 | zipkin.ScribeBatchSize(*zipkinCollectorBatchSize), | |
94 | zipkin.ScribeBatchInterval(*zipkinCollectorBatchInterval), | |
95 | zipkin.ScribeLogger(zipkinLogger), | |
87 | if collector, err = zipkin.NewKafkaCollector( | |
88 | []string{*zipkinCollectorAddr}, | |
89 | zipkin.KafkaLogger(zipkinLogger), | |
96 | 90 | ); err != nil { |
97 | 91 | zipkinLogger.Log("err", err) |
98 | 92 | os.Exit(1) |
256 | 250 | return nil |
257 | 251 | } |
258 | 252 | |
253 | func (c loggingCollector) ShouldSample(*zipkin.Span) bool { return true } | |
254 | ||
259 | 255 | func (c loggingCollector) Close() error { return nil } |
13 | 13 | hot spots, and diagnosing errors. All microservice infrastructures will |
14 | 14 | benefit from request tracing; sufficiently large infrastructures will require |
15 | 15 | it. |
16 | ||
17 | ## Test Setup | |
18 | ||
19 | Setting up [Zipkin] is not an easy thing to do. It will also demand quite some | |
20 | resources. To help you get started with development and testing we've made a | |
21 | [VirtualBox] image available through [Vagrant] (*The box will require about 6GB | |
22 | internal memory*). | |
23 | ||
24 | First make sure you've installed [Vagrant] on your machine. Then you can | |
25 | download and run the [Vagrant] image like this from the command line: | |
26 | ||
27 | ``` | |
28 | # create a new directory to store your vagrant configuration and image files | |
29 | mkdir zipkin | |
30 | cd zipkin | |
31 | vagrant init bas303/zipkin | |
32 | vagrant up --provider virtualbox | |
33 | ``` | |
34 | ||
35 | [Zipkin]: http://zipkin.io/ | |
36 | [VirtualBox]: https://www.virtualbox.org/ | |
37 | [Vagrant]: https://www.vagrantup.com/ | |
38 | ||
39 | You probably need to adjust the `Vagrantfile` configuration to meet your | |
40 | networking needs. The file itself is documented so should not be hard to get | |
41 | configured. After the change you can reload your box with the updated settings | |
42 | like this: | |
43 | ||
44 | ``` | |
45 | vagrant reload | |
46 | ``` | |
47 | ||
48 | As mentioned the box is quite heavy and may take a few minutes to fully boot up. | |
49 | To get into the box connect through ssh and use `vagrant` for both username and | |
50 | password. | |
51 | ||
52 | The following services have been set-up to run: | |
53 | - Apache ZooKeeper (port: 2181) | |
54 | - Apache Kafka (port: 9092) | |
55 | - MySQL Server 5.5 (port: 3306) | |
56 | - Zipkin Collector (Kafka, MySQL) | |
57 | - Zipkin Query (MySQL) | |
58 | - Zipkin Web (port: 8080, 9990) | |
59 | ||
60 | To inspect if everything booted up properly check the log files in these | |
61 | directories: | |
62 | ``` | |
63 | /var/log/zookeeper | |
64 | /var/log/kafka | |
65 | /var/log/mysql | |
66 | /var/log/zipkin | |
67 | ``` | |
68 | ||
69 | The individual services can be managed with the `service` command: | |
70 | - zookeeper | |
71 | - kafka | |
72 | - mysql | |
73 | - collector | |
74 | - query | |
75 | - web | |
76 | ||
77 | ## Usage | |
78 | ||
79 | Wrap a server- or client-side [endpoint][] so that it emits traces to a Zipkin | |
80 | collector. | |
81 | ||
82 | [endpoint]: http://godoc.org/github.com/go-kit/kit/endpoint#Endpoint | |
83 | ||
84 | ```go | |
85 | func main() { | |
86 | var ( | |
87 | myHost = "instance01.addsvc.internal.net" | |
88 | myMethod = "ADD" | |
89 | scribeHost = "scribe.internal.net" | |
90 | timeout = 50 * time.Millisecond | |
91 | batchSize = 100 | |
92 | batchInterval = 5 * time.Second | |
93 | ) | |
94 | spanFunc := zipkin.NewSpanFunc(myHost, myMethod) | |
95 | collector, _ := zipkin.NewScribeCollector(scribeHost, timeout, batchSize, batchInterval) | |
96 | ||
97 | // Server-side | |
98 | var server endpoint.Endpoint | |
99 | server = makeEndpoint() // for your service | |
100 | server = zipkin.AnnotateServer(spanFunc, collector)(server) | |
101 | go serveViaHTTP(server) | |
102 | ||
103 | // Client-side | |
104 | before := httptransport.ClientBefore(zipkin.ToRequest(spanFunc)) | |
105 | var client endpoint.Endpoint | |
106 | client = httptransport.NewClient(addr, codec, factory, before) | |
107 | client = zipkin.AnnotateClient(spanFunc, collector)(client) | |
108 | } | |
109 | ``` |
0 | # Zipkin | |
1 | ||
2 | ## Development and Testing Set-up | |
3 | ||
4 | Setting up [Zipkin] is not an easy thing to do. It will also demand quite some | |
5 | resources. To help you get started with development and testing we've made a | |
6 | docker-compose file available for running a full Zipkin stack. See the | |
7 | `kit/tracing/zipkin/_docker` subdirectory. | |
8 | ||
9 | You will need [docker-compose] 1.6.0+ and [docker-engine] 1.10.0+. | |
10 | ||
11 | If running on Linux `HOSTNAME` can be set to `localhost`. If running on Mac OS X | |
12 | or Windows you probably need to set the hostname environment variable to the | |
13 | hostname of the VM running the docker containers. | |
14 | ||
15 | ```sh | |
16 | git clone https://github.com/basvanbeek/docker-zipkin.git | |
17 | cd docker-zipkin | |
18 | HOSTNAME=localhost docker-compose -f docker-compose-zipkin.yml up | |
19 | ``` | |
20 | ||
21 | [Zipkin]: http://zipkin.io/ | |
22 | [docker-compose]: https://docs.docker.com/compose/ | |
23 | [docker-engine]: https://docs.docker.com/engine/ | |
24 | ||
25 | As mentioned the [Zipkin] stack is quite heavy and may take a few minutes to | |
26 | fully initialize. | |
27 | ||
28 | The following services have been set-up to run: | |
29 | - Apache Cassandra (port: 9160 (thrift), 9042 (native)) | |
30 | - Apache ZooKeeper (port: 2181) | |
31 | - Apache Kafka (port: 9092) | |
32 | - Zipkin Collector | |
33 | - Zipkin Query | |
34 | - Zipkin Web (port: 8080, 9990) | |
35 | ||
36 | ||
37 | ## Middleware Usage | |
38 | ||
39 | Wrap a server- or client-side [endpoint][] so that it emits traces to a Zipkin | |
40 | collector. Make sure the host given to `MakeNewSpanFunc` resolves to an IP. If | |
41 | not your span will silently fail! | |
42 | ||
43 | [endpoint]: http://godoc.org/github.com/go-kit/kit/endpoint#Endpoint | |
44 | ||
45 | If needing to create child spans in methods or calling another service from your | |
46 | service method, it is highly recommended to request a context parameter so you | |
47 | can transfer the needed metadata for traces across service boundaries. | |
48 | ||
49 | It is also wise to always return error parameters with your service method | |
50 | calls, even if your service method implementations will not throw errors | |
51 | themselves. The error return parameter can be wired to pass the potential | |
52 | transport errors when consuming your service API in a networked environment. | |
53 | ||
54 | ```go | |
55 | func main() { | |
56 | var ( | |
57 | // myHost MUST resolve to an IP or your span will not show up in Zipkin. | |
58 | myHost = "instance01.addsvc.internal.net:8000" | |
59 | myService = "AddService" | |
60 | myMethod = "Add" | |
61 | url = myHost + "/add/" | |
62 | kafkaHost = []string{"kafka.internal.net:9092"} | |
63 | ) | |
64 | ||
65 | ctx := context.Background() | |
66 | ||
67 | // Set Up Zipkin Collector and Span factory | |
68 | spanFunc := zipkin.MakeNewSpanFunc(myHost, myService, myMethod) | |
69 | collector, _ := zipkin.NewKafkaCollector(kafkaHost) | |
70 | ||
71 | // Server-side Wiring | |
72 | var server endpoint.Endpoint | |
73 | server = makeEndpoint() // for your service | |
74 | // wrap endpoint with Zipkin tracing middleware | |
75 | server = zipkin.AnnotateServer(spanFunc, collector)(server) | |
76 | ||
77 | http.Handle( | |
78 | "/add/", | |
79 | httptransport.NewServer( | |
80 | ctx, | |
81 | server, | |
82 | decodeRequestFunc, | |
83 | encodeResponseFunc, | |
84 | httptransport.ServerBefore( | |
85 | zipkin.ToContext(spanFunc), | |
86 | ), | |
87 | ), | |
88 | ) | |
89 | ... | |
90 | ||
91 | // Client-side | |
92 | var client endpoint.Endpoint | |
93 | client = httptransport.NewClient( | |
94 | "GET", | |
95 | URL, | |
96 | encodeRequestFunc, | |
97 | decodeResponseFunc, | |
98 | httptransport.ClientBefore(zipkin.ToRequest(spanFunc)), | |
99 | ).Endpoint() | |
100 | client = zipkin.AnnotateClient(spanFunc, collector)(client) | |
101 | ||
102 | ctx, cancel := context.WithTimeout(ctx, myTimeout) | |
103 | defer cancel() | |
104 | ||
105 | reply, err := client(ctx, param1, param2) | |
106 | // do something with the response/error | |
107 | ... | |
108 | } | |
109 | ``` | |
110 | ||
111 | ## Annotating Remote Resources | |
112 | ||
113 | Next to the above shown examples of wiring server-side and client-side tracing | |
114 | middlewares, you can also span resources called from your service methods. | |
115 | ||
116 | To do this, the service method needs to include a context parameter. From your | |
117 | endpoint wrapper you can inject the endpoint context which will hold the parent | |
118 | span already created by the server-side middleware. If the resource is a remote | |
119 | database you can use the `zipkin.ServerAddr` spanOption to identify the remote | |
120 | host:port and the display name of this resource. | |
121 | ||
122 | ```go | |
123 | type MyService struct { | |
124 | // add a Zipkin Collector to your service implementation's properties. | |
125 | Collector zipkin.Collector | |
126 | } | |
127 | ||
128 | // Example of the endpoint.Endpoint to service method wrapper, injecting the | |
129 | // context provided by the transport server. | |
130 | func makeComplexEndpoint() endpoint.Endpoint { | |
131 | return func(ctx context.Context, request interface{}) (interface{}, error) { | |
132 | req := request.(ComplexRequest) | |
133 | v, err := svc.Complex(ctx, req.A, req.B) | |
134 | return ComplexResponse{V: v, Err: err}, nil | |
135 | } | |
136 | } | |
137 | ||
138 | // Complex is an example method of our service, displaying the tracing of a | |
139 | // remote database resource. | |
140 | func (s *MyService) Complex(ctx context.Context, A someType, B otherType) (returnType, error) { | |
141 | // we've parsed the incoming parameters and now we need to query the database. | |
142 | // we wish to include this action into our trace. | |
143 | span, collect := zipkin.NewChildSpan( | |
144 | ctx, | |
145 | s.Collector, | |
146 | "complexQuery", | |
147 | zipkin.ServerAddr( | |
148 | "mysql01.internal.net:3306", | |
149 | "MySQL", | |
150 | ), | |
151 | ) | |
152 | // you probably want to binary annotate your query | |
153 | span.AnnotateBinary("query", "SELECT ... FROM ... WHERE ... ORDER BY ..."), | |
154 | // annotate the start of the query | |
155 | span.Annotate("complexQuery:start") | |
156 | // do the query and handle resultset | |
157 | ... | |
158 | // annotate we are done with the query | |
159 | span.Annotate("complexQuery:end") | |
160 | // maybe binary annotate some items returned by the resultset | |
161 | ... | |
162 | // when done with all annotations, collect the span | |
163 | collect() | |
164 | ... | |
165 | } | |
166 | ``` |
0 | # This file uses the version 2 docker-compose file format, described here: | |
1 | # https://docs.docker.com/compose/compose-file/#version-2 | |
2 | # | |
3 | # It runs the zipkin-cassandra, zipkin-collector, zipkin-query, zipkin-web, and | |
4 | # zookeeper-exhibitor containers. | |
5 | # | |
6 | # On linux you probably want to start this composition like this: | |
7 | # | |
8 | # HOSTNAME=localhost docker-compose -f docker-compose-zipkin.yml up | |
9 | # | |
10 | # On OS X you will probably start like this: | |
11 | # | |
12 | # HOSTNAME=default docker-compose -f docker-compose-zipkin.yml up | |
13 | ||
14 | version: '2' | |
15 | services: | |
16 | cassandra: | |
17 | image: openzipkin/zipkin-cassandra:1.39.4 | |
18 | network_mode: host | |
19 | ||
20 | zookeeper: | |
21 | image: mbabineau/zookeeper-exhibitor:latest | |
22 | network_mode: host | |
23 | environment: | |
24 | HOSTNAME: ${HOSTNAME} | |
25 | ||
26 | kafka: | |
27 | image: wurstmeister/kafka | |
28 | network_mode: host | |
29 | environment: | |
30 | KAFKA_CREATE_TOPICS: "zipkin:1:1" | |
31 | KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 60000 | |
32 | KAFKA_ADVERTISED_PORT: 9092 | |
33 | KAFKA_ADVERTISED_HOST_NAME: ${HOSTNAME} | |
34 | KAFKA_ZOOKEEPER_CONNECT: ${HOSTNAME}:2181 | |
35 | depends_on: | |
36 | - zookeeper | |
37 | ||
38 | collector: | |
39 | image: openzipkin/zipkin-collector:1.39.4 | |
40 | network_mode: host | |
41 | environment: | |
42 | STORAGE_TYPE: cassandra | |
43 | TRANSPORT_TYPE: kafka | |
44 | CASSANDRA_CONTACT_POINTS: ${HOSTNAME} | |
45 | KAFKA_ZOOKEEPER: ${HOSTNAME}:2181 | |
46 | METADATA_BROKER_LIST: ${HOSTNAME}:9092 | |
47 | depends_on: | |
48 | - cassandra | |
49 | - kafka | |
50 | ||
51 | query: | |
52 | image: openzipkin/zipkin-query:1.39.4 | |
53 | network_mode: host | |
54 | environment: | |
55 | STORAGE_TYPE: cassandra | |
56 | TRANSPORT_TYPE: kafka | |
57 | CASSANDRA_CONTACT_POINTS: ${HOSTNAME} | |
58 | KAFKA_ZOOKEEPER: ${HOSTNAME}:2181 | |
59 | METADATA_BROKER_LIST: ${HOSTNAME}:9092 | |
60 | depends_on: | |
61 | - cassandra | |
62 | - kafka | |
63 | ||
64 | web: | |
65 | image: openzipkin/zipkin-web:1.39.4 | |
66 | network_mode: host | |
67 | environment: | |
68 | TRANSPORT_TYPE: kafka | |
69 | KAFKA_ZOOKEEPER: ${HOSTNAME}:2181 | |
70 | METADATA_BROKER_LIST: ${HOSTNAME}:9092 | |
71 | QUERY_PORT_9411_TCP_ADDR: ${HOSTNAME} | |
72 | ROOTURL: http://${HOSTNAME}:8080 | |
73 | depends_on: | |
74 | - cassandra | |
75 | - kafka |
5 | 5 | // remote endpoints. |
6 | 6 | type Collector interface { |
7 | 7 | Collect(*Span) error |
8 | ShouldSample(*Span) bool | |
8 | 9 | Close() error |
9 | 10 | } |
10 | 11 | |
13 | 14 | |
14 | 15 | // Collect implements Collector. |
15 | 16 | func (NopCollector) Collect(*Span) error { return nil } |
17 | ||
18 | // ShouldSample implements Collector. | |
19 | func (n NopCollector) ShouldSample(span *Span) bool { return false } | |
16 | 20 | |
17 | 21 | // Close implements Collector. |
18 | 22 | func (NopCollector) Close() error { return nil } |
24 | 28 | func (c MultiCollector) Collect(s *Span) error { |
25 | 29 | return c.aggregateErrors(func(coll Collector) error { return coll.Collect(s) }) |
26 | 30 | } |
31 | ||
32 | // ShouldSample implements Collector. | |
33 | func (c MultiCollector) ShouldSample(s *Span) bool { return false } | |
27 | 34 | |
28 | 35 | // Close implements Collector. |
29 | 36 | func (c MultiCollector) Close() error { |
31 | 31 | } |
32 | 32 | return nil |
33 | 33 | } |
34 | ||
35 | func (c *stubCollector) ShouldSample(*zipkin.Span) bool { return true } | |
34 | 36 | |
35 | 37 | func (c *stubCollector) Close() error { |
36 | 38 | c.closed = true |
83 | 83 | |
84 | 84 | // Collect implements Collector. |
85 | 85 | func (c *KafkaCollector) Collect(s *Span) error { |
86 | if c.shouldSample(s.traceID) { | |
86 | if c.ShouldSample(s) || s.debug { | |
87 | 87 | c.producer.Input() <- &sarama.ProducerMessage{ |
88 | 88 | Topic: c.topic, |
89 | 89 | Key: nil, |
91 | 91 | } |
92 | 92 | } |
93 | 93 | return nil |
94 | } | |
95 | ||
96 | // ShouldSample implements Collector. | |
97 | func (c *KafkaCollector) ShouldSample(s *Span) bool { | |
98 | if !s.sampled && s.runSampler { | |
99 | s.runSampler = false | |
100 | s.sampled = c.shouldSample(s.TraceID()) | |
101 | } | |
102 | return s.sampled | |
94 | 103 | } |
95 | 104 | |
96 | 105 | // Close implements Collector. |
0 | package zipkin | |
0 | package zipkin_test | |
1 | 1 | |
2 | import "testing" | |
2 | import ( | |
3 | "testing" | |
4 | ||
5 | "github.com/go-kit/kit/tracing/zipkin" | |
6 | ) | |
3 | 7 | |
4 | 8 | func TestSampleRate(t *testing.T) { |
5 | 9 | type triple struct { |
23 | 27 | triple{999, 0, 0.99}: true, |
24 | 28 | triple{9999, 0, 0.99}: false, |
25 | 29 | } { |
26 | sampler := SampleRate(input.rate, input.salt) | |
30 | sampler := zipkin.SampleRate(input.rate, input.salt) | |
27 | 31 | if have := sampler(input.id); want != have { |
28 | 32 | t.Errorf("%#+v: want %v, have %v", input, want, have) |
29 | 33 | } |
69 | 69 | |
70 | 70 | // Collect implements Collector. |
71 | 71 | func (c *ScribeCollector) Collect(s *Span) error { |
72 | c.spanc <- s | |
72 | if c.ShouldSample(s) || s.debug { | |
73 | c.spanc <- s | |
74 | } | |
73 | 75 | return nil // accepted |
76 | } | |
77 | ||
78 | // ShouldSample implements Collector. | |
79 | func (c *ScribeCollector) ShouldSample(s *Span) bool { | |
80 | if !s.sampled && s.runSampler { | |
81 | s.runSampler = false | |
82 | s.sampled = c.shouldSample(s.TraceID()) | |
83 | } | |
84 | return s.sampled | |
74 | 85 | } |
75 | 86 | |
76 | 87 | // Close implements Collector. |
85 | 96 | for { |
86 | 97 | select { |
87 | 98 | case span := <-c.spanc: |
88 | if !c.shouldSample(span.traceID) { | |
89 | continue | |
90 | } | |
91 | 99 | c.batch = append(c.batch, &scribe.LogEntry{ |
92 | 100 | Category: c.category, |
93 | 101 | Message: scribeSerialize(span), |
32 | 32 | spanID = int64(456) |
33 | 33 | parentSpanID = int64(0) |
34 | 34 | value = "foo" |
35 | duration = 42 * time.Millisecond | |
36 | 35 | ) |
37 | 36 | |
38 | 37 | span := zipkin.NewSpan("1.2.3.4:1234", serviceName, methodName, traceID, spanID, parentSpanID) |
39 | span.AnnotateDuration("foo", 42*time.Millisecond) | |
38 | span.Annotate("foo") | |
40 | 39 | if err := c.Collect(span); err != nil { |
41 | 40 | t.Errorf("error during collection: %v", err) |
42 | 41 | } |
78 | 77 | gotAnnotation := gotSpan.GetAnnotations()[0] |
79 | 78 | if want, have := value, gotAnnotation.GetValue(); want != have { |
80 | 79 | t.Errorf("want %q, have %q", want, have) |
81 | } | |
82 | if want, have := duration, time.Duration(gotAnnotation.GetDuration())*time.Microsecond; want != have { | |
83 | t.Errorf("want %s, have %s", want, have) | |
84 | 80 | } |
85 | 81 | } |
86 | 82 |
7 | 7 | "strconv" |
8 | 8 | "time" |
9 | 9 | |
10 | "golang.org/x/net/context" | |
11 | ||
10 | 12 | "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/zipkincore" |
11 | 13 | ) |
12 | 14 | |
24 | 26 | |
25 | 27 | annotations []annotation |
26 | 28 | binaryAnnotations []binaryAnnotation |
29 | ||
30 | debug bool | |
31 | sampled bool | |
32 | runSampler bool | |
27 | 33 | } |
28 | 34 | |
29 | 35 | // NewSpan returns a new Span, which can be annotated and collected by a |
36 | 42 | traceID: traceID, |
37 | 43 | spanID: spanID, |
38 | 44 | parentSpanID: parentSpanID, |
45 | runSampler: true, | |
39 | 46 | } |
40 | 47 | } |
41 | 48 | |
94 | 101 | // It may be zero. |
95 | 102 | func (s *Span) ParentSpanID() int64 { return s.parentSpanID } |
96 | 103 | |
104 | // Sample forces sampling of this span. | |
105 | func (s *Span) Sample() { | |
106 | s.sampled = true | |
107 | } | |
108 | ||
109 | // SetDebug forces debug mode on this span. | |
110 | func (s *Span) SetDebug() { | |
111 | s.debug = true | |
112 | } | |
113 | ||
97 | 114 | // Annotate annotates the span with the given value. |
98 | 115 | func (s *Span) Annotate(value string) { |
99 | s.AnnotateDuration(value, 0) | |
116 | s.annotations = append(s.annotations, annotation{ | |
117 | timestamp: time.Now(), | |
118 | value: value, | |
119 | host: s.host, | |
120 | }) | |
100 | 121 | } |
101 | 122 | |
102 | 123 | // AnnotateBinary annotates the span with a key and a value that will be []byte |
187 | 208 | } |
188 | 209 | |
189 | 210 | // AnnotateString annotates the span with a key and a string value. |
211 | // Deprecated: use AnnotateBinary instead. | |
190 | 212 | func (s *Span) AnnotateString(key, value string) { |
191 | 213 | s.binaryAnnotations = append(s.binaryAnnotations, binaryAnnotation{ |
192 | 214 | key: key, |
196 | 218 | }) |
197 | 219 | } |
198 | 220 | |
199 | // AnnotateDuration annotates the span with the given value and duration. | |
200 | func (s *Span) AnnotateDuration(value string, duration time.Duration) { | |
201 | s.annotations = append(s.annotations, annotation{ | |
202 | timestamp: time.Now(), | |
203 | value: value, | |
204 | duration: duration, | |
205 | host: s.host, | |
206 | }) | |
221 | // SpanOption sets an optional parameter for Spans. | |
222 | type SpanOption func(s *Span) | |
223 | ||
224 | // ServerAddr will create a ServerAddr annotation with its own zipkin Endpoint | |
225 | // when used with NewChildSpan. This is typically used when the NewChildSpan is | |
226 | // used to annotate non Zipkin aware resources like databases and caches. | |
227 | func ServerAddr(hostport, serviceName string) SpanOption { | |
228 | return func(s *Span) { | |
229 | e := makeEndpoint(hostport, serviceName) | |
230 | if e != nil { | |
231 | host := s.host | |
232 | s.host = e // set temporary Endpoint | |
233 | s.AnnotateBinary(ServerAddress, true) // use | |
234 | s.host = host // reset | |
235 | } | |
236 | } | |
237 | } | |
238 | ||
239 | // Host will update the default zipkin Endpoint of the Span it is used with. | |
240 | func Host(hostport, serviceName string) SpanOption { | |
241 | return func(s *Span) { | |
242 | e := makeEndpoint(hostport, serviceName) | |
243 | if e != nil { | |
244 | s.host = e // update | |
245 | } | |
246 | } | |
247 | } | |
248 | ||
249 | // Debug will set the Span to debug mode forcing Samplers to pass the Span. | |
250 | func Debug(debug bool) SpanOption { | |
251 | return func(s *Span) { | |
252 | s.debug = debug | |
253 | } | |
254 | } | |
255 | ||
256 | // CollectFunc will collect the span created with NewChildSpan. | |
257 | type CollectFunc func() | |
258 | ||
259 | // NewChildSpan returns a new child Span of a parent Span extracted from the | |
260 | // passed context. It can be used to annotate resources like databases, caches, | |
261 | // etc. and treat them as if they are a regular service. For tracing client | |
262 | // endpoints use AnnotateClient instead. | |
263 | func NewChildSpan(ctx context.Context, collector Collector, methodName string, options ...SpanOption) (*Span, CollectFunc) { | |
264 | span, ok := FromContext(ctx) | |
265 | if !ok { | |
266 | return nil, func() {} | |
267 | } | |
268 | childSpan := &Span{ | |
269 | host: span.host, | |
270 | methodName: methodName, | |
271 | traceID: span.traceID, | |
272 | spanID: newID(), | |
273 | parentSpanID: span.spanID, | |
274 | } | |
275 | childSpan.Annotate(ClientSend) | |
276 | for _, option := range options { | |
277 | option(childSpan) | |
278 | } | |
279 | collectFunc := func() { | |
280 | if childSpan != nil { | |
281 | childSpan.Annotate(ClientReceive) | |
282 | collector.Collect(childSpan) | |
283 | childSpan = nil | |
284 | } | |
285 | } | |
286 | return childSpan, collectFunc | |
287 | } | |
288 | ||
289 | // IsSampled returns if the span is set to be sampled. | |
290 | func (s *Span) IsSampled() bool { | |
291 | return s.sampled | |
207 | 292 | } |
208 | 293 | |
209 | 294 | // Encode creates a Thrift Span from the gokit Span. |
214 | 299 | TraceId: s.traceID, |
215 | 300 | Name: s.methodName, |
216 | 301 | Id: s.spanID, |
217 | Debug: true, // TODO | |
302 | Debug: s.debug, | |
218 | 303 | } |
219 | 304 | |
220 | 305 | if s.parentSpanID != 0 { |
228 | 313 | Timestamp: a.timestamp.UnixNano() / 1e3, |
229 | 314 | Value: a.value, |
230 | 315 | Host: a.host, |
231 | } | |
232 | ||
233 | if a.duration > 0 { | |
234 | zs.Annotations[i].Duration = new(int32) | |
235 | *(zs.Annotations[i].Duration) = int32(a.duration / time.Microsecond) | |
236 | 316 | } |
237 | 317 | } |
238 | 318 | |
252 | 332 | type annotation struct { |
253 | 333 | timestamp time.Time |
254 | 334 | value string |
255 | duration time.Duration // optional | |
256 | 335 | host *zipkincore.Endpoint |
257 | 336 | } |
258 | 337 |
9 | 9 | |
10 | 10 | "github.com/go-kit/kit/endpoint" |
11 | 11 | "github.com/go-kit/kit/log" |
12 | "github.com/go-kit/kit/transport/grpc" | |
13 | 12 | ) |
14 | 13 | |
15 | 14 | // In Zipkin, "spans are considered to start and stop with the client." The |
31 | 30 | traceIDHTTPHeader = "X-B3-TraceId" |
32 | 31 | spanIDHTTPHeader = "X-B3-SpanId" |
33 | 32 | parentSpanIDHTTPHeader = "X-B3-ParentSpanId" |
33 | sampledHTTPHeader = "X-B3-Sampled" | |
34 | ||
34 | 35 | // gRPC keys are always lowercase |
35 | 36 | traceIDGRPCKey = "x-b3-traceid" |
36 | 37 | spanIDGRPCKey = "x-b3-spanid" |
37 | 38 | parentSpanIDGRPCKey = "x-b3-parentspanid" |
39 | sampledGRPCKey = "x-b3-sampled" | |
38 | 40 | |
39 | 41 | // ClientSend is the annotation value used to mark a client sending a |
40 | 42 | // request to a server. |
51 | 53 | // ClientReceive is the annotation value used to mark a client's receipt |
52 | 54 | // of a completed request from a server. |
53 | 55 | ClientReceive = "cr" |
56 | ||
57 | // ServerAddress allows to annotate the server endpoint in case the server | |
58 | // side trace is not instrumented as with resources like caches and | |
59 | // databases. | |
60 | ServerAddress = "sa" | |
61 | ||
62 | // ClientAddress allows to annotate the client origin in case the client was | |
63 | // forwarded by a proxy which does not instrument itself. | |
64 | ClientAddress = "ca" | |
54 | 65 | ) |
55 | 66 | |
56 | 67 | // AnnotateServer returns a server.Middleware that extracts a span from the |
62 | 73 | return func(ctx context.Context, request interface{}) (interface{}, error) { |
63 | 74 | span, ok := FromContext(ctx) |
64 | 75 | if !ok { |
65 | span = newSpan(newID(), newID(), 0) | |
76 | traceID := newID() | |
77 | span = newSpan(traceID, traceID, 0) | |
66 | 78 | ctx = context.WithValue(ctx, SpanContextKey, span) |
67 | 79 | } |
80 | c.ShouldSample(span) | |
68 | 81 | span.Annotate(ServerReceive) |
69 | 82 | defer func() { span.Annotate(ServerSend); c.Collect(span) }() |
70 | 83 | return next(ctx, request) |
84 | 97 | parentSpan, ok := FromContext(ctx) |
85 | 98 | if ok { |
86 | 99 | clientSpan = newSpan(parentSpan.TraceID(), newID(), parentSpan.SpanID()) |
100 | clientSpan.runSampler = false | |
101 | clientSpan.sampled = c.ShouldSample(parentSpan) | |
87 | 102 | } else { |
88 | clientSpan = newSpan(newID(), newID(), 0) | |
103 | // Abnormal operation. Traces should always start server side. | |
104 | // We create a root span but annotate with a warning. | |
105 | traceID := newID() | |
106 | clientSpan = newSpan(traceID, traceID, 0) | |
107 | c.ShouldSample(clientSpan) | |
108 | clientSpan.AnnotateBinary("warning", "missing server side trace") | |
89 | 109 | } |
90 | 110 | ctx = context.WithValue(ctx, SpanContextKey, clientSpan) // set |
91 | 111 | defer func() { ctx = context.WithValue(ctx, SpanContextKey, parentSpan) }() // reset |
102 | 122 | // Before stack. The logger is used to report errors. |
103 | 123 | func ToContext(newSpan NewSpanFunc, logger log.Logger) func(ctx context.Context, r *http.Request) context.Context { |
104 | 124 | return func(ctx context.Context, r *http.Request) context.Context { |
105 | return context.WithValue(ctx, SpanContextKey, fromHTTP(newSpan, r, logger)) | |
125 | span := fromHTTP(newSpan, r, logger) | |
126 | if span == nil { | |
127 | return ctx | |
128 | } | |
129 | return context.WithValue(ctx, SpanContextKey, span) | |
106 | 130 | } |
107 | 131 | } |
108 | 132 | |
112 | 136 | // Before stack. The logger is used to report errors. |
113 | 137 | func ToGRPCContext(newSpan NewSpanFunc, logger log.Logger) func(ctx context.Context, md *metadata.MD) context.Context { |
114 | 138 | return func(ctx context.Context, md *metadata.MD) context.Context { |
115 | return context.WithValue(ctx, SpanContextKey, fromGRPC(newSpan, *md, logger)) | |
139 | span := fromGRPC(newSpan, *md, logger) | |
140 | if span == nil { | |
141 | return ctx | |
142 | } | |
143 | return context.WithValue(ctx, SpanContextKey, span) | |
116 | 144 | } |
117 | 145 | } |
118 | 146 | |
125 | 153 | return func(ctx context.Context, r *http.Request) context.Context { |
126 | 154 | span, ok := FromContext(ctx) |
127 | 155 | if !ok { |
128 | span = newSpan(newID(), newID(), 0) | |
156 | return ctx | |
129 | 157 | } |
130 | 158 | if id := span.TraceID(); id > 0 { |
131 | 159 | r.Header.Set(traceIDHTTPHeader, strconv.FormatInt(id, 16)) |
135 | 163 | } |
136 | 164 | if id := span.ParentSpanID(); id > 0 { |
137 | 165 | r.Header.Set(parentSpanIDHTTPHeader, strconv.FormatInt(id, 16)) |
166 | } | |
167 | if span.IsSampled() { | |
168 | r.Header.Set(sampledHTTPHeader, "1") | |
169 | } else { | |
170 | r.Header.Set(sampledHTTPHeader, "0") | |
138 | 171 | } |
139 | 172 | return ctx |
140 | 173 | } |
149 | 182 | return func(ctx context.Context, md *metadata.MD) context.Context { |
150 | 183 | span, ok := FromContext(ctx) |
151 | 184 | if !ok { |
152 | span = newSpan(newID(), newID(), 0) | |
185 | return ctx | |
153 | 186 | } |
154 | 187 | if id := span.TraceID(); id > 0 { |
155 | key, value := grpc.EncodeKeyValue(traceIDGRPCKey, strconv.FormatInt(id, 16)) | |
156 | (*md)[key] = append((*md)[key], value) | |
188 | (*md)[traceIDGRPCKey] = append((*md)[traceIDGRPCKey], strconv.FormatInt(id, 16)) | |
157 | 189 | } |
158 | 190 | if id := span.SpanID(); id > 0 { |
159 | key, value := grpc.EncodeKeyValue(spanIDGRPCKey, strconv.FormatInt(id, 16)) | |
160 | (*md)[key] = append((*md)[key], value) | |
191 | (*md)[spanIDGRPCKey] = append((*md)[spanIDGRPCKey], strconv.FormatInt(id, 16)) | |
161 | 192 | } |
162 | 193 | if id := span.ParentSpanID(); id > 0 { |
163 | key, value := grpc.EncodeKeyValue(parentSpanIDGRPCKey, strconv.FormatInt(id, 16)) | |
164 | (*md)[key] = append((*md)[key], value) | |
194 | (*md)[parentSpanIDGRPCKey] = append((*md)[parentSpanIDGRPCKey], strconv.FormatInt(id, 16)) | |
195 | } | |
196 | if span.IsSampled() { | |
197 | (*md)[sampledGRPCKey] = append((*md)[sampledGRPCKey], "1") | |
198 | } else { | |
199 | (*md)[sampledGRPCKey] = append((*md)[sampledGRPCKey], "0") | |
165 | 200 | } |
166 | 201 | return ctx |
167 | 202 | } |
170 | 205 | func fromHTTP(newSpan NewSpanFunc, r *http.Request, logger log.Logger) *Span { |
171 | 206 | traceIDStr := r.Header.Get(traceIDHTTPHeader) |
172 | 207 | if traceIDStr == "" { |
173 | return newSpan(newID(), newID(), 0) // normal; just make a new one | |
208 | return nil | |
174 | 209 | } |
175 | 210 | traceID, err := strconv.ParseInt(traceIDStr, 16, 64) |
176 | 211 | if err != nil { |
177 | logger.Log(traceIDHTTPHeader, traceIDStr, "err", err) | |
178 | return newSpan(newID(), newID(), 0) | |
212 | logger.Log("msg", "invalid trace id found, ignoring trace", "err", err) | |
213 | return nil | |
179 | 214 | } |
180 | 215 | spanIDStr := r.Header.Get(spanIDHTTPHeader) |
181 | 216 | if spanIDStr == "" { |
196 | 231 | logger.Log(parentSpanIDHTTPHeader, parentSpanIDStr, "err", err) // abnormal |
197 | 232 | parentSpanID = 0 // the only way to deal with it |
198 | 233 | } |
199 | return newSpan(traceID, spanID, parentSpanID) | |
234 | span := newSpan(traceID, spanID, parentSpanID) | |
235 | switch r.Header.Get(sampledHTTPHeader) { | |
236 | case "0": | |
237 | span.runSampler = false | |
238 | span.sampled = false | |
239 | case "1": | |
240 | span.runSampler = false | |
241 | span.sampled = true | |
242 | default: | |
243 | // we don't know if the upstream trace was sampled. use our sampler | |
244 | span.runSampler = true | |
245 | } | |
246 | return span | |
200 | 247 | } |
201 | 248 | |
202 | 249 | func fromGRPC(newSpan NewSpanFunc, md metadata.MD, logger log.Logger) *Span { |
203 | 250 | traceIDSlc := md[traceIDGRPCKey] |
204 | 251 | pos := len(traceIDSlc) - 1 |
205 | 252 | if pos < 0 { |
206 | return newSpan(newID(), newID(), 0) // normal; just make a new one | |
253 | return nil | |
207 | 254 | } |
208 | 255 | traceID, err := strconv.ParseInt(traceIDSlc[pos], 16, 64) |
209 | 256 | if err != nil { |
210 | logger.Log(traceIDHTTPHeader, traceIDSlc, "err", err) | |
211 | return newSpan(newID(), newID(), 0) | |
257 | logger.Log("msg", "invalid trace id found, ignoring trace", "err", err) | |
258 | return nil | |
212 | 259 | } |
213 | 260 | spanIDSlc := md[spanIDGRPCKey] |
214 | 261 | pos = len(spanIDSlc) - 1 |
239 | 286 | logger.Log(parentSpanIDHTTPHeader, parentSpanIDSlc, "err", err) // abnormal |
240 | 287 | parentSpanID = 0 // the only way to deal with it |
241 | 288 | } |
242 | return newSpan(traceID, spanID, parentSpanID) | |
289 | span := newSpan(traceID, spanID, parentSpanID) | |
290 | var sampledHdr string | |
291 | sampledSlc := md[sampledGRPCKey] | |
292 | pos = len(sampledSlc) - 1 | |
293 | if pos >= 0 { | |
294 | sampledHdr = sampledSlc[pos] | |
295 | } | |
296 | switch sampledHdr { | |
297 | case "0": | |
298 | span.runSampler = false | |
299 | span.sampled = false | |
300 | case "1": | |
301 | span.runSampler = false | |
302 | span.sampled = true | |
303 | default: | |
304 | // we don't know if the upstream trace was sampled. use our sampler | |
305 | span.runSampler = true | |
306 | } | |
307 | return span | |
243 | 308 | } |
244 | 309 | |
245 | 310 | // FromContext extracts an existing Zipkin span if it is stored in the provided |
25 | 25 | traceID int64 = 12 |
26 | 26 | spanID int64 = 34 |
27 | 27 | parentSpanID int64 = 56 |
28 | sampled = "1" | |
28 | 29 | ) |
29 | 30 | |
30 | 31 | r, _ := http.NewRequest("GET", "https://best.horse", nil) |
31 | 32 | r.Header.Set("X-B3-TraceId", strconv.FormatInt(traceID, 16)) |
32 | 33 | r.Header.Set("X-B3-SpanId", strconv.FormatInt(spanID, 16)) |
33 | 34 | r.Header.Set("X-B3-ParentSpanId", strconv.FormatInt(parentSpanID, 16)) |
35 | r.Header.Set("X-B3-Sampled", sampled) | |
34 | 36 | |
35 | 37 | newSpan := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName) |
36 | 38 | toContext := zipkin.ToContext(newSpan, log.NewLogfmtLogger(ioutil.Discard)) |
58 | 60 | t.Errorf("%s: want %d, have %d", name, want, have) |
59 | 61 | } |
60 | 62 | } |
63 | if want, have := true, span.IsSampled(); want != have { | |
64 | t.Errorf("IsSampled: want %v, have %v", want, have) | |
65 | } | |
61 | 66 | } |
62 | 67 | |
63 | 68 | func TestFromContext(t *testing.T) { |
70 | 75 | parentSpanID int64 = 58 |
71 | 76 | ) |
72 | 77 | |
78 | newSpan := zipkin.NewSpan(hostport, serviceName, methodName, traceID, spanID, parentSpanID) | |
79 | newSpan.Sample() | |
73 | 80 | ctx := context.WithValue( |
74 | 81 | context.Background(), |
75 | 82 | zipkin.SpanContextKey, |
76 | zipkin.NewSpan(hostport, serviceName, methodName, traceID, spanID, parentSpanID), | |
83 | newSpan, | |
77 | 84 | ) |
78 | 85 | |
79 | 86 | span, ok := zipkin.FromContext(ctx) |
96 | 103 | t.Errorf("%s: want %d, have %d", name, want, have) |
97 | 104 | } |
98 | 105 | } |
106 | if want, have := true, span.IsSampled(); want != have { | |
107 | t.Errorf("IsSampled: want %v, have %v", want, have) | |
108 | } | |
99 | 109 | } |
100 | 110 | |
101 | 111 | func TestToGRPCContext(t *testing.T) { |
112 | 122 | "x-b3-traceid": []string{strconv.FormatInt(traceID, 16)}, |
113 | 123 | "x-b3-spanid": []string{strconv.FormatInt(spanID, 16)}, |
114 | 124 | "x-b3-parentspanid": []string{strconv.FormatInt(parentSpanID, 16)}, |
125 | "x-b3-sampled": []string{"1"}, | |
115 | 126 | } |
116 | 127 | |
117 | 128 | newSpan := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName) |
140 | 151 | t.Errorf("%s: want %d, have %d", name, want, have) |
141 | 152 | } |
142 | 153 | } |
154 | if want, have := true, span.IsSampled(); want != have { | |
155 | t.Errorf("IsSampled: want %v, have %v", want, have) | |
156 | } | |
143 | 157 | } |
144 | 158 | |
145 | 159 | func TestToRequest(t *testing.T) { |
150 | 164 | traceID int64 = 20 |
151 | 165 | spanID int64 = 40 |
152 | 166 | parentSpanID int64 = 90 |
167 | sampled = "1" | |
153 | 168 | ) |
154 | 169 | |
155 | 170 | newSpan := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName) |
156 | 171 | span := newSpan(traceID, spanID, parentSpanID) |
172 | span.Sample() | |
157 | 173 | ctx := context.WithValue(context.Background(), zipkin.SpanContextKey, span) |
158 | 174 | r, _ := http.NewRequest("GET", "https://best.horse", nil) |
159 | 175 | ctx = zipkin.ToRequest(newSpan)(ctx, r) |
167 | 183 | t.Errorf("%s: want %q, have %q", header, want, have) |
168 | 184 | } |
169 | 185 | } |
186 | if want, have := sampled, r.Header.Get("X-B3-Sampled"); want != have { | |
187 | t.Errorf("X-B3-Sampled: want %q, have %q", want, have) | |
188 | } | |
170 | 189 | } |
171 | 190 | |
172 | 191 | func TestToGRPCRequest(t *testing.T) { |
177 | 196 | traceID int64 = 20 |
178 | 197 | spanID int64 = 40 |
179 | 198 | parentSpanID int64 = 90 |
199 | sampled = "1" | |
180 | 200 | ) |
181 | 201 | |
182 | 202 | newSpan := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName) |
183 | 203 | span := newSpan(traceID, spanID, parentSpanID) |
204 | span.Sample() | |
184 | 205 | ctx := context.WithValue(context.Background(), zipkin.SpanContextKey, span) |
185 | 206 | md := &metadata.MD{} |
186 | 207 | ctx = zipkin.ToGRPCRequest(newSpan)(ctx, md) |
194 | 215 | t.Errorf("%s: want %q, have %q", header, want, have) |
195 | 216 | } |
196 | 217 | } |
218 | if want, have := sampled, (*md)["x-b3-sampled"][0]; want != have { | |
219 | t.Errorf("x-b3-sampled: want %q, have %q", want, have) | |
220 | } | |
221 | ||
197 | 222 | } |
198 | 223 | |
199 | 224 | func TestAnnotateServer(t *testing.T) { |
247 | 272 | return nil |
248 | 273 | } |
249 | 274 | |
275 | func (c *countingCollector) ShouldSample(s *zipkin.Span) bool { | |
276 | return true | |
277 | } | |
278 | ||
250 | 279 | func (c *countingCollector) Close() error { |
251 | 280 | return nil |
252 | 281 | } |