Merge pull request #247 from bensigelman/bhs/opentracing
OpenTracing integration
Peter Bourgon
6 years ago
2 | 2 | import ( |
3 | 3 | "io" |
4 | 4 | |
5 | kitot "github.com/go-kit/kit/tracing/opentracing" | |
6 | "github.com/opentracing/opentracing-go" | |
5 | 7 | "google.golang.org/grpc" |
6 | 8 | |
7 | 9 | "github.com/go-kit/kit/endpoint" |
8 | 10 | "github.com/go-kit/kit/examples/addsvc/pb" |
11 | "github.com/go-kit/kit/loadbalancer" | |
12 | "github.com/go-kit/kit/log" | |
9 | 13 | grpctransport "github.com/go-kit/kit/transport/grpc" |
10 | 14 | ) |
11 | 15 | |
12 | // SumEndpointFactory transforms GRPC host:port strings into Endpoints that call the Sum method on a GRPC server | |
16 | // MakeSumEndpointFactory returns a loadbalancer.Factory that transforms GRPC | |
17 | // host:port strings into Endpoints that call the Sum method on a GRPC server | |
13 | 18 | // at that address. |
14 | func SumEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { | |
15 | cc, err := grpc.Dial(instance, grpc.WithInsecure()) | |
16 | return grpctransport.NewClient( | |
17 | cc, | |
18 | "Add", | |
19 | "Sum", | |
20 | encodeSumRequest, | |
21 | decodeSumResponse, | |
22 | pb.SumReply{}, | |
23 | ).Endpoint(), cc, err | |
19 | func MakeSumEndpointFactory(tracer opentracing.Tracer, tracingLogger log.Logger) loadbalancer.Factory { | |
20 | return func(instance string) (endpoint.Endpoint, io.Closer, error) { | |
21 | cc, err := grpc.Dial(instance, grpc.WithInsecure()) | |
22 | return grpctransport.NewClient( | |
23 | cc, | |
24 | "Add", | |
25 | "Sum", | |
26 | encodeSumRequest, | |
27 | decodeSumResponse, | |
28 | pb.SumReply{}, | |
29 | grpctransport.SetClientBefore(kitot.ToGRPCRequest(tracer, tracingLogger)), | |
30 | ).Endpoint(), cc, err | |
31 | } | |
24 | 32 | } |
25 | 33 | |
26 | // ConcatEndpointFactory transforms GRPC host:port strings into Endpoints that call the Concat method on a GRPC server | |
27 | // at that address. | |
28 | func ConcatEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { | |
29 | cc, err := grpc.Dial(instance, grpc.WithInsecure()) | |
30 | return grpctransport.NewClient( | |
31 | cc, | |
32 | "Add", | |
33 | "Concat", | |
34 | encodeConcatRequest, | |
35 | decodeConcatResponse, | |
36 | pb.ConcatReply{}, | |
37 | ).Endpoint(), cc, err | |
34 | // MakeConcatEndpointFactory returns a loadbalancer.Factory that transforms | |
35 | // GRPC host:port strings into Endpoints that call the Concat method on a GRPC | |
36 | // server at that address. | |
37 | func MakeConcatEndpointFactory(tracer opentracing.Tracer, tracingLogger log.Logger) loadbalancer.Factory { | |
38 | return func(instance string) (endpoint.Endpoint, io.Closer, error) { | |
39 | cc, err := grpc.Dial(instance, grpc.WithInsecure()) | |
40 | return grpctransport.NewClient( | |
41 | cc, | |
42 | "Add", | |
43 | "Concat", | |
44 | encodeConcatRequest, | |
45 | decodeConcatResponse, | |
46 | pb.ConcatReply{}, | |
47 | grpctransport.SetClientBefore(kitot.ToGRPCRequest(tracer, tracingLogger)), | |
48 | ).Endpoint(), cc, err | |
49 | } | |
38 | 50 | } |
3 | 3 | "io" |
4 | 4 | "net/url" |
5 | 5 | |
6 | "github.com/opentracing/opentracing-go" | |
7 | ||
6 | 8 | "github.com/go-kit/kit/endpoint" |
7 | 9 | "github.com/go-kit/kit/examples/addsvc/server" |
10 | "github.com/go-kit/kit/loadbalancer" | |
11 | "github.com/go-kit/kit/log" | |
12 | kitot "github.com/go-kit/kit/tracing/opentracing" | |
8 | 13 | httptransport "github.com/go-kit/kit/transport/http" |
9 | 14 | ) |
10 | 15 | |
11 | // SumEndpointFactory transforms a http url into an Endpoint. | |
16 | // MakeSumEndpointFactory generates a Factory that transforms an http url into | |
17 | // an Endpoint. | |
18 | // | |
12 | 19 | // The path of the url is reset to /sum. |
13 | func SumEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { | |
14 | sumURL, err := url.Parse(instance) | |
15 | if err != nil { | |
16 | return nil, nil, err | |
20 | func MakeSumEndpointFactory(tracer opentracing.Tracer, tracingLogger log.Logger) loadbalancer.Factory { | |
21 | return func(instance string) (endpoint.Endpoint, io.Closer, error) { | |
22 | sumURL, err := url.Parse(instance) | |
23 | if err != nil { | |
24 | return nil, nil, err | |
25 | } | |
26 | sumURL.Path = "/sum" | |
27 | ||
28 | client := httptransport.NewClient( | |
29 | "GET", | |
30 | sumURL, | |
31 | server.EncodeSumRequest, | |
32 | server.DecodeSumResponse, | |
33 | httptransport.SetClient(nil), | |
34 | httptransport.SetClientBefore(kitot.ToHTTPRequest(tracer, tracingLogger)), | |
35 | ) | |
36 | ||
37 | return client.Endpoint(), nil, nil | |
17 | 38 | } |
18 | sumURL.Path = "/sum" | |
19 | ||
20 | client := httptransport.NewClient( | |
21 | "GET", | |
22 | sumURL, | |
23 | server.EncodeSumRequest, | |
24 | server.DecodeSumResponse, | |
25 | httptransport.SetClient(nil), | |
26 | ) | |
27 | ||
28 | return client.Endpoint(), nil, nil | |
29 | 39 | } |
30 | 40 | |
31 | // ConcatEndpointFactory transforms a http url into an Endpoint. | |
41 | // MakeConcatEndpointFactory generates a Factory that transforms an http url | |
42 | // into an Endpoint. | |
43 | // | |
32 | 44 | // The path of the url is reset to /concat. |
33 | func ConcatEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { | |
34 | concatURL, err := url.Parse(instance) | |
35 | if err != nil { | |
36 | return nil, nil, err | |
45 | func MakeConcatEndpointFactory(tracer opentracing.Tracer, tracingLogger log.Logger) loadbalancer.Factory { | |
46 | return func(instance string) (endpoint.Endpoint, io.Closer, error) { | |
47 | concatURL, err := url.Parse(instance) | |
48 | if err != nil { | |
49 | return nil, nil, err | |
50 | } | |
51 | concatURL.Path = "/concat" | |
52 | ||
53 | client := httptransport.NewClient( | |
54 | "GET", | |
55 | concatURL, | |
56 | server.EncodeConcatRequest, | |
57 | server.DecodeConcatResponse, | |
58 | httptransport.SetClient(nil), | |
59 | httptransport.SetClientBefore(kitot.ToHTTPRequest(tracer, tracingLogger)), | |
60 | ) | |
61 | ||
62 | return client.Endpoint(), nil, nil | |
37 | 63 | } |
38 | concatURL.Path = "/concat" | |
39 | ||
40 | client := httptransport.NewClient( | |
41 | "GET", | |
42 | concatURL, | |
43 | server.EncodeConcatRequest, | |
44 | server.DecodeConcatResponse, | |
45 | httptransport.SetClient(nil), | |
46 | ) | |
47 | ||
48 | return client.Endpoint(), nil, nil | |
49 | 64 | } |
8 | 8 | "strings" |
9 | 9 | "time" |
10 | 10 | |
11 | "github.com/lightstep/lightstep-tracer-go" | |
12 | "github.com/opentracing/opentracing-go" | |
13 | appdashot "github.com/sourcegraph/appdash/opentracing" | |
11 | 14 | "golang.org/x/net/context" |
15 | "sourcegraph.com/sourcegraph/appdash" | |
12 | 16 | |
13 | 17 | "github.com/go-kit/kit/endpoint" |
14 | 18 | grpcclient "github.com/go-kit/kit/examples/addsvc/client/grpc" |
18 | 22 | "github.com/go-kit/kit/loadbalancer" |
19 | 23 | "github.com/go-kit/kit/loadbalancer/static" |
20 | 24 | "github.com/go-kit/kit/log" |
25 | kitot "github.com/go-kit/kit/tracing/opentracing" | |
21 | 26 | ) |
22 | 27 | |
23 | 28 | func main() { |
30 | 35 | thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson") |
31 | 36 | thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered") |
32 | 37 | thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing") |
38 | ||
39 | // Two OpenTracing backends (to demonstrate how they can be interchanged): | |
40 | appdashAddr = flag.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port") | |
41 | lightstepAccessToken = flag.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token") | |
33 | 42 | ) |
34 | 43 | flag.Parse() |
35 | 44 | if len(os.Args) < 4 { |
47 | 56 | logger = log.NewLogfmtLogger(os.Stdout) |
48 | 57 | logger = log.NewContext(logger).With("caller", log.DefaultCaller) |
49 | 58 | logger = log.NewContext(logger).With("transport", *transport) |
59 | tracingLogger := log.NewContext(logger).With("component", "tracing") | |
60 | ||
61 | // Set up OpenTracing | |
62 | var tracer opentracing.Tracer | |
63 | { | |
64 | switch { | |
65 | case *appdashAddr != "" && *lightstepAccessToken == "": | |
66 | tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) | |
67 | case *appdashAddr == "" && *lightstepAccessToken != "": | |
68 | tracer = lightstep.NewTracer(lightstep.Options{ | |
69 | AccessToken: *lightstepAccessToken, | |
70 | }) | |
71 | defer lightstep.FlushLightStepTracer(tracer) | |
72 | case *appdashAddr == "" && *lightstepAccessToken == "": | |
73 | tracer = opentracing.GlobalTracer() // no-op | |
74 | default: | |
75 | panic("specify either -appdash.addr or -lightstep.access.token, not both") | |
76 | } | |
77 | } | |
50 | 78 | |
51 | 79 | var ( |
52 | 80 | instances []string |
56 | 84 | switch *transport { |
57 | 85 | case "grpc": |
58 | 86 | instances = strings.Split(*grpcAddrs, ",") |
59 | sumFactory = grpcclient.SumEndpointFactory | |
60 | concatFactory = grpcclient.ConcatEndpointFactory | |
87 | sumFactory = grpcclient.MakeSumEndpointFactory(tracer, tracingLogger) | |
88 | concatFactory = grpcclient.MakeConcatEndpointFactory(tracer, tracingLogger) | |
61 | 89 | |
62 | 90 | case "httpjson": |
63 | 91 | instances = strings.Split(*httpAddrs, ",") |
66 | 94 | instances[i] = "http://" + rawurl |
67 | 95 | } |
68 | 96 | } |
69 | sumFactory = httpjsonclient.SumEndpointFactory | |
70 | concatFactory = httpjsonclient.ConcatEndpointFactory | |
97 | sumFactory = httpjsonclient.MakeSumEndpointFactory(tracer, tracingLogger) | |
98 | concatFactory = httpjsonclient.MakeConcatEndpointFactory(tracer, tracingLogger) | |
71 | 99 | |
72 | 100 | case "netrpc": |
73 | 101 | instances = strings.Split(*netrpcAddrs, ",") |
85 | 113 | os.Exit(1) |
86 | 114 | } |
87 | 115 | |
88 | sum := buildEndpoint(instances, sumFactory, randomSeed, logger) | |
89 | concat := buildEndpoint(instances, concatFactory, randomSeed, logger) | |
116 | sum := buildEndpoint(tracer, "sum", instances, sumFactory, randomSeed, logger) | |
117 | concat := buildEndpoint(tracer, "concat", instances, concatFactory, randomSeed, logger) | |
90 | 118 | |
91 | 119 | svc := newClient(root, sum, concat, logger) |
92 | 120 | |
109 | 137 | } |
110 | 138 | } |
111 | 139 | |
112 | func buildEndpoint(instances []string, factory loadbalancer.Factory, seed int64, logger log.Logger) endpoint.Endpoint { | |
140 | func buildEndpoint(tracer opentracing.Tracer, operationName string, instances []string, factory loadbalancer.Factory, seed int64, logger log.Logger) endpoint.Endpoint { | |
113 | 141 | publisher := static.NewPublisher(instances, factory, logger) |
114 | 142 | random := loadbalancer.NewRandom(publisher, seed) |
115 | return loadbalancer.Retry(10, 10*time.Second, random) | |
143 | endpoint := loadbalancer.Retry(10, 10*time.Second, random) | |
144 | return kitot.TraceClient(tracer, operationName)(endpoint) | |
116 | 145 | } |
2 | 2 | import ( |
3 | 3 | "golang.org/x/net/context" |
4 | 4 | |
5 | "github.com/opentracing/opentracing-go" | |
6 | ||
5 | 7 | "github.com/go-kit/kit/examples/addsvc/pb" |
6 | 8 | "github.com/go-kit/kit/examples/addsvc/server" |
7 | 9 | servergrpc "github.com/go-kit/kit/examples/addsvc/server/grpc" |
10 | "github.com/go-kit/kit/log" | |
11 | kitot "github.com/go-kit/kit/tracing/opentracing" | |
8 | 12 | "github.com/go-kit/kit/transport/grpc" |
9 | 13 | ) |
10 | 14 | |
12 | 16 | sum, concat grpc.Handler |
13 | 17 | } |
14 | 18 | |
15 | func newGRPCBinding(ctx context.Context, svc server.AddService) grpcBinding { | |
19 | func newGRPCBinding(ctx context.Context, tracer opentracing.Tracer, svc server.AddService, tracingLogger log.Logger) grpcBinding { | |
16 | 20 | return grpcBinding{ |
17 | sum: grpc.NewServer(ctx, makeSumEndpoint(svc), servergrpc.DecodeSumRequest, servergrpc.EncodeSumResponse), | |
18 | concat: grpc.NewServer(ctx, makeConcatEndpoint(svc), servergrpc.DecodeConcatRequest, servergrpc.EncodeConcatResponse), | |
21 | sum: grpc.NewServer( | |
22 | ctx, | |
23 | kitot.TraceServer(tracer, "sum")(makeSumEndpoint(svc)), | |
24 | servergrpc.DecodeSumRequest, | |
25 | servergrpc.EncodeSumResponse, | |
26 | grpc.ServerBefore(kitot.FromGRPCRequest(tracer, "", tracingLogger)), | |
27 | ), | |
28 | concat: grpc.NewServer( | |
29 | ctx, | |
30 | kitot.TraceServer(tracer, "concat")(makeConcatEndpoint(svc)), | |
31 | servergrpc.DecodeConcatRequest, | |
32 | servergrpc.EncodeConcatResponse, | |
33 | grpc.ServerBefore(kitot.FromGRPCRequest(tracer, "", tracingLogger)), | |
34 | ), | |
19 | 35 | } |
20 | 36 | } |
21 | 37 |
14 | 14 | "time" |
15 | 15 | |
16 | 16 | "github.com/apache/thrift/lib/go/thrift" |
17 | "github.com/lightstep/lightstep-tracer-go" | |
18 | "github.com/opentracing/opentracing-go" | |
17 | 19 | stdprometheus "github.com/prometheus/client_golang/prometheus" |
20 | appdashot "github.com/sourcegraph/appdash/opentracing" | |
18 | 21 | "golang.org/x/net/context" |
19 | 22 | "google.golang.org/grpc" |
23 | "sourcegraph.com/sourcegraph/appdash" | |
20 | 24 | |
21 | 25 | "github.com/go-kit/kit/endpoint" |
22 | 26 | "github.com/go-kit/kit/examples/addsvc/pb" |
26 | 30 | "github.com/go-kit/kit/metrics" |
27 | 31 | "github.com/go-kit/kit/metrics/expvar" |
28 | 32 | "github.com/go-kit/kit/metrics/prometheus" |
33 | kitot "github.com/go-kit/kit/tracing/opentracing" | |
29 | 34 | "github.com/go-kit/kit/tracing/zipkin" |
30 | 35 | httptransport "github.com/go-kit/kit/transport/http" |
31 | 36 | ) |
35 | 40 | // of glog. So, we define a new flag set, to keep those domains distinct. |
36 | 41 | fs := flag.NewFlagSet("", flag.ExitOnError) |
37 | 42 | var ( |
38 | debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server") | |
39 | httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server") | |
40 | grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server") | |
41 | netrpcAddr = fs.String("netrpc.addr", ":8003", "Address for net/rpc server") | |
42 | thriftAddr = fs.String("thrift.addr", ":8004", "Address for Thrift server") | |
43 | thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson") | |
44 | thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered") | |
45 | thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing") | |
46 | zipkinHostPort = fs.String("zipkin.host.port", "my.service.domain:12345", "Zipkin host:port") | |
47 | zipkinServiceName = fs.String("zipkin.service.name", "addsvc", "Zipkin service name") | |
48 | zipkinCollectorAddr = fs.String("zipkin.collector.addr", "", "Zipkin Kafka collector address (empty will log spans)") | |
43 | debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server") | |
44 | httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server") | |
45 | grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server") | |
46 | netrpcAddr = fs.String("netrpc.addr", ":8003", "Address for net/rpc server") | |
47 | thriftAddr = fs.String("thrift.addr", ":8004", "Address for Thrift server") | |
48 | thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson") | |
49 | thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered") | |
50 | thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing") | |
51 | ||
52 | // Supported OpenTracing backends | |
53 | appdashAddr = fs.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port") | |
54 | lightstepAccessToken = fs.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token") | |
49 | 55 | ) |
50 | 56 | flag.Usage = fs.Usage // only show our flags |
51 | 57 | if err := fs.Parse(os.Args[1:]); err != nil { |
77 | 83 | )) |
78 | 84 | } |
79 | 85 | |
80 | // package tracing | |
81 | var collector zipkin.Collector | |
82 | { | |
83 | zipkinLogger := log.NewContext(logger).With("component", "zipkin") | |
84 | collector = loggingCollector{zipkinLogger} // TODO(pb) | |
85 | if *zipkinCollectorAddr != "" { | |
86 | var err error | |
87 | if collector, err = zipkin.NewKafkaCollector( | |
88 | []string{*zipkinCollectorAddr}, | |
89 | zipkin.KafkaLogger(zipkinLogger), | |
90 | ); err != nil { | |
91 | zipkinLogger.Log("err", err) | |
92 | os.Exit(1) | |
93 | } | |
86 | // Set up OpenTracing | |
87 | var tracer opentracing.Tracer | |
88 | { | |
89 | switch { | |
90 | case *appdashAddr != "" && *lightstepAccessToken == "": | |
91 | tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) | |
92 | case *appdashAddr == "" && *lightstepAccessToken != "": | |
93 | tracer = lightstep.NewTracer(lightstep.Options{ | |
94 | AccessToken: *lightstepAccessToken, | |
95 | }) | |
96 | defer lightstep.FlushLightStepTracer(tracer) | |
97 | case *appdashAddr == "" && *lightstepAccessToken == "": | |
98 | tracer = opentracing.GlobalTracer() // no-op | |
99 | default: | |
100 | panic("specify either -appdash.addr or -lightstep.access.token, not both") | |
94 | 101 | } |
95 | 102 | } |
96 | 103 | |
123 | 130 | var ( |
124 | 131 | transportLogger = log.NewContext(logger).With("transport", "HTTP/JSON") |
125 | 132 | tracingLogger = log.NewContext(transportLogger).With("component", "tracing") |
126 | newSumSpan = zipkin.MakeNewSpanFunc(*zipkinHostPort, *zipkinServiceName, "sum") | |
127 | newConcatSpan = zipkin.MakeNewSpanFunc(*zipkinHostPort, *zipkinServiceName, "concat") | |
128 | traceSum = zipkin.ToContext(newSumSpan, tracingLogger) | |
129 | traceConcat = zipkin.ToContext(newConcatSpan, tracingLogger) | |
130 | 133 | mux = http.NewServeMux() |
131 | 134 | sum, concat endpoint.Endpoint |
132 | 135 | ) |
133 | 136 | |
134 | 137 | sum = makeSumEndpoint(svc) |
135 | sum = zipkin.AnnotateServer(newSumSpan, collector)(sum) | |
138 | sum = kitot.TraceServer(tracer, "sum")(sum) | |
136 | 139 | mux.Handle("/sum", httptransport.NewServer( |
137 | 140 | root, |
138 | 141 | sum, |
139 | 142 | server.DecodeSumRequest, |
140 | 143 | server.EncodeSumResponse, |
141 | httptransport.ServerBefore(traceSum), | |
142 | 144 | httptransport.ServerErrorLogger(transportLogger), |
145 | httptransport.ServerBefore(kitot.FromHTTPRequest(tracer, "sum", tracingLogger)), | |
143 | 146 | )) |
144 | 147 | |
145 | 148 | concat = makeConcatEndpoint(svc) |
146 | concat = zipkin.AnnotateServer(newConcatSpan, collector)(concat) | |
149 | concat = kitot.TraceServer(tracer, "concat")(concat) | |
147 | 150 | mux.Handle("/concat", httptransport.NewServer( |
148 | 151 | root, |
149 | 152 | concat, |
150 | 153 | server.DecodeConcatRequest, |
151 | 154 | server.EncodeConcatResponse, |
152 | httptransport.ServerBefore(traceConcat), | |
153 | 155 | httptransport.ServerErrorLogger(transportLogger), |
156 | httptransport.ServerBefore(kitot.FromHTTPRequest(tracer, "concat", tracingLogger)), | |
154 | 157 | )) |
155 | 158 | |
156 | 159 | transportLogger.Log("addr", *httpAddr) |
160 | 163 | // Transport: gRPC |
161 | 164 | go func() { |
162 | 165 | transportLogger := log.NewContext(logger).With("transport", "gRPC") |
166 | tracingLogger := log.NewContext(transportLogger).With("component", "tracing") | |
163 | 167 | ln, err := net.Listen("tcp", *grpcAddr) |
164 | 168 | if err != nil { |
165 | 169 | errc <- err |
166 | 170 | return |
167 | 171 | } |
168 | 172 | s := grpc.NewServer() // uses its own, internal context |
169 | pb.RegisterAddServer(s, newGRPCBinding(root, svc)) | |
173 | pb.RegisterAddServer(s, newGRPCBinding(root, tracer, svc, tracingLogger)) | |
170 | 174 | transportLogger.Log("addr", *grpcAddr) |
171 | 175 | errc <- s.Serve(ln) |
172 | 176 | }() |
16 | 16 | |
17 | 17 | "github.com/gorilla/mux" |
18 | 18 | "github.com/hashicorp/consul/api" |
19 | "github.com/opentracing/opentracing-go" | |
19 | 20 | "golang.org/x/net/context" |
20 | 21 | |
21 | 22 | "github.com/go-kit/kit/endpoint" |
76 | 77 | factory loadbalancer.Factory |
77 | 78 | }{ |
78 | 79 | "addsvc": { |
79 | {path: "/api/addsvc/concat", factory: grpc.ConcatEndpointFactory}, | |
80 | {path: "/api/addsvc/sum", factory: grpc.SumEndpointFactory}, | |
80 | {path: "/api/addsvc/concat", factory: grpc.MakeConcatEndpointFactory(opentracing.GlobalTracer(), nil)}, | |
81 | {path: "/api/addsvc/sum", factory: grpc.MakeSumEndpointFactory(opentracing.GlobalTracer(), nil)}, | |
81 | 82 | }, |
82 | 83 | "stringsvc": { |
83 | 84 | {path: "/api/stringsvc/uppercase", factory: httpFactory(ctx, "GET", "uppercase/")}, |
0 | package opentracing | |
1 | ||
2 | import ( | |
3 | "github.com/opentracing/opentracing-go" | |
4 | otext "github.com/opentracing/opentracing-go/ext" | |
5 | "golang.org/x/net/context" | |
6 | ||
7 | "github.com/go-kit/kit/endpoint" | |
8 | ) | |
9 | ||
10 | // TraceServer returns a Middleware that wraps the `next` Endpoint in an | |
11 | // OpenTracing Span called `operationName`. | |
12 | // | |
13 | // If `ctx` already has a Span, it is re-used and the operation name is | |
14 | // overwritten. If `ctx` does not yet have a Span, one is created here. | |
15 | func TraceServer(tracer opentracing.Tracer, operationName string) endpoint.Middleware { | |
16 | return func(next endpoint.Endpoint) endpoint.Endpoint { | |
17 | return func(ctx context.Context, request interface{}) (interface{}, error) { | |
18 | serverSpan := opentracing.SpanFromContext(ctx) | |
19 | if serverSpan == nil { | |
20 | // All we can do is create a new root span. | |
21 | serverSpan = tracer.StartSpan(operationName) | |
22 | } else { | |
23 | serverSpan.SetOperationName(operationName) | |
24 | } | |
25 | defer serverSpan.Finish() | |
26 | otext.SpanKind.Set(serverSpan, otext.SpanKindRPCServer) | |
27 | ctx = opentracing.ContextWithSpan(ctx, serverSpan) | |
28 | return next(ctx, request) | |
29 | } | |
30 | } | |
31 | } | |
32 | ||
33 | // TraceClient returns a Middleware that wraps the `next` Endpoint in an | |
34 | // OpenTracing Span called `operationName`. | |
35 | func TraceClient(tracer opentracing.Tracer, operationName string) endpoint.Middleware { | |
36 | return func(next endpoint.Endpoint) endpoint.Endpoint { | |
37 | return func(ctx context.Context, request interface{}) (interface{}, error) { | |
38 | parentSpan := opentracing.SpanFromContext(ctx) | |
39 | clientSpan := tracer.StartSpanWithOptions(opentracing.StartSpanOptions{ | |
40 | OperationName: operationName, | |
41 | Parent: parentSpan, // may be nil | |
42 | }) | |
43 | defer clientSpan.Finish() | |
44 | otext.SpanKind.Set(clientSpan, otext.SpanKindRPCClient) | |
45 | ctx = opentracing.ContextWithSpan(ctx, clientSpan) | |
46 | return next(ctx, request) | |
47 | } | |
48 | } | |
49 | } |
0 | package opentracing_test | |
1 | ||
2 | import ( | |
3 | "testing" | |
4 | ||
5 | "github.com/opentracing/opentracing-go" | |
6 | "github.com/opentracing/opentracing-go/mocktracer" | |
7 | "golang.org/x/net/context" | |
8 | ||
9 | "github.com/go-kit/kit/endpoint" | |
10 | kitot "github.com/go-kit/kit/tracing/opentracing" | |
11 | ) | |
12 | ||
13 | func TestTraceServer(t *testing.T) { | |
14 | tracer := mocktracer.New() | |
15 | ||
16 | // Initialize the ctx with a nameless Span. | |
17 | contextSpan := tracer.StartSpan("").(*mocktracer.MockSpan) | |
18 | ctx := opentracing.ContextWithSpan(context.Background(), contextSpan) | |
19 | ||
20 | var innerEndpoint endpoint.Endpoint | |
21 | innerEndpoint = func(context.Context, interface{}) (interface{}, error) { | |
22 | return struct{}{}, nil | |
23 | } | |
24 | tracedEndpoint := kitot.TraceServer(tracer, "testOp")(innerEndpoint) | |
25 | if _, err := tracedEndpoint(ctx, struct{}{}); err != nil { | |
26 | t.Fatal(err) | |
27 | } | |
28 | if want, have := 1, len(tracer.FinishedSpans); want != have { | |
29 | t.Fatalf("Want %v span(s), found %v", want, have) | |
30 | } | |
31 | ||
32 | endpointSpan := tracer.FinishedSpans[0] | |
33 | // Test that the op name is updated | |
34 | if want, have := "testOp", endpointSpan.OperationName; want != have { | |
35 | t.Fatalf("Want %q, have %q", want, have) | |
36 | } | |
37 | // ... and that the ID is unmodified. | |
38 | if want, have := contextSpan.SpanID, endpointSpan.SpanID; want != have { | |
39 | t.Errorf("Want SpanID %q, have %q", want, have) | |
40 | } | |
41 | } | |
42 | ||
43 | func TestTraceServerNoContextSpan(t *testing.T) { | |
44 | tracer := mocktracer.New() | |
45 | ||
46 | var innerEndpoint endpoint.Endpoint | |
47 | innerEndpoint = func(context.Context, interface{}) (interface{}, error) { | |
48 | return struct{}{}, nil | |
49 | } | |
50 | tracedEndpoint := kitot.TraceServer(tracer, "testOp")(innerEndpoint) | |
51 | // Empty/background context: | |
52 | if _, err := tracedEndpoint(context.Background(), struct{}{}); err != nil { | |
53 | t.Fatal(err) | |
54 | } | |
55 | // tracedEndpoint created a new Span: | |
56 | if want, have := 1, len(tracer.FinishedSpans); want != have { | |
57 | t.Fatalf("Want %v span(s), found %v", want, have) | |
58 | } | |
59 | ||
60 | endpointSpan := tracer.FinishedSpans[0] | |
61 | if want, have := "testOp", endpointSpan.OperationName; want != have { | |
62 | t.Fatalf("Want %q, have %q", want, have) | |
63 | } | |
64 | } | |
65 | ||
66 | func TestTraceClient(t *testing.T) { | |
67 | tracer := mocktracer.New() | |
68 | ||
69 | // Initialize the ctx with a parent Span. | |
70 | parentSpan := tracer.StartSpan("parent").(*mocktracer.MockSpan) | |
71 | defer parentSpan.Finish() | |
72 | ctx := opentracing.ContextWithSpan(context.Background(), parentSpan) | |
73 | ||
74 | var innerEndpoint endpoint.Endpoint | |
75 | innerEndpoint = func(context.Context, interface{}) (interface{}, error) { | |
76 | return struct{}{}, nil | |
77 | } | |
78 | tracedEndpoint := kitot.TraceClient(tracer, "testOp")(innerEndpoint) | |
79 | if _, err := tracedEndpoint(ctx, struct{}{}); err != nil { | |
80 | t.Fatal(err) | |
81 | } | |
82 | // tracedEndpoint created a new Span: | |
83 | if want, have := 1, len(tracer.FinishedSpans); want != have { | |
84 | t.Fatalf("Want %v span(s), found %v", want, have) | |
85 | } | |
86 | ||
87 | endpointSpan := tracer.FinishedSpans[0] | |
88 | if want, have := "testOp", endpointSpan.OperationName; want != have { | |
89 | t.Fatalf("Want %q, have %q", want, have) | |
90 | } | |
91 | // ... and that the parent ID is set appropriately. | |
92 | if want, have := parentSpan.SpanID, endpointSpan.ParentID; want != have { | |
93 | t.Errorf("Want ParentID %q, have %q", want, have) | |
94 | } | |
95 | } |
0 | package opentracing | |
1 | ||
2 | import ( | |
3 | "github.com/opentracing/opentracing-go" | |
4 | "golang.org/x/net/context" | |
5 | "google.golang.org/grpc/metadata" | |
6 | ||
7 | "github.com/go-kit/kit/log" | |
8 | ) | |
9 | ||
10 | // ToGRPCRequest returns a grpc RequestFunc that injects an OpenTracing Span | |
11 | // found in `ctx` into the grpc Metadata. If no such Span can be found, the | |
12 | // RequestFunc is a noop. | |
13 | // | |
14 | // The logger is used to report errors and may be nil. | |
15 | func ToGRPCRequest(tracer opentracing.Tracer, logger log.Logger) func(ctx context.Context, md *metadata.MD) context.Context { | |
16 | return func(ctx context.Context, md *metadata.MD) context.Context { | |
17 | if span := opentracing.SpanFromContext(ctx); span != nil { | |
18 | // There's nothing we can do with an error here. | |
19 | err := tracer.Inject(span, opentracing.TextMap, metadataReaderWriter{md}) | |
20 | if err != nil && logger != nil { | |
21 | logger.Log("msg", "Inject failed", "err", err) | |
22 | } | |
23 | } | |
24 | return ctx | |
25 | } | |
26 | } | |
27 | ||
28 | // FromGRPCRequest returns a grpc RequestFunc that tries to join with an | |
29 | // OpenTracing trace found in `req` and starts a new Span called | |
30 | // `operationName` accordingly. If no trace could be found in `req`, the Span | |
31 | // will be a trace root. The Span is incorporated in the returned Context and | |
32 | // can be retrieved with opentracing.SpanFromContext(ctx). | |
33 | // | |
34 | // The logger is used to report errors and may be nil. | |
35 | func FromGRPCRequest(tracer opentracing.Tracer, operationName string, logger log.Logger) func(ctx context.Context, md *metadata.MD) context.Context { | |
36 | return func(ctx context.Context, md *metadata.MD) context.Context { | |
37 | span, err := tracer.Join(operationName, opentracing.TextMap, metadataReaderWriter{md}) | |
38 | if err != nil && logger != nil { | |
39 | logger.Log("msg", "Join failed", "err", err) | |
40 | } | |
41 | if span == nil { | |
42 | span = tracer.StartSpan(operationName) | |
43 | } | |
44 | return opentracing.ContextWithSpan(ctx, span) | |
45 | } | |
46 | } | |
47 | ||
48 | // A type that conforms to opentracing.TextMapReader and | |
49 | // opentracing.TextMapWriter. | |
50 | type metadataReaderWriter struct { | |
51 | *metadata.MD | |
52 | } | |
53 | ||
54 | func (w metadataReaderWriter) Set(key, val string) { | |
55 | (*w.MD)[key] = append((*w.MD)[key], val) | |
56 | } | |
57 | ||
58 | func (w metadataReaderWriter) ForeachKey(handler func(key, val string) error) error { | |
59 | for k, vals := range *w.MD { | |
60 | for _, v := range vals { | |
61 | if err := handler(k, v); err != nil { | |
62 | return err | |
63 | } | |
64 | } | |
65 | } | |
66 | return nil | |
67 | } |
0 | package opentracing_test | |
1 | ||
2 | import ( | |
3 | "testing" | |
4 | ||
5 | "github.com/opentracing/opentracing-go" | |
6 | "github.com/opentracing/opentracing-go/mocktracer" | |
7 | "golang.org/x/net/context" | |
8 | "google.golang.org/grpc/metadata" | |
9 | ||
10 | kitot "github.com/go-kit/kit/tracing/opentracing" | |
11 | "github.com/go-kit/kit/transport/grpc" | |
12 | ) | |
13 | ||
14 | func TestTraceGRPCRequestRoundtrip(t *testing.T) { | |
15 | tracer := mocktracer.New() | |
16 | ||
17 | // Initialize the ctx with a Span to inject. | |
18 | beforeSpan := tracer.StartSpan("to_inject").(*mocktracer.MockSpan) | |
19 | defer beforeSpan.Finish() | |
20 | beforeSpan.SetBaggageItem("baggage", "check") | |
21 | beforeCtx := opentracing.ContextWithSpan(context.Background(), beforeSpan) | |
22 | ||
23 | var toGRPCFunc grpc.RequestFunc = kitot.ToGRPCRequest(tracer, nil) | |
24 | md := metadata.Pairs() | |
25 | // Call the RequestFunc. | |
26 | afterCtx := toGRPCFunc(beforeCtx, &md) | |
27 | ||
28 | // The Span should not have changed. | |
29 | afterSpan := opentracing.SpanFromContext(afterCtx) | |
30 | if beforeSpan != afterSpan { | |
31 | t.Errorf("Should not swap in a new span") | |
32 | } | |
33 | ||
34 | // No spans should have finished yet. | |
35 | if want, have := 0, len(tracer.FinishedSpans); want != have { | |
36 | t.Errorf("Want %v span(s), found %v", want, have) | |
37 | } | |
38 | ||
39 | // Use FromGRPCRequest to verify that we can join with the trace given MD. | |
40 | var fromGRPCFunc grpc.RequestFunc = kitot.FromGRPCRequest(tracer, "joined", nil) | |
41 | joinCtx := fromGRPCFunc(afterCtx, &md) | |
42 | joinedSpan := opentracing.SpanFromContext(joinCtx).(*mocktracer.MockSpan) | |
43 | ||
44 | if joinedSpan.SpanID == beforeSpan.SpanID { | |
45 | t.Error("SpanID should have changed", joinedSpan.SpanID, beforeSpan.SpanID) | |
46 | } | |
47 | ||
48 | // Check that the parent/child relationship is as expected for the joined span. | |
49 | if want, have := beforeSpan.SpanID, joinedSpan.ParentID; want != have { | |
50 | t.Errorf("Want ParentID %q, have %q", want, have) | |
51 | } | |
52 | if want, have := "joined", joinedSpan.OperationName; want != have { | |
53 | t.Errorf("Want %q, have %q", want, have) | |
54 | } | |
55 | if want, have := "check", joinedSpan.BaggageItem("baggage"); want != have { | |
56 | t.Errorf("Want %q, have %q", want, have) | |
57 | } | |
58 | } |
0 | package opentracing | |
1 | ||
2 | import ( | |
3 | "net" | |
4 | "net/http" | |
5 | "strconv" | |
6 | ||
7 | "github.com/opentracing/opentracing-go" | |
8 | "github.com/opentracing/opentracing-go/ext" | |
9 | "golang.org/x/net/context" | |
10 | ||
11 | "github.com/go-kit/kit/log" | |
12 | kithttp "github.com/go-kit/kit/transport/http" | |
13 | ) | |
14 | ||
15 | // ToHTTPRequest returns an http RequestFunc that injects an OpenTracing Span | |
16 | // found in `ctx` into the http headers. If no such Span can be found, the | |
17 | // RequestFunc is a noop. | |
18 | // | |
19 | // The logger is used to report errors and may be nil. | |
20 | func ToHTTPRequest(tracer opentracing.Tracer, logger log.Logger) kithttp.RequestFunc { | |
21 | return func(ctx context.Context, req *http.Request) context.Context { | |
22 | // Try to find a Span in the Context. | |
23 | if span := opentracing.SpanFromContext(ctx); span != nil { | |
24 | // Add standard OpenTracing tags. | |
25 | ext.HTTPMethod.Set(span, req.URL.RequestURI()) | |
26 | host, portString, err := net.SplitHostPort(req.URL.Host) | |
27 | if err == nil { | |
28 | ext.PeerHostname.Set(span, host) | |
29 | if port, err := strconv.Atoi(portString); err != nil { | |
30 | ext.PeerPort.Set(span, uint16(port)) | |
31 | } | |
32 | } else { | |
33 | ext.PeerHostname.Set(span, req.URL.Host) | |
34 | } | |
35 | ||
36 | // There's nothing we can do with any errors here. | |
37 | err = tracer.Inject( | |
38 | span, | |
39 | opentracing.TextMap, | |
40 | opentracing.HTTPHeaderTextMapCarrier(req.Header), | |
41 | ) | |
42 | if err != nil && logger != nil { | |
43 | logger.Log("msg", "Join failed", "err", err) | |
44 | } | |
45 | } | |
46 | return ctx | |
47 | } | |
48 | } | |
49 | ||
50 | // FromHTTPRequest returns an http RequestFunc that tries to join with an | |
51 | // OpenTracing trace found in `req` and starts a new Span called | |
52 | // `operationName` accordingly. If no trace could be found in `req`, the Span | |
53 | // will be a trace root. The Span is incorporated in the returned Context and | |
54 | // can be retrieved with opentracing.SpanFromContext(ctx). | |
55 | // | |
56 | // The logger is used to report errors and may be nil. | |
57 | func FromHTTPRequest(tracer opentracing.Tracer, operationName string, logger log.Logger) kithttp.RequestFunc { | |
58 | return func(ctx context.Context, req *http.Request) context.Context { | |
59 | // Try to join to a trace propagated in `req`. There's nothing we can | |
60 | // do with any errors here, so we ignore them. | |
61 | span, err := tracer.Join( | |
62 | operationName, | |
63 | opentracing.TextMap, | |
64 | opentracing.HTTPHeaderTextMapCarrier(req.Header), | |
65 | ) | |
66 | if err != nil && logger != nil { | |
67 | logger.Log("msg", "Join failed", "err", err) | |
68 | } | |
69 | if span == nil { | |
70 | span = opentracing.StartSpan(operationName) | |
71 | } | |
72 | return opentracing.ContextWithSpan(ctx, span) | |
73 | } | |
74 | } |
0 | package opentracing_test | |
1 | ||
2 | import ( | |
3 | "net/http" | |
4 | "testing" | |
5 | ||
6 | "github.com/opentracing/opentracing-go" | |
7 | "github.com/opentracing/opentracing-go/mocktracer" | |
8 | "golang.org/x/net/context" | |
9 | ||
10 | kitot "github.com/go-kit/kit/tracing/opentracing" | |
11 | kithttp "github.com/go-kit/kit/transport/http" | |
12 | ) | |
13 | ||
14 | func TestTraceHTTPRequestRoundtrip(t *testing.T) { | |
15 | tracer := mocktracer.New() | |
16 | ||
17 | // Initialize the ctx with a Span to inject. | |
18 | beforeSpan := tracer.StartSpan("to_inject").(*mocktracer.MockSpan) | |
19 | defer beforeSpan.Finish() | |
20 | beforeSpan.SetBaggageItem("baggage", "check") | |
21 | beforeCtx := opentracing.ContextWithSpan(context.Background(), beforeSpan) | |
22 | ||
23 | var toHTTPFunc kithttp.RequestFunc = kitot.ToHTTPRequest(tracer, nil) | |
24 | req, _ := http.NewRequest("GET", "http://test.biz/url", nil) | |
25 | // Call the RequestFunc. | |
26 | afterCtx := toHTTPFunc(beforeCtx, req) | |
27 | ||
28 | // The Span should not have changed. | |
29 | afterSpan := opentracing.SpanFromContext(afterCtx) | |
30 | if beforeSpan != afterSpan { | |
31 | t.Errorf("Should not swap in a new span") | |
32 | } | |
33 | ||
34 | // No spans should have finished yet. | |
35 | if want, have := 0, len(tracer.FinishedSpans); want != have { | |
36 | t.Errorf("Want %v span(s), found %v", want, have) | |
37 | } | |
38 | ||
39 | // Use FromHTTPRequest to verify that we can join with the trace given a req. | |
40 | var fromHTTPFunc kithttp.RequestFunc = kitot.FromHTTPRequest(tracer, "joined", nil) | |
41 | joinCtx := fromHTTPFunc(afterCtx, req) | |
42 | joinedSpan := opentracing.SpanFromContext(joinCtx).(*mocktracer.MockSpan) | |
43 | ||
44 | if joinedSpan.SpanID == beforeSpan.SpanID { | |
45 | t.Error("SpanID should have changed", joinedSpan.SpanID, beforeSpan.SpanID) | |
46 | } | |
47 | ||
48 | // Check that the parent/child relationship is as expected for the joined span. | |
49 | if want, have := beforeSpan.SpanID, joinedSpan.ParentID; want != have { | |
50 | t.Errorf("Want ParentID %q, have %q", want, have) | |
51 | } | |
52 | if want, have := "joined", joinedSpan.OperationName; want != have { | |
53 | t.Errorf("Want %q, have %q", want, have) | |
54 | } | |
55 | if want, have := "check", joinedSpan.BaggageItem("baggage"); want != have { | |
56 | t.Errorf("Want %q, have %q", want, have) | |
57 | } | |
58 | } |