Codebase list golang-github-go-kit-kit / 876cd87
Zipkin tracing support (Native v2) (#671) * Added new Zipkin tracing middleware using the native zipkin-go library * new approach to Zipkin tracing middleware for Go kit * improvements to zipkin tracer docs and code * source comments changes * Adds http test. * Adds endpoint test. * Improves code based on idiomatic feedback. * updates to tracing doc comments * do not allow to activate both zipkin-go and zipkin-go-opentracing to avoid confusion * add test for zipkin http server tracer * add test for zipkin grpc server and client tracers Bas van Beek authored 6 years ago Peter Bourgon committed 6 years ago
23 changed file(s) with 1229 addition(s) and 227 deletion(s). Raw diff Collapse all Expand all
1313 "github.com/apache/thrift/lib/go/thrift"
1414 lightstep "github.com/lightstep/lightstep-tracer-go"
1515 stdopentracing "github.com/opentracing/opentracing-go"
16 zipkin "github.com/openzipkin/zipkin-go-opentracing"
16 zipkin "github.com/openzipkin/zipkin-go"
17 zipkinot "github.com/openzipkin/zipkin-go-opentracing"
18 zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http"
1719 "sourcegraph.com/sourcegraph/appdash"
1820 appdashot "sourcegraph.com/sourcegraph/appdash/opentracing"
1921
4042 thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson")
4143 thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered")
4244 thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing")
43 zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans")
44 lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token")
45 appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port")
45 zipkinV2URL = fs.String("zipkin-url", "", "Enable Zipkin v2 tracing (zipkin-go) via HTTP Reporter URL e.g. http://localhost:94111/api/v2/spans")
46 zipkinV1URL = fs.String("zipkin-v1-url", "", "Enable Zipkin v1 tracing (zipkin-go-opentracing) via a collector URL e.g. http://localhost:9411/api/v1/spans")
47 lightstepToken = fs.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token")
48 appdashAddr = fs.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port")
4649 method = fs.String("method", "sum", "sum, concat")
4750 )
4851 fs.Usage = usageFor(fs, os.Args[0]+" [flags] <a> <b>")
5457
5558 // This is a demonstration client, which supports multiple tracers.
5659 // Your clients will probably just use one tracer.
57 var tracer stdopentracing.Tracer
60 var otTracer stdopentracing.Tracer
5861 {
59 if *zipkinURL != "" {
60 collector, err := zipkin.NewHTTPCollector(*zipkinURL)
62 if *zipkinV1URL != "" && *zipkinV2URL == "" {
63 collector, err := zipkinot.NewHTTPCollector(*zipkinV1URL)
6164 if err != nil {
6265 fmt.Fprintln(os.Stderr, err.Error())
6366 os.Exit(1)
6568 defer collector.Close()
6669 var (
6770 debug = false
68 hostPort = "localhost:80"
69 serviceName = "addsvc"
71 hostPort = "localhost:0"
72 serviceName = "addsvc-cli"
7073 )
71 recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName)
72 tracer, err = zipkin.NewTracer(recorder)
74 recorder := zipkinot.NewRecorder(collector, debug, hostPort, serviceName)
75 otTracer, err = zipkinot.NewTracer(recorder)
7376 if err != nil {
7477 fmt.Fprintln(os.Stderr, err.Error())
7578 os.Exit(1)
7679 }
7780 } else if *lightstepToken != "" {
78 tracer = lightstep.NewTracer(lightstep.Options{
81 otTracer = lightstep.NewTracer(lightstep.Options{
7982 AccessToken: *lightstepToken,
8083 })
81 defer lightstep.FlushLightStepTracer(tracer)
84 defer lightstep.FlushLightStepTracer(otTracer)
8285 } else if *appdashAddr != "" {
83 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
86 otTracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
8487 } else {
85 tracer = stdopentracing.GlobalTracer() // no-op
88 otTracer = stdopentracing.GlobalTracer() // no-op
89 }
90 }
91
92 // This is a demonstration of the native Zipkin tracing client. If using
93 // Zipkin this is the more idiomatic client over OpenTracing.
94 var zipkinTracer *zipkin.Tracer
95 {
96 var (
97 err error
98 hostPort = "" // if host:port is unknown we can keep this empty
99 serviceName = "addsvc-cli"
100 useNoopTracer = (*zipkinV2URL == "")
101 reporter = zipkinhttp.NewReporter(*zipkinV2URL)
102 )
103 defer reporter.Close()
104 zEP, _ := zipkin.NewEndpoint(serviceName, hostPort)
105 zipkinTracer, err = zipkin.NewTracer(
106 reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(useNoopTracer),
107 )
108 if err != nil {
109 fmt.Fprintf(os.Stderr, "unable to create zipkin tracer: %s\n", err.Error())
110 os.Exit(1)
86111 }
87112 }
88113
93118 err error
94119 )
95120 if *httpAddr != "" {
96 svc, err = addtransport.NewHTTPClient(*httpAddr, tracer, log.NewNopLogger())
121 svc, err = addtransport.NewHTTPClient(*httpAddr, otTracer, zipkinTracer, log.NewNopLogger())
97122 } else if *grpcAddr != "" {
98123 conn, err := grpc.Dial(*grpcAddr, grpc.WithInsecure(), grpc.WithTimeout(time.Second))
99124 if err != nil {
101126 os.Exit(1)
102127 }
103128 defer conn.Close()
104 svc = addtransport.NewGRPCClient(conn, tracer, log.NewNopLogger())
129 svc = addtransport.NewGRPCClient(conn, otTracer, zipkinTracer, log.NewNopLogger())
105130 } else if *jsonRPCAddr != "" {
106 svc, err = addtransport.NewJSONRPCClient(*jsonRPCAddr, tracer, log.NewNopLogger())
131 svc, err = addtransport.NewJSONRPCClient(*jsonRPCAddr, otTracer, log.NewNopLogger())
107132 } else if *thriftAddr != "" {
108133 // It's necessary to do all of this construction in the func main,
109134 // because (among other reasons) we need to control the lifecycle of the
1313 lightstep "github.com/lightstep/lightstep-tracer-go"
1414 "github.com/oklog/oklog/pkg/group"
1515 stdopentracing "github.com/opentracing/opentracing-go"
16 zipkin "github.com/openzipkin/zipkin-go-opentracing"
16 zipkin "github.com/openzipkin/zipkin-go"
17 zipkinot "github.com/openzipkin/zipkin-go-opentracing"
18 zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http"
1719 stdprometheus "github.com/prometheus/client_golang/prometheus"
1820 "github.com/prometheus/client_golang/prometheus/promhttp"
1921 "google.golang.org/grpc"
2325 "github.com/go-kit/kit/log"
2426 "github.com/go-kit/kit/metrics"
2527 "github.com/go-kit/kit/metrics/prometheus"
28 kitgrpc "github.com/go-kit/kit/transport/grpc"
2629
2730 addpb "github.com/go-kit/kit/examples/addsvc/pb"
2831 "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
4548 thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson")
4649 thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered")
4750 thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing")
48 zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans")
49 lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token")
50 appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port")
51 zipkinV2URL = fs.String("zipkin-url", "", "Enable Zipkin v2 tracing (zipkin-go) using a Reporter URL e.g. http://localhost:9411/api/v2/spans")
52 zipkinV1URL = fs.String("zipkin-v1-url", "", "Enable Zipkin v1 tracing (zipkin-go-opentracing) using a collector URL e.g. http://localhost:9411/api/v1/spans")
53 lightstepToken = fs.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token")
54 appdashAddr = fs.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port")
5155 )
5256 fs.Usage = usageFor(fs, os.Args[0]+" [flags]")
5357 fs.Parse(os.Args[1:])
6064 logger = log.With(logger, "caller", log.DefaultCaller)
6165 }
6266
63 // Determine which tracer to use. We'll pass the tracer to all the
67 // Determine which OpenTracing tracer to use. We'll pass the tracer to all the
6468 // components that use it, as a dependency.
6569 var tracer stdopentracing.Tracer
6670 {
67 if *zipkinURL != "" {
68 logger.Log("tracer", "Zipkin", "URL", *zipkinURL)
69 collector, err := zipkin.NewHTTPCollector(*zipkinURL)
71 if *zipkinV1URL != "" && *zipkinV2URL == "" {
72 logger.Log("tracer", "Zipkin", "type", "OpenTracing", "URL", *zipkinV1URL)
73 collector, err := zipkinot.NewHTTPCollector(*zipkinV1URL)
7074 if err != nil {
7175 logger.Log("err", err)
7276 os.Exit(1)
7781 hostPort = "localhost:80"
7882 serviceName = "addsvc"
7983 )
80 recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName)
81 tracer, err = zipkin.NewTracer(recorder)
84 recorder := zipkinot.NewRecorder(collector, debug, hostPort, serviceName)
85 tracer, err = zipkinot.NewTracer(recorder)
8286 if err != nil {
8387 logger.Log("err", err)
8488 os.Exit(1)
9397 logger.Log("tracer", "Appdash", "addr", *appdashAddr)
9498 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
9599 } else {
96 logger.Log("tracer", "none")
97100 tracer = stdopentracing.GlobalTracer() // no-op
101 }
102 }
103
104 var zipkinTracer *zipkin.Tracer
105 {
106 var (
107 err error
108 hostPort = "localhost:80"
109 serviceName = "addsvc"
110 useNoopTracer = (*zipkinV2URL == "")
111 reporter = zipkinhttp.NewReporter(*zipkinV2URL)
112 )
113 defer reporter.Close()
114 zEP, _ := zipkin.NewEndpoint(serviceName, hostPort)
115 zipkinTracer, err = zipkin.NewTracer(
116 reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(useNoopTracer),
117 )
118 if err != nil {
119 logger.Log("err", err)
120 os.Exit(1)
121 }
122 if !useNoopTracer {
123 logger.Log("tracer", "Zipkin", "type", "Native", "URL", *zipkinV2URL)
98124 }
99125 }
100126
136162 // them to ports or anything yet; we'll do that next.
137163 var (
138164 service = addservice.New(logger, ints, chars)
139 endpoints = addendpoint.New(service, logger, duration, tracer)
140 httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, logger)
141 grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger)
165 endpoints = addendpoint.New(service, logger, duration, tracer, zipkinTracer)
166 httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, zipkinTracer, logger)
167 grpcServer = addtransport.NewGRPCServer(endpoints, tracer, zipkinTracer, logger)
142168 thriftServer = addtransport.NewThriftServer(endpoints)
143169 jsonrpcHandler = addtransport.NewJSONRPCHandler(endpoints, logger)
144170 )
195221 }
196222 g.Add(func() error {
197223 logger.Log("transport", "gRPC", "addr", *grpcAddr)
198 baseServer := grpc.NewServer()
224 // we add the Go Kit gRPC Interceptor to our gRPC service as it is used by
225 // the here demonstrated zipkin tracing middleware.
226 baseServer := grpc.NewServer(grpc.UnaryInterceptor(kitgrpc.Interceptor))
199227 addpb.RegisterAddServer(baseServer, grpcServer)
200228 return baseServer.Serve(grpcListener)
201229 }, func(error) {
77 "testing"
88
99 "github.com/opentracing/opentracing-go"
10 zipkin "github.com/openzipkin/zipkin-go"
1011
1112 "github.com/go-kit/kit/log"
1213 "github.com/go-kit/kit/metrics/discard"
1718 )
1819
1920 func TestHTTP(t *testing.T) {
21 zkt, _ := zipkin.NewTracer(nil, zipkin.WithNoopTracer(true))
2022 svc := addservice.New(log.NewNopLogger(), discard.NewCounter(), discard.NewCounter())
21 eps := addendpoint.New(svc, log.NewNopLogger(), discard.NewHistogram(), opentracing.GlobalTracer())
22 mux := addtransport.NewHTTPHandler(eps, opentracing.GlobalTracer(), log.NewNopLogger())
23 eps := addendpoint.New(svc, log.NewNopLogger(), discard.NewHistogram(), opentracing.GlobalTracer(), zkt)
24 mux := addtransport.NewHTTPHandler(eps, opentracing.GlobalTracer(), zkt, log.NewNopLogger())
2325 srv := httptest.NewServer(mux)
2426 defer srv.Close()
2527
66 "golang.org/x/time/rate"
77
88 stdopentracing "github.com/opentracing/opentracing-go"
9 stdzipkin "github.com/openzipkin/zipkin-go"
910 "github.com/sony/gobreaker"
1011
1112 "github.com/go-kit/kit/circuitbreaker"
1415 "github.com/go-kit/kit/metrics"
1516 "github.com/go-kit/kit/ratelimit"
1617 "github.com/go-kit/kit/tracing/opentracing"
18 "github.com/go-kit/kit/tracing/zipkin"
1719
1820 "github.com/go-kit/kit/examples/addsvc/pkg/addservice"
1921 )
2830
2931 // New returns a Set that wraps the provided server, and wires in all of the
3032 // expected endpoint middlewares via the various parameters.
31 func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, trace stdopentracing.Tracer) Set {
33 func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer) Set {
3234 var sumEndpoint endpoint.Endpoint
3335 {
3436 sumEndpoint = MakeSumEndpoint(svc)
3537 sumEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 1))(sumEndpoint)
3638 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(sumEndpoint)
37 sumEndpoint = opentracing.TraceServer(trace, "Sum")(sumEndpoint)
39 sumEndpoint = opentracing.TraceServer(otTracer, "Sum")(sumEndpoint)
40 sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint)
3841 sumEndpoint = LoggingMiddleware(log.With(logger, "method", "Sum"))(sumEndpoint)
3942 sumEndpoint = InstrumentingMiddleware(duration.With("method", "Sum"))(sumEndpoint)
4043 }
4346 concatEndpoint = MakeConcatEndpoint(svc)
4447 concatEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))(concatEndpoint)
4548 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(concatEndpoint)
46 concatEndpoint = opentracing.TraceServer(trace, "Concat")(concatEndpoint)
49 concatEndpoint = opentracing.TraceServer(otTracer, "Concat")(concatEndpoint)
50 concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint)
4751 concatEndpoint = LoggingMiddleware(log.With(logger, "method", "Concat"))(concatEndpoint)
4852 concatEndpoint = InstrumentingMiddleware(duration.With("method", "Concat"))(concatEndpoint)
4953 }
77 "google.golang.org/grpc"
88
99 stdopentracing "github.com/opentracing/opentracing-go"
10 stdzipkin "github.com/openzipkin/zipkin-go"
1011 "github.com/sony/gobreaker"
1112 oldcontext "golang.org/x/net/context"
1213 "golang.org/x/time/rate"
1617 "github.com/go-kit/kit/log"
1718 "github.com/go-kit/kit/ratelimit"
1819 "github.com/go-kit/kit/tracing/opentracing"
20 "github.com/go-kit/kit/tracing/zipkin"
1921 grpctransport "github.com/go-kit/kit/transport/grpc"
2022
2123 "github.com/go-kit/kit/examples/addsvc/pb"
2931 }
3032
3133 // NewGRPCServer makes a set of endpoints available as a gRPC AddServer.
32 func NewGRPCServer(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer {
34 func NewGRPCServer(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) pb.AddServer {
35 // Zipkin GRPC Server Trace can either be instantiated per gRPC method with a
36 // provided operation name or a global tracing service can be instantiated
37 // without an operation name and fed to each Go kit gRPC server as a
38 // ServerOption.
39 // In the latter case, the operation name will be the endpoint's grpc method
40 // path if used in combination with the Go kit gRPC Interceptor.
41 //
42 // In this example, we demonstrate a global Zipkin tracing service with
43 // Go kit gRPC Interceptor.
44 zipkinServer := zipkin.GRPCServerTrace(zipkinTracer)
45
3346 options := []grpctransport.ServerOption{
3447 grpctransport.ServerErrorLogger(logger),
35 }
48 zipkinServer,
49 }
50
3651 return &grpcServer{
3752 sum: grpctransport.NewServer(
3853 endpoints.SumEndpoint,
3954 decodeGRPCSumRequest,
4055 encodeGRPCSumResponse,
41 append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(tracer, "Sum", logger)))...,
56 append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(otTracer, "Sum", logger)))...,
4257 ),
4358 concat: grpctransport.NewServer(
4459 endpoints.ConcatEndpoint,
4560 decodeGRPCConcatRequest,
4661 encodeGRPCConcatResponse,
47 append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(tracer, "Concat", logger)))...,
62 append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(otTracer, "Concat", logger)))...,
4863 ),
4964 }
5065 }
6984 // of the conn. The caller is responsible for constructing the conn, and
7085 // eventually closing the underlying transport. We bake-in certain middlewares,
7186 // implementing the client library pattern.
72 func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addservice.Service {
87 func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) addservice.Service {
7388 // We construct a single ratelimiter middleware, to limit the total outgoing
7489 // QPS from this client to all methods on the remote instance. We also
7590 // construct per-endpoint circuitbreaker middlewares to demonstrate how
7691 // that's done, although they could easily be combined into a single breaker
7792 // for the entire remote instance, too.
7893 limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))
94
95 // Zipkin GRPC Client Trace can either be instantiated per gRPC method with a
96 // provided operation name or a global tracing client can be instantiated
97 // without an operation name and fed to each Go kit client as ClientOption.
98 // In the latter case, the operation name will be the endpoint's grpc method
99 // path.
100 //
101 // In this example, we demonstrace a global tracing client.
102 zipkinClient := zipkin.GRPCClientTrace(zipkinTracer)
103
104 // global client middlewares
105 options := []grpctransport.ClientOption{
106 zipkinClient,
107 }
79108
80109 // Each individual endpoint is an http/transport.Client (which implements
81110 // endpoint.Endpoint) that gets wrapped with various middlewares. If you
90119 encodeGRPCSumRequest,
91120 decodeGRPCSumResponse,
92121 pb.SumReply{},
93 grpctransport.ClientBefore(opentracing.ContextToGRPC(tracer, logger)),
122 append(options, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))...,
94123 ).Endpoint()
95 sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
124 sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint)
96125 sumEndpoint = limiter(sumEndpoint)
97126 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
98127 Name: "Sum",
111140 encodeGRPCConcatRequest,
112141 decodeGRPCConcatResponse,
113142 pb.ConcatReply{},
114 grpctransport.ClientBefore(opentracing.ContextToGRPC(tracer, logger)),
143 append(options, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))...,
115144 ).Endpoint()
116 concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
145 concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint)
117146 concatEndpoint = limiter(concatEndpoint)
118147 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
119148 Name: "Concat",
1313 "golang.org/x/time/rate"
1414
1515 stdopentracing "github.com/opentracing/opentracing-go"
16 stdzipkin "github.com/openzipkin/zipkin-go"
1617 "github.com/sony/gobreaker"
1718
1819 "github.com/go-kit/kit/circuitbreaker"
2021 "github.com/go-kit/kit/log"
2122 "github.com/go-kit/kit/ratelimit"
2223 "github.com/go-kit/kit/tracing/opentracing"
24 "github.com/go-kit/kit/tracing/zipkin"
2325 httptransport "github.com/go-kit/kit/transport/http"
2426
2527 "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
2830
2931 // NewHTTPHandler returns an HTTP handler that makes a set of endpoints
3032 // available on predefined paths.
31 func NewHTTPHandler(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) http.Handler {
33 func NewHTTPHandler(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) http.Handler {
34 // Zipkin HTTP Server Trace can either be instantiated per endpoint with a
35 // provided operation name or a global tracing service can be instantiated
36 // without an operation name and fed to each Go kit endpoint as ServerOption.
37 // In the latter case, the operation name will be the endpoint's http method.
38 // We demonstrate a global tracing service here.
39 zipkinServer := zipkin.HTTPServerTrace(zipkinTracer)
40
3241 options := []httptransport.ServerOption{
3342 httptransport.ServerErrorEncoder(errorEncoder),
3443 httptransport.ServerErrorLogger(logger),
35 }
44 zipkinServer,
45 }
46
3647 m := http.NewServeMux()
3748 m.Handle("/sum", httptransport.NewServer(
3849 endpoints.SumEndpoint,
3950 decodeHTTPSumRequest,
4051 encodeHTTPGenericResponse,
41 append(options, httptransport.ServerBefore(opentracing.HTTPToContext(tracer, "Sum", logger)))...,
52 append(options, httptransport.ServerBefore(opentracing.HTTPToContext(otTracer, "Sum", logger)))...,
4253 ))
4354 m.Handle("/concat", httptransport.NewServer(
4455 endpoints.ConcatEndpoint,
4556 decodeHTTPConcatRequest,
4657 encodeHTTPGenericResponse,
47 append(options, httptransport.ServerBefore(opentracing.HTTPToContext(tracer, "Concat", logger)))...,
58 append(options, httptransport.ServerBefore(opentracing.HTTPToContext(otTracer, "Concat", logger)))...,
4859 ))
4960 return m
5061 }
5364 // remote instance. We expect instance to come from a service discovery system,
5465 // so likely of the form "host:port". We bake-in certain middlewares,
5566 // implementing the client library pattern.
56 func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addservice.Service, error) {
67 func NewHTTPClient(instance string, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) (addservice.Service, error) {
5768 // Quickly sanitize the instance string.
5869 if !strings.HasPrefix(instance, "http") {
5970 instance = "http://" + instance
6980 // that's done, although they could easily be combined into a single breaker
7081 // for the entire remote instance, too.
7182 limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))
83
84 // Zipkin HTTP Client Trace can either be instantiated per endpoint with a
85 // provided operation name or a global tracing client can be instantiated
86 // without an operation name and fed to each Go kit endpoint as ClientOption.
87 // In the latter case, the operation name will be the endpoint's http method.
88 zipkinClient := zipkin.HTTPClientTrace(zipkinTracer)
89
90 // global client middlewares
91 options := []httptransport.ClientOption{
92 zipkinClient,
93 }
7294
7395 // Each individual endpoint is an http/transport.Client (which implements
7496 // endpoint.Endpoint) that gets wrapped with various middlewares. If you
81103 copyURL(u, "/sum"),
82104 encodeHTTPGenericRequest,
83105 decodeHTTPSumResponse,
84 httptransport.ClientBefore(opentracing.ContextToHTTP(tracer, logger)),
106 append(options, httptransport.ClientBefore(opentracing.ContextToHTTP(otTracer, logger)))...,
85107 ).Endpoint()
86 sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
108 sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint)
109 sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint)
87110 sumEndpoint = limiter(sumEndpoint)
88111 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
89112 Name: "Sum",
100123 copyURL(u, "/concat"),
101124 encodeHTTPGenericRequest,
102125 decodeHTTPConcatResponse,
103 httptransport.ClientBefore(opentracing.ContextToHTTP(tracer, logger)),
126 append(options, httptransport.ClientBefore(opentracing.ContextToHTTP(otTracer, logger)))...,
104127 ).Endpoint()
105 concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
128 concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint)
129 concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint)
106130 concatEndpoint = limiter(concatEndpoint)
107131 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
108132 Name: "Concat",
1919 "github.com/gorilla/mux"
2020 "github.com/hashicorp/consul/api"
2121 stdopentracing "github.com/opentracing/opentracing-go"
22 stdzipkin "github.com/openzipkin/zipkin-go"
2223 "google.golang.org/grpc"
2324
2425 "github.com/go-kit/kit/endpoint"
6667
6768 // Transport domain.
6869 tracer := stdopentracing.GlobalTracer() // no-op
70 zipkinTracer, _ := stdzipkin.NewTracer(nil, stdzipkin.WithNoopTracer(true))
6971 ctx := context.Background()
7072 r := mux.NewRouter()
7173
8789 instancer = consulsd.NewInstancer(client, logger, "addsvc", tags, passingOnly)
8890 )
8991 {
90 factory := addsvcFactory(addendpoint.MakeSumEndpoint, tracer, logger)
92 factory := addsvcFactory(addendpoint.MakeSumEndpoint, tracer, zipkinTracer, logger)
9193 endpointer := sd.NewEndpointer(instancer, factory, logger)
9294 balancer := lb.NewRoundRobin(endpointer)
9395 retry := lb.Retry(*retryMax, *retryTimeout, balancer)
9496 endpoints.SumEndpoint = retry
9597 }
9698 {
97 factory := addsvcFactory(addendpoint.MakeConcatEndpoint, tracer, logger)
99 factory := addsvcFactory(addendpoint.MakeConcatEndpoint, tracer, zipkinTracer, logger)
98100 endpointer := sd.NewEndpointer(instancer, factory, logger)
99101 balancer := lb.NewRoundRobin(endpointer)
100102 retry := lb.Retry(*retryMax, *retryTimeout, balancer)
105107 // HTTP handler, and just install it under a particular path prefix in
106108 // our router.
107109
108 r.PathPrefix("/addsvc").Handler(http.StripPrefix("/addsvc", addtransport.NewHTTPHandler(endpoints, tracer, logger)))
110 r.PathPrefix("/addsvc").Handler(http.StripPrefix("/addsvc", addtransport.NewHTTPHandler(endpoints, tracer, zipkinTracer, logger)))
109111 }
110112
111113 // stringsvc routes.
164166 logger.Log("exit", <-errc)
165167 }
166168
167 func addsvcFactory(makeEndpoint func(addservice.Service) endpoint.Endpoint, tracer stdopentracing.Tracer, logger log.Logger) sd.Factory {
169 func addsvcFactory(makeEndpoint func(addservice.Service) endpoint.Endpoint, tracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) sd.Factory {
168170 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
169171 // We could just as easily use the HTTP or Thrift client package to make
170172 // the connection to addsvc. We've chosen gRPC arbitrarily. Note that
175177 if err != nil {
176178 return nil, nil, err
177179 }
178 service := addtransport.NewGRPCClient(conn, tracer, logger)
180 service := addtransport.NewGRPCClient(conn, tracer, zipkinTracer, logger)
179181 endpoint := makeEndpoint(service)
180182
181183 // Notice that the addsvc gRPC client converts the connection to a
00 # package tracing
11
2 `package tracing` provides [Dapper][]-style request tracing to services.
2 `package tracing` provides [Dapper]-style request tracing to services.
33
44 ## Rationale
55
99 benefit from request tracing; sufficiently large infrastructures will require
1010 it.
1111
12 ## Zipkin
13
14 [Zipkin] is one of the most used OSS distributed tracing platforms available
15 with support for many different languages and frameworks. Go kit provides
16 bindings to the native Go tracing implementation [zipkin-go]. If using Zipkin
17 with Go kit in a polyglot microservices environment, this is the preferred
18 binding to use. Instrumentation exists for `kit/transport/http` and
19 `kit/transport/grpc`. The bindings are highlighted in the [addsvc] example. For
20 more information regarding Zipkin feel free to visit [Zipkin's Gitter].
21
1222 ## OpenTracing
1323
14 Go kit builds on top of the [OpenTracing] API and uses the [opentracing-go]
15 package to provide tracing middlewares for its servers and clients. Currently
16 `kit/transport/http` and `kit/transport/grpc` transports are supported.
24 Go kit supports the [OpenTracing] API and uses the [opentracing-go] package to
25 provide tracing middlewares for its servers and clients. Currently OpenTracing
26 instrumentation exists for `kit/transport/http` and `kit/transport/grpc`.
1727
18 Since [OpenTracing] is an upcoming standard API, Go kit should support a
19 multitude of tracing backends. If a Tracer implementation in Go for your
20 back-end exists, it should work out of the box. The following tracing back-ends
21 are known to work with Go kit through the OpenTracing interface and are
22 highlighted in the [addsvc] example.
28 Since [OpenTracing] is an effort to provide a generic API, Go kit should support
29 a multitude of tracing backends. If a Tracer implementation or OpenTracing
30 bridge in Go for your back-end exists, it should work out of the box.
2331
32 Please note that the "world view" of existing tracing systems do differ.
33 OpenTracing can not guarantee you that tracing alignment is perfect in a
34 microservice environment especially one which is not exclusively OpenTracing
35 enabled or switching from one tracing backend to another truly entails just a
36 change in configuration.
37
38 The following tracing back-ends are known to work with Go kit through the
39 OpenTracing interface and are highlighted in the [addsvc] example.
40
41 ### AppDash
42
43 [Appdash] support is available straight from their system repository in the
44 [appdash/opentracing] directory.
2445
2546 ### LightStep
2647
2748 [LightStep] support is available through their standard Go package
2849 [lightstep-tracer-go].
2950
30 ### AppDash
31
32 [Appdash] support is available straight from their system repository in the
33 [appdash/opentracing] directory.
34
3551 ### Zipkin
3652
37 [Zipkin] support is now available from the [zipkin-go-opentracing] package which
38 can be found at the [Open Zipkin GitHub] page. This means our old custom
39 `tracing/zipkin` package is now deprecated. In the `kit/tracing/zipkin`
40 directory you can still find the `docker-compose` script to bootstrap a Zipkin
41 development environment and a [README] detailing how to transition from the
42 old package to the new.
53 [Zipkin] support is available through the [zipkin-go-opentracing] package.
4354
4455 [Dapper]: http://research.google.com/pubs/pub36356.html
4556 [addsvc]:https://github.com/go-kit/kit/tree/master/examples/addsvc
5162 [Zipkin]: http://zipkin.io/
5263 [Open Zipkin GitHub]: https://github.com/openzipkin
5364 [zipkin-go-opentracing]: https://github.com/openzipkin/zipkin-go-opentracing
65 [zipkin-go]: https://github.com/openzipkin/zipkin-go
66 [Zipkin's Gitter]: https://gitter.im/openzipkin/zipkin
5467
5568 [Appdash]: https://github.com/sourcegraph/appdash
5669 [appdash/opentracing]: https://github.com/sourcegraph/appdash/tree/master/opentracing
22 // As your infrastructure grows, it becomes important to be able to trace a
33 // request, as it travels through multiple services and back to the user.
44 // Package tracing provides endpoints and transport helpers and middlewares to
5 // capture and emit request-scoped information. We use the excellent OpenTracing
6 // project to bind to concrete tracing systems.
5 // capture and emit request-scoped information.
76 package tracing
44 Great efforts have been made to make [Zipkin] easier to test, develop and
55 experiment against. [Zipkin] can now be run from a single Docker container or by
66 running its self-contained executable jar without extensive configuration. In
7 its default configuration you will run Zipkin with a HTTP collector, In memory
7 its default configuration you will run [Zipkin] with a HTTP collector, In memory
88 Span storage backend and web UI on port 9411.
99
1010 Example:
1414
1515 [zipkin]: http://zipkin.io
1616
17 Instrumenting your services with Zipkin distributed tracing using the default
18 configuration is now possible with the latest release of [zipkin-go-opentracing]
19 as it includes an HTTP transport for sending spans to the [Zipkin] HTTP
20 Collector.
21
2217 ## Middleware Usage
2318
24 Follow the [addsvc] example to check out how to wire the Zipkin Middleware. The
25 changes should be relatively minor.
19 Follow the [addsvc] example to check out how to wire the [Zipkin] Middleware.
20 The changes should be relatively minor.
2621
27 The [zipkin-go-opentracing] package has support for HTTP, Kafka and Scribe
28 collectors as well as using Go Kit's [Log] package for logging.
22 The [zipkin-go] package has Reporters to send Spans to the [Zipkin] HTTP and
23 Kafka Collectors.
2924
30 ### Configuring for the Zipkin HTTP Collector
25 ### Configuring the Zipkin HTTP Reporter
3126
32 To select the transport for the HTTP Collector, you configure the `Recorder`
33 with the appropriate collector like this:
27 To use the HTTP Reporter with a [Zipkin] instance running on localhost you
28 bootstrap [zipkin-go] like this:
3429
3530 ```go
3631 var (
37 debugMode = false
3832 serviceName = "MyService"
3933 serviceHostPort = "localhost:8000"
40 zipkinHTTPEndpoint = "localhost:9411"
34 zipkinHTTPEndpoint = "http://localhost:9411/api/v2/spans"
4135 )
42 collector, err = zipkin.NewHTTPCollector(zipkinHTTPEndpoint)
43 if err != nil {
44 // handle error
45 }
46 tracer, err = zipkin.NewTracer(
47 zipkin.NewRecorder(collector, debugMode, serviceHostPort, serviceName),
36
37 // create an instance of the HTTP Reporter.
38 reporter := zipkin.NewReporter(zipkinHTTPEndpoint)
39
40 // create our tracer's local endpoint (how the service is identified in Zipkin).
41 localEndpoint, err := zipkin.NewEndpoint(serviceName, serviceHostPort)
42
43 // create our tracer instance.
44 tracer, err = zipkin.NewTracer(reporter, zipkin.WithLocalEndpoint(localEndpoint))
4845 ...
49 )
46
5047 ```
5148
52 ### Span per Node vs. Span per RPC
53 By default Zipkin V1 considers either side of an RPC to have the same identity
54 and differs in that respect from many other tracing systems which consider the
55 caller to be the parent and the receiver to be the child. The OpenTracing
56 specification does not dictate one model over the other, but the Zipkin team is
57 looking into these [single-host-spans] to potentially bring Zipkin more in-line
58 with the other tracing systems.
59
60 [single-host-spans]: https://github.com/openzipkin/zipkin/issues/963
61
62 In case of a `span per node` the receiver will create a child span from the
63 propagated parent span like this:
64
65 ```
66 Span per Node propagation and identities
67
68 CALLER: RECEIVER:
69 ---------------------------------
70 traceId -> traceId
71 spanId (new)
72 spanId -> parentSpanId
73 parentSpanId
74 ```
75
76 **Note:** most tracing implementations supporting the `span per node` model
77 therefore do not propagate their `parentSpanID` as its not needed.
78
79 A typical Zipkin implementation will use the `span per RPC` model and recreate
80 the span identity from the caller on the receiver's end and then annotates its
81 values on top of it. Propagation will happen like this:
82
83 ```
84 Span per RPC propagation and identities
85
86 CALLER: RECEIVER:
87 ---------------------------------
88 traceId -> traceId
89 spanId -> spanId
90 parentSpanId -> parentSpanId
91 ```
92
93 The [zipkin-go-opentracing] implementation allows you to choose which model you
94 wish to use. Make sure you select the same model consistently for all your
95 services that are required to communicate with each other or you will have trace
96 propagation issues. If using non OpenTracing / legacy instrumentation, it's
97 probably best to use the `span per RPC call` model.
98
99 To adhere to the more common tracing philosophy of `span per node`, the Tracer
100 defaults to `span per node`. To set the `span per RPC call` mode start your
101 tracer like this:
102
103 ```go
104 tracer, err = zipkin.NewTracer(
105 zipkin.NewRecorder(...),
106 zipkin.ClientServerSameSpan(true),
107 )
108 ```
109
110 [zipkin-go-opentracing]: https://github.com/openzipkin/zipkin-go-opentracing
49 [zipkin-go]: https://github.com/openzipkin/zipkin-go
11150 [addsvc]:https://github.com/go-kit/kit/tree/master/examples/addsvc
11251 [Log]: https://github.com/go-kit/kit/tree/master/log
11352
11453 ### Tracing Resources
11554
116 In our legacy implementation we had the `NewChildSpan` method to allow
117 annotation of resources such as databases, caches and other services that do not
118 have server side tracing support. Since OpenTracing has no specific method of
119 dealing with these items explicitely that is compatible with Zipkin's `SA`
120 annotation, the [zipkin-go-opentracing] has implemented support using the
121 OpenTracing Tags system. Here is an example of how one would be able to record
122 a resource span compatible with standard OpenTracing and triggering an `SA`
123 annotation in [zipkin-go-opentracing]:
124
55 Here is an example of how you could trace resources and work with local spans.
12556 ```go
126 // you need to import the ext package for the Tag helper functions
12757 import (
128 "github.com/opentracing/opentracing-go"
129 "github.com/opentracing/opentracing-go/ext"
58 zipkin "github.com/openzipkin/zipkin-go"
13059 )
13160
13261 func (svc *Service) GetMeSomeExamples(ctx context.Context, ...) ([]Examples, error) {
133 // Example of annotating a database query:
134 var (
135 serviceName = "MySQL"
136 serviceHost = "mysql.example.com"
137 servicePort = uint16(3306)
138 queryLabel = "GetExamplesByParam"
139 query = "select * from example where param = 'value'"
140 )
62 // Example of annotating a database query:
63 var (
64 spanContext model.SpanContext
65 serviceName = "MySQL"
66 serviceHost = "mysql.example.com:3306"
67 queryLabel = "GetExamplesByParam"
68 query = "select * from example where param = :value"
69 )
14170
142 // retrieve the parent span, if not found create a new trace
143 parentSpan := opentracing.SpanFromContext(ctx)
144 if parentSpan == nil {
145 parentSpan = opentracing.StartSpan(queryLabel)
146 defer parentSpan.Finish()
147 }
71 // retrieve the parent span from context to use as parent if available.
72 if parentSpan := zipkin.SpanFromContext(ctx); parentSpan != nil {
73 spanContext = parentSpan.Context()
74 }
14875
149 // create a new span to record the resource interaction
150 span := opentracing.StartChildSpan(parentSpan, queryLabel)
76 // create the remote Zipkin endpoint
77 ep, _ := zipkin.NewEndpoint(serviceName, serviceHost)
15178
152 // span.kind "resource" triggers SA annotation
153 ext.SpanKind.Set(span, "resource")
79 // create a new span to record the resource interaction
80 span := zipkin.StartSpan(
81 queryLabel,
82 zipkin.Parent(parentSpan.Context()),
83 zipkin.WithRemoteEndpoint(ep),
84 )
15485
155 // this will label the span's service & hostPort (called Endpoint in Zipkin)
156 ext.PeerService.Set(span, serviceName)
157 ext.PeerHostname.Set(span, serviceHost)
158 ext.PeerPort.Set(span, servicePort)
159
160 // a Tag is the equivalent of a Zipkin Binary Annotation (key:value pair)
86 // add interesting key/value pair to our span
16187 span.SetTag("query", query)
16288
163 // a LogEvent is the equivalent of a Zipkin Annotation (timestamped)
164 span.LogEvent("query:start")
89 // add interesting timed event to our span
90 span.Annotate(time.Now(), "query:start")
16591
16692 // do the actual query...
16793
16894 // let's annotate the end...
169 span.LogEvent("query:end")
95 span.Annotate(time.Now(), "query:end")
17096
17197 // we're done with this span.
17298 span.Finish()
0 // Package zipkin provides Go kit integration to the OpenZipkin project through
1 // the use of zipkin-go, the official OpenZipkin tracer implementation for Go.
2 // OpenZipkin is the most used open source distributed tracing ecosystem with
3 // many different libraries and interoperability options.
4 package zipkin
0 package zipkin
1
2 import (
3 "context"
4
5 "github.com/openzipkin/zipkin-go"
6 "github.com/openzipkin/zipkin-go/model"
7
8 "github.com/go-kit/kit/endpoint"
9 )
10
11 // TraceEndpoint returns an Endpoint middleware, tracing a Go kit endpoint.
12 // This endpoint tracer should be used in combination with a Go kit Transport
13 // tracing middleware or custom before and after transport functions as
14 // propagation of SpanContext is not provided in this middleware.
15 func TraceEndpoint(tracer *zipkin.Tracer, name string) endpoint.Middleware {
16 return func(next endpoint.Endpoint) endpoint.Endpoint {
17 return func(ctx context.Context, request interface{}) (interface{}, error) {
18 var sc model.SpanContext
19 if parentSpan := zipkin.SpanFromContext(ctx); parentSpan != nil {
20 sc = parentSpan.Context()
21 }
22 sp := tracer.StartSpan(name, zipkin.Parent(sc))
23 defer sp.Finish()
24
25 ctx = zipkin.NewContext(ctx, sp)
26 return next(ctx, request)
27 }
28 }
29 }
0 package zipkin_test
1
2 import (
3 "context"
4 "testing"
5
6 "github.com/openzipkin/zipkin-go"
7 "github.com/openzipkin/zipkin-go/reporter/recorder"
8
9 "github.com/go-kit/kit/endpoint"
10 zipkinkit "github.com/go-kit/kit/tracing/zipkin"
11 )
12
13 const spanName = "test"
14
15 func TestTraceEndpoint(t *testing.T) {
16 rec := recorder.NewReporter()
17 tr, _ := zipkin.NewTracer(rec)
18 mw := zipkinkit.TraceEndpoint(tr, spanName)
19 mw(endpoint.Nop)(context.Background(), nil)
20
21 spans := rec.Flush()
22
23 if want, have := 1, len(spans); want != have {
24 t.Fatalf("incorrect number of spans, wanted %d, got %d", want, have)
25 }
26
27 if want, have := spanName, spans[0].Name; want != have {
28 t.Fatalf("incorrect span name, wanted %s, got %s", want, have)
29 }
30 }
0 package zipkin
1
2 import (
3 "context"
4 "strconv"
5
6 zipkin "github.com/openzipkin/zipkin-go"
7 "github.com/openzipkin/zipkin-go/model"
8 "github.com/openzipkin/zipkin-go/propagation/b3"
9 "google.golang.org/grpc/metadata"
10 "google.golang.org/grpc/status"
11
12 "github.com/go-kit/kit/log"
13 kitgrpc "github.com/go-kit/kit/transport/grpc"
14 )
15
16 // GRPCClientTrace enables native Zipkin tracing of a Go kit gRPC transport
17 // Client.
18 //
19 // Go kit creates gRPC transport clients per remote endpoint. This middleware
20 // can be set-up individually by adding the endpoint name for each of the Go kit
21 // transport clients using the Name() TracerOption.
22 // If wanting to use the gRPC FullMethod (/service/method) as Span name you can
23 // create a global client tracer omitting the Name() TracerOption, which you can
24 // then feed to each Go kit gRPC transport client.
25 // If instrumenting a client to an external (not on your platform) service, you
26 // will probably want to disallow propagation of SpanContext using the
27 // AllowPropagation TracerOption and setting it to false.
28 func GRPCClientTrace(tracer *zipkin.Tracer, options ...TracerOption) kitgrpc.ClientOption {
29 config := tracerOptions{
30 tags: make(map[string]string),
31 name: "",
32 logger: log.NewNopLogger(),
33 propagate: true,
34 }
35
36 for _, option := range options {
37 option(&config)
38 }
39
40 clientBefore := kitgrpc.ClientBefore(
41 func(ctx context.Context, md *metadata.MD) context.Context {
42 var (
43 spanContext model.SpanContext
44 name string
45 )
46
47 if config.name != "" {
48 name = config.name
49 } else {
50 name = ctx.Value(kitgrpc.ContextKeyRequestMethod).(string)
51 }
52
53 if parent := zipkin.SpanFromContext(ctx); parent != nil {
54 spanContext = parent.Context()
55 }
56
57 span := tracer.StartSpan(
58 name,
59 zipkin.Kind(model.Client),
60 zipkin.Tags(config.tags),
61 zipkin.Parent(spanContext),
62 zipkin.FlushOnFinish(false),
63 )
64
65 if config.propagate {
66 if err := b3.InjectGRPC(md)(span.Context()); err != nil {
67 config.logger.Log("err", err)
68 }
69 }
70
71 return zipkin.NewContext(ctx, span)
72 },
73 )
74
75 clientAfter := kitgrpc.ClientAfter(
76 func(ctx context.Context, _ metadata.MD, _ metadata.MD) context.Context {
77 if span := zipkin.SpanFromContext(ctx); span != nil {
78 span.Finish()
79 }
80
81 return ctx
82 },
83 )
84
85 clientFinalizer := kitgrpc.ClientFinalizer(
86 func(ctx context.Context, err error) {
87 if span := zipkin.SpanFromContext(ctx); span != nil {
88 if err != nil {
89 zipkin.TagError.Set(span, err.Error())
90 }
91 // calling span.Finish() a second time is a noop, if we didn't get to
92 // ClientAfter we can at least time the early bail out by calling it
93 // here.
94 span.Finish()
95 // send span to the Reporter
96 span.Flush()
97 }
98 },
99 )
100
101 return func(c *kitgrpc.Client) {
102 clientBefore(c)
103 clientAfter(c)
104 clientFinalizer(c)
105 }
106
107 }
108
109 // GRPCServerTrace enables native Zipkin tracing of a Go kit gRPC transport
110 // Server.
111 //
112 // Go kit creates gRPC transport servers per gRPC method. This middleware can be
113 // set-up individually by adding the method name for each of the Go kit method
114 // servers using the Name() TracerOption.
115 // If wanting to use the gRPC FullMethod (/service/method) as Span name you can
116 // create a global server tracer omitting the Name() TracerOption, which you can
117 // then feed to each Go kit method server. For this to work you will need to
118 // wire the Go kit gRPC Interceptor too.
119 // If instrumenting a service to external (not on your platform) clients, you
120 // will probably want to disallow propagation of a client SpanContext using
121 // the AllowPropagation TracerOption and setting it to false.
122 func GRPCServerTrace(tracer *zipkin.Tracer, options ...TracerOption) kitgrpc.ServerOption {
123 config := tracerOptions{
124 tags: make(map[string]string),
125 name: "",
126 logger: log.NewNopLogger(),
127 propagate: true,
128 }
129
130 for _, option := range options {
131 option(&config)
132 }
133
134 serverBefore := kitgrpc.ServerBefore(
135 func(ctx context.Context, md metadata.MD) context.Context {
136 var (
137 spanContext model.SpanContext
138 name string
139 tags = make(map[string]string)
140 )
141
142 rpcMethod, ok := ctx.Value(kitgrpc.ContextKeyRequestMethod).(string)
143 if !ok {
144 config.logger.Log("unable to retrieve method name: missing gRPC interceptor hook")
145 } else {
146 tags["grpc.method"] = rpcMethod
147 }
148
149 if config.name != "" {
150 name = config.name
151 } else {
152 name = rpcMethod
153 }
154
155 if config.propagate {
156 spanContext = tracer.Extract(b3.ExtractGRPC(&md))
157 if spanContext.Err != nil {
158 config.logger.Log("err", spanContext.Err)
159 }
160 }
161
162 span := tracer.StartSpan(
163 name,
164 zipkin.Kind(model.Server),
165 zipkin.Tags(config.tags),
166 zipkin.Tags(tags),
167 zipkin.Parent(spanContext),
168 zipkin.FlushOnFinish(false),
169 )
170
171 return zipkin.NewContext(ctx, span)
172 },
173 )
174
175 serverAfter := kitgrpc.ServerAfter(
176 func(ctx context.Context, _ *metadata.MD, _ *metadata.MD) context.Context {
177 if span := zipkin.SpanFromContext(ctx); span != nil {
178 span.Finish()
179 }
180
181 return ctx
182 },
183 )
184
185 serverFinalizer := kitgrpc.ServerFinalizer(
186 func(ctx context.Context, err error) {
187 if span := zipkin.SpanFromContext(ctx); span != nil {
188 if err != nil {
189 if status, ok := status.FromError(err); ok {
190 statusCode := strconv.FormatUint(uint64(status.Code()), 10)
191 zipkin.TagGRPCStatusCode.Set(span, statusCode)
192 zipkin.TagError.Set(span, status.Message())
193 } else {
194 zipkin.TagError.Set(span, err.Error())
195 }
196 }
197
198 // calling span.Finish() a second time is a noop, if we didn't get to
199 // ServerAfter we can at least time the early bail out by calling it
200 // here.
201 span.Finish()
202 // send span to the Reporter
203 span.Flush()
204 }
205 },
206 )
207
208 return func(s *kitgrpc.Server) {
209 serverBefore(s)
210 serverAfter(s)
211 serverFinalizer(s)
212 }
213 }
0 package zipkin_test
1
2 import (
3 "context"
4 "testing"
5
6 zipkin "github.com/openzipkin/zipkin-go"
7 "github.com/openzipkin/zipkin-go/propagation/b3"
8 "github.com/openzipkin/zipkin-go/reporter/recorder"
9 "google.golang.org/grpc"
10 "google.golang.org/grpc/metadata"
11
12 "github.com/go-kit/kit/endpoint"
13 kitzipkin "github.com/go-kit/kit/tracing/zipkin"
14 grpctransport "github.com/go-kit/kit/transport/grpc"
15 )
16
17 type dummy struct{}
18
19 func unaryInterceptor(
20 ctx context.Context, method string, req, reply interface{},
21 cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
22 ) error {
23 return nil
24 }
25
26 func TestGRPCClientTrace(t *testing.T) {
27 rec := recorder.NewReporter()
28 defer rec.Close()
29
30 tr, _ := zipkin.NewTracer(rec)
31
32 clientTracer := kitzipkin.GRPCClientTrace(tr)
33
34 cc, err := grpc.Dial(
35 "",
36 grpc.WithUnaryInterceptor(unaryInterceptor),
37 grpc.WithInsecure(),
38 )
39 if err != nil {
40 t.Fatalf("unable to create gRPC dialer: %s", err.Error())
41 }
42
43 ep := grpctransport.NewClient(
44 cc,
45 "dummyService",
46 "dummyMethod",
47 func(context.Context, interface{}) (interface{}, error) { return nil, nil },
48 func(context.Context, interface{}) (interface{}, error) { return nil, nil },
49 dummy{},
50 clientTracer,
51 ).Endpoint()
52
53 parentSpan := tr.StartSpan("test")
54 ctx := zipkin.NewContext(context.Background(), parentSpan)
55
56 if _, err = ep(ctx, nil); err != nil {
57 t.Errorf("unexpected error: %s", err.Error())
58 }
59
60 spans := rec.Flush()
61 if want, have := 1, len(spans); want != have {
62 t.Fatalf("incorrect number of spans, want %d, have %d", want, have)
63 }
64
65 if spans[0].SpanContext.ParentID == nil {
66 t.Fatalf("incorrect parent ID, want %s have nil", parentSpan.Context().ID)
67 }
68
69 if want, have := parentSpan.Context().ID, *spans[0].SpanContext.ParentID; want != have {
70 t.Fatalf("incorrect parent ID, want %s, have %s", want, have)
71 }
72 }
73
74 func TestGRPCServerTrace(t *testing.T) {
75 rec := recorder.NewReporter()
76 defer rec.Close()
77
78 tr, _ := zipkin.NewTracer(rec)
79
80 serverTracer := kitzipkin.GRPCServerTrace(tr)
81
82 server := grpctransport.NewServer(
83 endpoint.Nop,
84 func(context.Context, interface{}) (interface{}, error) { return nil, nil },
85 func(context.Context, interface{}) (interface{}, error) { return nil, nil },
86 serverTracer,
87 )
88
89 md := metadata.MD{}
90 parentSpan := tr.StartSpan("test")
91
92 b3.InjectGRPC(&md)(parentSpan.Context())
93
94 ctx := metadata.NewIncomingContext(context.Background(), md)
95 server.ServeGRPC(ctx, nil)
96
97 spans := rec.Flush()
98
99 if want, have := 1, len(spans); want != have {
100 t.Fatalf("incorrect number of spans, want %d, have %d", want, have)
101 }
102
103 if want, have := parentSpan.Context().TraceID, spans[0].SpanContext.TraceID; want != have {
104 t.Errorf("incorrect TraceID, want %+v, have %+v", want, have)
105 }
106
107 if want, have := parentSpan.Context().ID, spans[0].SpanContext.ID; want != have {
108 t.Errorf("incorrect span ID, want %d, have %d", want, have)
109 }
110 }
0 package zipkin
1
2 import (
3 "context"
4 "net/http"
5 "strconv"
6
7 zipkin "github.com/openzipkin/zipkin-go"
8 "github.com/openzipkin/zipkin-go/model"
9 "github.com/openzipkin/zipkin-go/propagation/b3"
10
11 "github.com/go-kit/kit/log"
12 kithttp "github.com/go-kit/kit/transport/http"
13 )
14
15 // HTTPClientTrace enables native Zipkin tracing of a Go kit HTTP transport
16 // Client.
17 //
18 // Go kit creates HTTP transport clients per remote endpoint. This middleware
19 // can be set-up individually by adding the endpoint name for each of the Go kit
20 // transport clients using the Name() TracerOption.
21 // If wanting to use the HTTP Method (Get, Post, Put, etc.) as Span name you can
22 // create a global client tracer omitting the Name() TracerOption, which you can
23 // then feed to each Go kit transport client.
24 // If instrumenting a client to an external (not on your platform) service, you
25 // will probably want to disallow propagation of SpanContext using the
26 // AllowPropagation TracerOption and setting it to false.
27 func HTTPClientTrace(tracer *zipkin.Tracer, options ...TracerOption) kithttp.ClientOption {
28 config := tracerOptions{
29 tags: make(map[string]string),
30 name: "",
31 logger: log.NewNopLogger(),
32 propagate: true,
33 }
34
35 for _, option := range options {
36 option(&config)
37 }
38
39 clientBefore := kithttp.ClientBefore(
40 func(ctx context.Context, req *http.Request) context.Context {
41 var (
42 spanContext model.SpanContext
43 name string
44 )
45
46 if config.name != "" {
47 name = config.name
48 } else {
49 name = req.Method
50 }
51
52 if parent := zipkin.SpanFromContext(ctx); parent != nil {
53 spanContext = parent.Context()
54 }
55
56 tags := map[string]string{
57 string(zipkin.TagHTTPMethod): req.Method,
58 string(zipkin.TagHTTPUrl): req.URL.String(),
59 }
60
61 span := tracer.StartSpan(
62 name,
63 zipkin.Kind(model.Client),
64 zipkin.Tags(config.tags),
65 zipkin.Tags(tags),
66 zipkin.Parent(spanContext),
67 zipkin.FlushOnFinish(false),
68 )
69
70 if config.propagate {
71 if err := b3.InjectHTTP(req)(span.Context()); err != nil {
72 config.logger.Log("err", err)
73 }
74 }
75
76 return zipkin.NewContext(ctx, span)
77 },
78 )
79
80 clientAfter := kithttp.ClientAfter(
81 func(ctx context.Context, res *http.Response) context.Context {
82 if span := zipkin.SpanFromContext(ctx); span != nil {
83 zipkin.TagHTTPResponseSize.Set(span, strconv.FormatInt(res.ContentLength, 10))
84 zipkin.TagHTTPStatusCode.Set(span, strconv.Itoa(res.StatusCode))
85 if res.StatusCode > 399 {
86 zipkin.TagError.Set(span, strconv.Itoa(res.StatusCode))
87 }
88 span.Finish()
89 }
90
91 return ctx
92 },
93 )
94
95 clientFinalizer := kithttp.ClientFinalizer(
96 func(ctx context.Context, err error) {
97 if span := zipkin.SpanFromContext(ctx); span != nil {
98 if err != nil {
99 zipkin.TagError.Set(span, err.Error())
100 }
101 // calling span.Finish() a second time is a noop, if we didn't get to
102 // ClientAfter we can at least time the early bail out by calling it
103 // here.
104 span.Finish()
105 // send span to the Reporter
106 span.Flush()
107 }
108 },
109 )
110
111 return func(c *kithttp.Client) {
112 clientBefore(c)
113 clientAfter(c)
114 clientFinalizer(c)
115 }
116 }
117
118 // HTTPServerTrace enables native Zipkin tracing of a Go kit HTTP transport
119 // Server.
120 //
121 // Go kit creates HTTP transport servers per HTTP endpoint. This middleware can
122 // be set-up individually by adding the method name for each of the Go kit
123 // method servers using the Name() TracerOption.
124 // If wanting to use the HTTP method (Get, Post, Put, etc.) as Span name you can
125 // create a global server tracer omitting the Name() TracerOption, which you can
126 // then feed to each Go kit method server.
127 //
128 // If instrumenting a service to external (not on your platform) clients, you
129 // will probably want to disallow propagation of a client SpanContext using
130 // the AllowPropagation TracerOption and setting it to false.
131 func HTTPServerTrace(tracer *zipkin.Tracer, options ...TracerOption) kithttp.ServerOption {
132 config := tracerOptions{
133 tags: make(map[string]string),
134 name: "",
135 logger: log.NewNopLogger(),
136 propagate: true,
137 }
138
139 for _, option := range options {
140 option(&config)
141 }
142
143 serverBefore := kithttp.ServerBefore(
144 func(ctx context.Context, req *http.Request) context.Context {
145 var (
146 spanContext model.SpanContext
147 name string
148 )
149
150 if config.name != "" {
151 name = config.name
152 } else {
153 name = req.Method
154 }
155
156 if config.propagate {
157 spanContext = tracer.Extract(b3.ExtractHTTP(req))
158 if spanContext.Err != nil {
159 config.logger.Log("err", spanContext.Err)
160 }
161 }
162
163 tags := map[string]string{
164 string(zipkin.TagHTTPMethod): req.Method,
165 string(zipkin.TagHTTPPath): req.URL.Path,
166 }
167
168 span := tracer.StartSpan(
169 name,
170 zipkin.Kind(model.Server),
171 zipkin.Tags(config.tags),
172 zipkin.Tags(tags),
173 zipkin.Parent(spanContext),
174 zipkin.FlushOnFinish(false),
175 )
176
177 return zipkin.NewContext(ctx, span)
178 },
179 )
180
181 serverAfter := kithttp.ServerAfter(
182 func(ctx context.Context, _ http.ResponseWriter) context.Context {
183 if span := zipkin.SpanFromContext(ctx); span != nil {
184 span.Finish()
185 }
186
187 return ctx
188 },
189 )
190
191 serverFinalizer := kithttp.ServerFinalizer(
192 func(ctx context.Context, code int, r *http.Request) {
193 if span := zipkin.SpanFromContext(ctx); span != nil {
194 zipkin.TagHTTPStatusCode.Set(span, strconv.Itoa(code))
195 if rs, ok := ctx.Value(kithttp.ContextKeyResponseSize).(int64); ok {
196 zipkin.TagHTTPResponseSize.Set(span, strconv.FormatInt(rs, 10))
197 }
198
199 // calling span.Finish() a second time is a noop, if we didn't get to
200 // ServerAfter we can at least time the early bail out by calling it
201 // here.
202 span.Finish()
203 // send span to the Reporter
204 span.Flush()
205 }
206 },
207 )
208
209 return func(s *kithttp.Server) {
210 serverBefore(s)
211 serverAfter(s)
212 serverFinalizer(s)
213 }
214 }
0 package zipkin_test
1
2 import (
3 "context"
4 "fmt"
5 "net/http"
6 "net/http/httptest"
7 "net/url"
8 "reflect"
9 "testing"
10
11 "github.com/openzipkin/zipkin-go"
12 "github.com/openzipkin/zipkin-go/model"
13 "github.com/openzipkin/zipkin-go/propagation/b3"
14 "github.com/openzipkin/zipkin-go/reporter/recorder"
15
16 "github.com/go-kit/kit/endpoint"
17 zipkinkit "github.com/go-kit/kit/tracing/zipkin"
18 kithttp "github.com/go-kit/kit/transport/http"
19 )
20
21 const (
22 testName = "test"
23 testBody = "test_body"
24 testTagKey = "test_key"
25 testTagValue = "test_value"
26 )
27
28 func TestHttpClientTracePropagatesParentSpan(t *testing.T) {
29 rec := recorder.NewReporter()
30 defer rec.Close()
31
32 tr, _ := zipkin.NewTracer(rec)
33
34 rURL, _ := url.Parse("http://test.com")
35
36 clientTracer := zipkinkit.HTTPClientTrace(tr)
37 ep := kithttp.NewClient(
38 "GET",
39 rURL,
40 func(ctx context.Context, r *http.Request, i interface{}) error {
41 return nil
42 },
43 func(ctx context.Context, r *http.Response) (response interface{}, err error) {
44 return nil, nil
45 },
46 clientTracer,
47 ).Endpoint()
48
49 parentSpan := tr.StartSpan("test")
50
51 ctx := zipkin.NewContext(context.Background(), parentSpan)
52
53 _, err := ep(ctx, nil)
54 if err != nil {
55 t.Fatalf("unexpected error: %s", err.Error())
56 }
57
58 spans := rec.Flush()
59 if want, have := 1, len(spans); want != have {
60 t.Fatalf("incorrect number of spans, want %d, have %d", want, have)
61 }
62
63 span := spans[0]
64 if span.SpanContext.ParentID == nil {
65 t.Fatalf("incorrect parent ID, want %s have nil", parentSpan.Context().ID)
66 }
67
68 if want, have := parentSpan.Context().ID, *span.SpanContext.ParentID; want != have {
69 t.Fatalf("incorrect parent ID, want %s, have %s", want, have)
70 }
71 }
72
73 func TestHTTPClientTraceAddsExpectedTags(t *testing.T) {
74 dataProvider := []struct {
75 ResponseStatusCode int
76 ErrorTagValue string
77 }{
78 {http.StatusOK, ""},
79 {http.StatusForbidden, fmt.Sprint(http.StatusForbidden)},
80 }
81
82 for _, data := range dataProvider {
83 testHTTPClientTraceCase(t, data.ResponseStatusCode, data.ErrorTagValue)
84 }
85 }
86
87 func testHTTPClientTraceCase(t *testing.T, responseStatusCode int, errTagValue string) {
88 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
89 w.WriteHeader(responseStatusCode)
90 w.Write([]byte(testBody))
91 }))
92 defer ts.Close()
93
94 rec := recorder.NewReporter()
95 defer rec.Close()
96
97 tr, err := zipkin.NewTracer(rec)
98 if err != nil {
99 t.Errorf("Unwanted error: %s", err.Error())
100 }
101
102 rMethod := "GET"
103 rURL, _ := url.Parse(ts.URL)
104
105 clientTracer := zipkinkit.HTTPClientTrace(
106 tr,
107 zipkinkit.Name(testName),
108 zipkinkit.Tags(map[string]string{testTagKey: testTagValue}),
109 )
110
111 ep := kithttp.NewClient(
112 rMethod,
113 rURL,
114 func(ctx context.Context, r *http.Request, i interface{}) error {
115 return nil
116 },
117 func(ctx context.Context, r *http.Response) (response interface{}, err error) {
118 return nil, nil
119 },
120 clientTracer,
121 ).Endpoint()
122
123 _, err = ep(context.Background(), nil)
124 if err != nil {
125 t.Fatalf("unwanted error: %s", err.Error())
126 }
127
128 spans := rec.Flush()
129 if want, have := 1, len(spans); want != have {
130 t.Fatalf("incorrect number of spans, wanted %d, got %d", want, have)
131 }
132
133 span := spans[0]
134 if span.SpanContext.ParentID != nil {
135 t.Fatalf("incorrect parentID, wanted nil, got %s", span.SpanContext.ParentID)
136 }
137
138 if want, have := testName, span.Name; want != have {
139 t.Fatalf("incorrect span name, wanted %s, got %s", want, have)
140 }
141
142 if want, have := model.Client, span.Kind; want != have {
143 t.Fatalf("incorrect span kind, wanted %s, got %s", want, have)
144 }
145
146 tags := map[string]string{
147 testTagKey: testTagValue,
148 string(zipkin.TagHTTPStatusCode): fmt.Sprint(responseStatusCode),
149 string(zipkin.TagHTTPMethod): rMethod,
150 string(zipkin.TagHTTPUrl): rURL.String(),
151 string(zipkin.TagHTTPResponseSize): fmt.Sprint(len(testBody)),
152 }
153
154 if errTagValue != "" {
155 tags[string(zipkin.TagError)] = fmt.Sprint(errTagValue)
156 }
157
158 if !reflect.DeepEqual(span.Tags, tags) {
159 t.Fatalf("invalid tags set, wanted %+v, got %+v", tags, span.Tags)
160 }
161 }
162
163 func TestHTTPServerTrace(t *testing.T) {
164 rec := recorder.NewReporter()
165 defer rec.Close()
166
167 // explicitely show we use the default of RPC shared spans in Zipkin as it
168 // is idiomatic for Zipkin to share span identifiers between client and
169 // server side.
170 tr, _ := zipkin.NewTracer(rec, zipkin.WithSharedSpans(true))
171
172 handler := kithttp.NewServer(
173 endpoint.Nop,
174 func(context.Context, *http.Request) (interface{}, error) { return nil, nil },
175 func(context.Context, http.ResponseWriter, interface{}) error { return nil },
176 zipkinkit.HTTPServerTrace(tr),
177 )
178
179 server := httptest.NewServer(handler)
180 defer server.Close()
181
182 const httpMethod = "GET"
183
184 req, err := http.NewRequest(httpMethod, server.URL, nil)
185 if err != nil {
186 t.Fatalf("unable to create HTTP request: %s", err.Error())
187 }
188
189 parentSpan := tr.StartSpan("Dummy")
190
191 b3.InjectHTTP(req)(parentSpan.Context())
192
193 client := http.Client{}
194 resp, err := client.Do(req)
195 if err != nil {
196 t.Fatalf("unable to send HTTP request: %s", err.Error())
197 }
198 resp.Body.Close()
199
200 spans := rec.Flush()
201 if want, have := 1, len(spans); want != have {
202 t.Fatalf("incorrect number of spans, want %d, have %d", want, have)
203 }
204
205 if want, have := parentSpan.Context().TraceID, spans[0].SpanContext.TraceID; want != have {
206 t.Errorf("incorrect TraceID, want %+v, have %+v", want, have)
207 }
208
209 if want, have := parentSpan.Context().ID, spans[0].SpanContext.ID; want != have {
210 t.Errorf("incorrect span ID, want %d, have %d", want, have)
211 }
212
213 if want, have := httpMethod, spans[0].Name; want != have {
214 t.Errorf("incorrect span name, want %s, have %s", want, have)
215 }
216 }
0 package zipkin
1
2 import "github.com/go-kit/kit/log"
3
4 // TracerOption allows for functional options to our Zipkin tracing middleware.
5 type TracerOption func(o *tracerOptions)
6
7 // Name sets the name for an instrumented transport endpoint. If name is omitted
8 // at tracing middleware creation, the method of the transport or transport rpc
9 // name is used.
10 func Name(name string) TracerOption {
11 return func(o *tracerOptions) {
12 o.name = name
13 }
14 }
15
16 // Tags adds default tags to our Zipkin transport spans.
17 func Tags(tags map[string]string) TracerOption {
18 return func(o *tracerOptions) {
19 for k, v := range tags {
20 o.tags[k] = v
21 }
22 }
23 }
24
25 // Logger adds a Go kit logger to our Zipkin Middleware to log SpanContext
26 // extract / inject errors if they occur. Default is Noop.
27 func Logger(logger log.Logger) TracerOption {
28 return func(o *tracerOptions) {
29 if logger != nil {
30 o.logger = logger
31 }
32 }
33 }
34
35 // AllowPropagation instructs the tracer to allow or deny propagation of the
36 // span context between this instrumented client or service and its peers. If
37 // the instrumented client connects to services outside its own platform or if
38 // the instrumented service receives requests from untrusted clients it is
39 // strongly advised to disallow propagation. Propagation between services inside
40 // your own platform benefit from propagation. Default for both TraceClient and
41 // TraceServer is to allow propagation.
42 func AllowPropagation(propagate bool) TracerOption {
43 return func(o *tracerOptions) {
44 o.propagate = propagate
45 }
46 }
47
48 type tracerOptions struct {
49 tags map[string]string
50 name string
51 logger log.Logger
52 propagate bool
53 }
2121 grpcReply reflect.Type
2222 before []ClientRequestFunc
2323 after []ClientResponseFunc
24 finalizer []ClientFinalizerFunc
2425 }
2526
2627 // NewClient constructs a usable Client for a single remote endpoint.
7475 return func(c *Client) { c.after = append(c.after, after...) }
7576 }
7677
78 // ClientFinalizer is executed at the end of every gRPC request.
79 // By default, no finalizer is registered.
80 func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption {
81 return func(s *Client) { s.finalizer = append(s.finalizer, f...) }
82 }
83
7784 // Endpoint returns a usable endpoint that will invoke the gRPC specified by the
7885 // client.
7986 func (c Client) Endpoint() endpoint.Endpoint {
80 return func(ctx context.Context, request interface{}) (interface{}, error) {
87 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
8188 ctx, cancel := context.WithCancel(ctx)
8289 defer cancel()
90
91 if c.finalizer != nil {
92 defer func() {
93 for _, f := range c.finalizer {
94 f(ctx, err)
95 }
96 }()
97 }
98
99 ctx = context.WithValue(ctx, ContextKeyRequestMethod, c.method)
83100
84101 req, err := c.enc(ctx, request)
85102 if err != nil {
94111
95112 var header, trailer metadata.MD
96113 grpcReply := reflect.New(c.grpcReply).Interface()
97 if err = grpc.Invoke(
98 ctx, c.method, req, grpcReply, c.client,
99 grpc.Header(&header), grpc.Trailer(&trailer),
114 if err = c.client.Invoke(
115 ctx, c.method, req, grpcReply, grpc.Header(&header),
116 grpc.Trailer(&trailer),
100117 ); err != nil {
101118 return nil, err
102119 }
105122 ctx = f(ctx, header, trailer)
106123 }
107124
108 response, err := c.dec(ctx, grpcReply)
125 response, err = c.dec(ctx, grpcReply)
109126 if err != nil {
110127 return nil, err
111128 }
112129 return response, nil
113130 }
114131 }
132
133 // ClientFinalizerFunc can be used to perform work at the end of a client gRPC
134 // request, after the response is returned. The principal
135 // intended use is for error logging. Additional response parameters are
136 // provided in the context under keys with the ContextKeyResponse prefix.
137 // Note: err may be nil. There maybe also no additional response parameters depending on
138 // when an error occurs.
139 type ClientFinalizerFunc func(ctx context.Context, err error)
7373 }
7474 return key, val
7575 }
76
77 type contextKey int
78
79 const (
80 ContextKeyRequestMethod contextKey = iota
81 )
00 package grpc
11
22 import (
3 "context"
4
35 oldcontext "golang.org/x/net/context"
46 "google.golang.org/grpc"
57 "google.golang.org/grpc/metadata"
1719
1820 // Server wraps an endpoint and implements grpc.Handler.
1921 type Server struct {
20 e endpoint.Endpoint
21 dec DecodeRequestFunc
22 enc EncodeResponseFunc
23 before []ServerRequestFunc
24 after []ServerResponseFunc
25 logger log.Logger
22 e endpoint.Endpoint
23 dec DecodeRequestFunc
24 enc EncodeResponseFunc
25 before []ServerRequestFunc
26 after []ServerResponseFunc
27 finalizer []ServerFinalizerFunc
28 logger log.Logger
2629 }
2730
2831 // NewServer constructs a new server, which implements wraps the provided
6972 return func(s *Server) { s.logger = logger }
7073 }
7174
75 // ServerFinalizer is executed at the end of every gRPC request.
76 // By default, no finalizer is registered.
77 func ServerFinalizer(f ...ServerFinalizerFunc) ServerOption {
78 return func(s *Server) { s.finalizer = append(s.finalizer, f...) }
79 }
80
7281 // ServeGRPC implements the Handler interface.
73 func (s Server) ServeGRPC(ctx oldcontext.Context, req interface{}) (oldcontext.Context, interface{}, error) {
82 func (s Server) ServeGRPC(ctx oldcontext.Context, req interface{}) (retctx oldcontext.Context, resp interface{}, err error) {
7483 // Retrieve gRPC metadata.
7584 md, ok := metadata.FromIncomingContext(ctx)
7685 if !ok {
7786 md = metadata.MD{}
7887 }
7988
89 if len(s.finalizer) > 0 {
90 defer func() {
91 for _, f := range s.finalizer {
92 f(ctx, err)
93 }
94 }()
95 }
96
8097 for _, f := range s.before {
8198 ctx = f(ctx, md)
8299 }
83100
84 request, err := s.dec(ctx, req)
101 var (
102 request interface{}
103 response interface{}
104 grpcResp interface{}
105 )
106
107 request, err = s.dec(ctx, req)
85108 if err != nil {
86109 s.logger.Log("err", err)
87110 return ctx, nil, err
88111 }
89112
90 response, err := s.e(ctx, request)
113 response, err = s.e(ctx, request)
91114 if err != nil {
92115 s.logger.Log("err", err)
93116 return ctx, nil, err
98121 ctx = f(ctx, &mdHeader, &mdTrailer)
99122 }
100123
101 grpcResp, err := s.enc(ctx, response)
124 grpcResp, err = s.enc(ctx, response)
102125 if err != nil {
103126 s.logger.Log("err", err)
104127 return ctx, nil, err
120143
121144 return ctx, grpcResp, nil
122145 }
146
147 // ServerFinalizerFunc can be used to perform work at the end of an gRPC
148 // request, after the response has been written to the client.
149 type ServerFinalizerFunc func(ctx context.Context, err error)
150
151 // Interceptor is a grpc UnaryInterceptor that injects the method name into
152 // context so it can be consumed by Go kit gRPC middlewares. The Interceptor
153 // typically is added at creation time of the grpc-go server.
154 // Like this: `grpc.NewServer(grpc.UnaryInterceptor(kitgrpc.Interceptor))`
155 func Interceptor(
156 ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
157 ) (resp interface{}, err error) {
158 ctx = context.WithValue(ctx, ContextKeyRequestMethod, info.FullMethod)
159 return handler(ctx, req)
160 }
2020 dec DecodeResponseFunc
2121 before []RequestFunc
2222 after []ClientResponseFunc
23 finalizer ClientFinalizerFunc
23 finalizer []ClientFinalizerFunc
2424 bufferedStream bool
2525 }
2626
7272
7373 // ClientFinalizer is executed at the end of every HTTP request.
7474 // By default, no finalizer is registered.
75 func ClientFinalizer(f ClientFinalizerFunc) ClientOption {
76 return func(s *Client) { s.finalizer = f }
75 func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption {
76 return func(s *Client) { s.finalizer = append(s.finalizer, f...) }
7777 }
7878
7979 // BufferedStream sets whether the Response.Body is left open, allowing it
9898 ctx = context.WithValue(ctx, ContextKeyResponseHeaders, resp.Header)
9999 ctx = context.WithValue(ctx, ContextKeyResponseSize, resp.ContentLength)
100100 }
101 c.finalizer(ctx, err)
101 for _, f := range c.finalizer {
102 f(ctx, err)
103 }
102104 }()
103105 }
104106
1616 before []RequestFunc
1717 after []ServerResponseFunc
1818 errorEncoder ErrorEncoder
19 finalizer ServerFinalizerFunc
19 finalizer []ServerFinalizerFunc
2020 logger log.Logger
2121 }
2222
7575
7676 // ServerFinalizer is executed at the end of every HTTP request.
7777 // By default, no finalizer is registered.
78 func ServerFinalizer(f ServerFinalizerFunc) ServerOption {
79 return func(s *Server) { s.finalizer = f }
78 func ServerFinalizer(f ...ServerFinalizerFunc) ServerOption {
79 return func(s *Server) { s.finalizer = append(s.finalizer, f...) }
8080 }
8181
8282 // ServeHTTP implements http.Handler.
8383 func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
8484 ctx := r.Context()
8585
86 if s.finalizer != nil {
86 if len(s.finalizer) > 0 {
8787 iw := &interceptingWriter{w, http.StatusOK, 0}
8888 defer func() {
8989 ctx = context.WithValue(ctx, ContextKeyResponseHeaders, iw.Header())
9090 ctx = context.WithValue(ctx, ContextKeyResponseSize, iw.written)
91 s.finalizer(ctx, iw.code, r)
91 for _, f := range s.finalizer {
92 f(ctx, iw.code, r)
93 }
9294 }()
9395 w = iw
9496 }