BugFixes to opentracing.From*Request logic & Zipkin Wiring
The opentracing.FromHTTPRequest and opentracing.FromGRPCRequest incorrectly assumed span would be nil
on a Join error. This made the next step crash in a server side implementation.
Added Zipkin to the examples to highlight the availability of it through OpenTracing.
Bas van Beek
7 years ago
8 | 8 | "strings" |
9 | 9 | "time" |
10 | 10 | |
11 | zipkin "github.com/basvanbeek/zipkin-go-opentracing" | |
11 | 12 | "github.com/lightstep/lightstep-tracer-go" |
12 | 13 | "github.com/opentracing/opentracing-go" |
13 | 14 | appdashot "github.com/sourcegraph/appdash/opentracing" |
36 | 37 | thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered") |
37 | 38 | thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing") |
38 | 39 | |
39 | // Two OpenTracing backends (to demonstrate how they can be interchanged): | |
40 | // Three OpenTracing backends (to demonstrate how they can be interchanged): | |
41 | zipkinAddr = flag.String("zipkin.kafka.addr", "", "Enable Zipkin tracing via a Kafka Collector host:port") | |
40 | 42 | appdashAddr = flag.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port") |
41 | 43 | lightstepAccessToken = flag.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token") |
42 | 44 | ) |
62 | 64 | var tracer opentracing.Tracer |
63 | 65 | { |
64 | 66 | switch { |
65 | case *appdashAddr != "" && *lightstepAccessToken == "": | |
67 | case *appdashAddr != "" && *lightstepAccessToken == "" && *zipkinAddr == "": | |
66 | 68 | tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) |
67 | case *appdashAddr == "" && *lightstepAccessToken != "": | |
69 | case *appdashAddr == "" && *lightstepAccessToken != "" && *zipkinAddr == "": | |
68 | 70 | tracer = lightstep.NewTracer(lightstep.Options{ |
69 | 71 | AccessToken: *lightstepAccessToken, |
70 | 72 | }) |
71 | 73 | defer lightstep.FlushLightStepTracer(tracer) |
72 | case *appdashAddr == "" && *lightstepAccessToken == "": | |
74 | case *appdashAddr == "" && *lightstepAccessToken == "" && *zipkinAddr != "": | |
75 | collector, err := zipkin.NewKafkaCollector( | |
76 | strings.Split(*zipkinAddr, ","), | |
77 | zipkin.KafkaLogger(tracingLogger), | |
78 | ) | |
79 | if err != nil { | |
80 | tracingLogger.Log("err", "unable to create kafka collector") | |
81 | os.Exit(1) | |
82 | } | |
83 | tracer, err = zipkin.NewTracer( | |
84 | zipkin.NewRecorder(collector, false, "localhost:8000", "addsvc-client"), | |
85 | ) | |
86 | if err != nil { | |
87 | tracingLogger.Log("err", "unable to create zipkin tracer") | |
88 | os.Exit(1) | |
89 | } | |
90 | case *appdashAddr == "" && *lightstepAccessToken == "" && *zipkinAddr == "": | |
73 | 91 | tracer = opentracing.GlobalTracer() // no-op |
74 | 92 | default: |
75 | panic("specify either -appdash.addr or -lightstep.access.token, not both") | |
93 | panic("specify a single -appdash.addr, -lightstep.access.token or -zipkin.kafka.addr") | |
76 | 94 | } |
77 | 95 | } |
78 | 96 | |
135 | 153 | logger.Log("err", "invalid method "+method) |
136 | 154 | os.Exit(1) |
137 | 155 | } |
156 | // wait for collector | |
157 | time.Sleep(2 * time.Second) | |
138 | 158 | } |
139 | 159 | |
140 | 160 | func buildEndpoint(tracer opentracing.Tracer, operationName string, instances []string, factory loadbalancer.Factory, seed int64, logger log.Logger) endpoint.Endpoint { |
14 | 14 | "time" |
15 | 15 | |
16 | 16 | "github.com/apache/thrift/lib/go/thrift" |
17 | zipkin "github.com/basvanbeek/zipkin-go-opentracing" | |
17 | 18 | "github.com/lightstep/lightstep-tracer-go" |
18 | 19 | "github.com/opentracing/opentracing-go" |
19 | 20 | stdprometheus "github.com/prometheus/client_golang/prometheus" |
31 | 32 | "github.com/go-kit/kit/metrics/expvar" |
32 | 33 | "github.com/go-kit/kit/metrics/prometheus" |
33 | 34 | kitot "github.com/go-kit/kit/tracing/opentracing" |
34 | "github.com/go-kit/kit/tracing/zipkin" | |
35 | 35 | httptransport "github.com/go-kit/kit/transport/http" |
36 | 36 | ) |
37 | 37 | |
50 | 50 | thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing") |
51 | 51 | |
52 | 52 | // Supported OpenTracing backends |
53 | zipkinAddr = fs.String("zipkin.kafka.addr", "", "Enable Zipkin tracing via a Kafka server host:port") | |
53 | 54 | appdashAddr = fs.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port") |
54 | 55 | lightstepAccessToken = fs.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token") |
55 | 56 | ) |
87 | 88 | var tracer opentracing.Tracer |
88 | 89 | { |
89 | 90 | switch { |
90 | case *appdashAddr != "" && *lightstepAccessToken == "": | |
91 | case *appdashAddr != "" && *lightstepAccessToken == "" && *zipkinAddr == "": | |
91 | 92 | tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) |
92 | case *appdashAddr == "" && *lightstepAccessToken != "": | |
93 | case *appdashAddr == "" && *lightstepAccessToken != "" && *zipkinAddr == "": | |
93 | 94 | tracer = lightstep.NewTracer(lightstep.Options{ |
94 | 95 | AccessToken: *lightstepAccessToken, |
95 | 96 | }) |
96 | 97 | defer lightstep.FlushLightStepTracer(tracer) |
97 | case *appdashAddr == "" && *lightstepAccessToken == "": | |
98 | case *appdashAddr == "" && *lightstepAccessToken == "" && *zipkinAddr != "": | |
99 | collector, err := zipkin.NewKafkaCollector( | |
100 | strings.Split(*zipkinAddr, ","), | |
101 | zipkin.KafkaLogger(logger), | |
102 | ) | |
103 | if err != nil { | |
104 | logger.Log("err", "unable to create kafka collector") | |
105 | os.Exit(1) | |
106 | } | |
107 | tracer, err = zipkin.NewTracer( | |
108 | zipkin.NewRecorder(collector, false, "localhost:80", "addsvc"), | |
109 | ) | |
110 | if err != nil { | |
111 | logger.Log("err", "unable to create zipkin tracer") | |
112 | os.Exit(1) | |
113 | } | |
114 | case *appdashAddr == "" && *lightstepAccessToken == "" && *zipkinAddr == "": | |
98 | 115 | tracer = opentracing.GlobalTracer() // no-op |
99 | 116 | default: |
100 | panic("specify either -appdash.addr or -lightstep.access.token, not both") | |
117 | panic("specify a single -appdash.addr, -lightstep.access.token or -zipkin.kafka.addr") | |
101 | 118 | } |
102 | 119 | } |
103 | 120 | |
236 | 253 | signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) |
237 | 254 | return fmt.Errorf("%s", <-c) |
238 | 255 | } |
239 | ||
240 | type loggingCollector struct{ log.Logger } | |
241 | ||
242 | func (c loggingCollector) Collect(s *zipkin.Span) error { | |
243 | annotations := s.Encode().GetAnnotations() | |
244 | values := make([]string, len(annotations)) | |
245 | for i, a := range annotations { | |
246 | values[i] = a.Value | |
247 | } | |
248 | c.Logger.Log( | |
249 | "trace_id", s.TraceID(), | |
250 | "span_id", s.SpanID(), | |
251 | "parent_span_id", s.ParentSpanID(), | |
252 | "annotations", strings.Join(values, " "), | |
253 | ) | |
254 | return nil | |
255 | } | |
256 | ||
257 | func (c loggingCollector) ShouldSample(*zipkin.Span) bool { return true } | |
258 | ||
259 | func (c loggingCollector) Close() error { return nil } |
38 | 38 | func FromGRPCRequest(tracer opentracing.Tracer, operationName string, logger log.Logger) func(ctx context.Context, md *metadata.MD) context.Context { |
39 | 39 | return func(ctx context.Context, md *metadata.MD) context.Context { |
40 | 40 | span, err := tracer.Join(operationName, opentracing.TextMap, metadataReaderWriter{md}) |
41 | if err != nil && logger != nil { | |
42 | logger.Log("msg", "Join failed", "err", err) | |
43 | } | |
44 | if span == nil { | |
41 | if err != nil { | |
45 | 42 | span = tracer.StartSpan(operationName) |
43 | if logger != nil { | |
44 | logger.Log("msg", "Join failed", "err", err) | |
45 | } | |
46 | 46 | } |
47 | 47 | return opentracing.ContextWithSpan(ctx, span) |
48 | 48 | } |
56 | 56 | // The logger is used to report errors and may be nil. |
57 | 57 | func FromHTTPRequest(tracer opentracing.Tracer, operationName string, logger log.Logger) kithttp.RequestFunc { |
58 | 58 | return func(ctx context.Context, req *http.Request) context.Context { |
59 | // Try to join to a trace propagated in `req`. There's nothing we can | |
60 | // do with any errors here, so we ignore them. | |
59 | // Try to join to a trace propagated in `req`. | |
61 | 60 | span, err := tracer.Join( |
62 | 61 | operationName, |
63 | 62 | opentracing.TextMap, |
64 | 63 | opentracing.HTTPHeaderTextMapCarrier(req.Header), |
65 | 64 | ) |
66 | if err != nil && logger != nil { | |
67 | logger.Log("msg", "Join failed", "err", err) | |
68 | } | |
69 | if span == nil { | |
70 | span = opentracing.StartSpan(operationName) | |
65 | if err != nil { | |
66 | span = tracer.StartSpan(operationName) | |
67 | if logger != nil { | |
68 | logger.Log("msg", "Join failed", "err", err) | |
69 | } | |
71 | 70 | } |
72 | 71 | return opentracing.ContextWithSpan(ctx, span) |
73 | 72 | } |