Codebase list golang-github-go-kit-kit / 79f8b0c
Add a logger, standard tags, and round corners Ben Sigelman 7 years ago
11 changed file(s) with 115 addition(s) and 62 deletion(s). Raw diff Collapse all Expand all
77 "github.com/go-kit/kit/endpoint"
88 "github.com/go-kit/kit/examples/addsvc/pb"
99 "github.com/go-kit/kit/loadbalancer"
10 "github.com/go-kit/kit/log"
1011 kitot "github.com/go-kit/kit/tracing/opentracing"
1112 grpctransport "github.com/go-kit/kit/transport/grpc"
1213 "github.com/opentracing/opentracing-go"
1516 // MakeSumEndpointFactory returns a loadbalancer.Factory that transforms GRPC
1617 // host:port strings into Endpoints that call the Sum method on a GRPC server
1718 // at that address.
18 func MakeSumEndpointFactory(tracer opentracing.Tracer) loadbalancer.Factory {
19 func MakeSumEndpointFactory(tracer opentracing.Tracer, tracingLogger log.Logger) loadbalancer.Factory {
1920 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
2021 cc, err := grpc.Dial(instance, grpc.WithInsecure())
2122 return grpctransport.NewClient(
2526 encodeSumRequest,
2627 decodeSumResponse,
2728 pb.SumReply{},
28 grpctransport.SetClientBefore(kitot.ToGRPCRequest(tracer)),
29 grpctransport.SetClientBefore(kitot.ToGRPCRequest(tracer, tracingLogger)),
2930 ).Endpoint(), cc, err
3031 }
3132 }
3334 // MakeConcatEndpointFactory returns a loadbalancer.Factory that transforms
3435 // GRPC host:port strings into Endpoints that call the Concat method on a GRPC
3536 // server at that address.
36 func MakeConcatEndpointFactory(tracer opentracing.Tracer) loadbalancer.Factory {
37 func MakeConcatEndpointFactory(tracer opentracing.Tracer, tracingLogger log.Logger) loadbalancer.Factory {
3738 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
3839 cc, err := grpc.Dial(instance, grpc.WithInsecure())
3940 return grpctransport.NewClient(
4344 encodeConcatRequest,
4445 decodeConcatResponse,
4546 pb.ConcatReply{},
46 grpctransport.SetClientBefore(kitot.ToGRPCRequest(tracer)),
47 grpctransport.SetClientBefore(kitot.ToGRPCRequest(tracer, tracingLogger)),
4748 ).Endpoint(), cc, err
4849 }
4950 }
66 "github.com/go-kit/kit/endpoint"
77 "github.com/go-kit/kit/examples/addsvc/server"
88 "github.com/go-kit/kit/loadbalancer"
9 "github.com/go-kit/kit/log"
910 kitot "github.com/go-kit/kit/tracing/opentracing"
1011 httptransport "github.com/go-kit/kit/transport/http"
1112 "github.com/opentracing/opentracing-go"
1516 // an Endpoint.
1617 //
1718 // The path of the url is reset to /sum.
18 func MakeSumEndpointFactory(tracer opentracing.Tracer) loadbalancer.Factory {
19 func MakeSumEndpointFactory(tracer opentracing.Tracer, tracingLogger log.Logger) loadbalancer.Factory {
1920 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
2021 sumURL, err := url.Parse(instance)
2122 if err != nil {
2930 server.EncodeSumRequest,
3031 server.DecodeSumResponse,
3132 httptransport.SetClient(nil),
32 httptransport.SetClientBefore(kitot.ToHTTPRequest(tracer)),
33 httptransport.SetClientBefore(kitot.ToHTTPRequest(tracer, tracingLogger)),
3334 )
3435
3536 return client.Endpoint(), nil, nil
4041 // into an Endpoint.
4142 //
4243 // The path of the url is reset to /concat.
43 func MakeConcatEndpointFactory(tracer opentracing.Tracer) loadbalancer.Factory {
44 func MakeConcatEndpointFactory(tracer opentracing.Tracer, tracingLogger log.Logger) loadbalancer.Factory {
4445 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
4546 concatURL, err := url.Parse(instance)
4647 if err != nil {
5455 server.EncodeConcatRequest,
5556 server.DecodeConcatResponse,
5657 httptransport.SetClient(nil),
57 httptransport.SetClientBefore(kitot.ToHTTPRequest(tracer)),
58 httptransport.SetClientBefore(kitot.ToHTTPRequest(tracer, tracingLogger)),
5859 )
5960
6061 return client.Endpoint(), nil, nil
3737 thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing")
3838
3939 // Two OpenTracing backends (to demonstrate how they can be interchanged):
40 appdashHostport = flag.String("appdash_hostport", "", "Enable Appdash tracing via an Appdash server host:port")
41 lightstepAccessToken = flag.String("lightstep_access_token", "", "Enable LightStep tracing via a LightStep access token")
40 appdashAddr = flag.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port")
41 lightstepAccessToken = flag.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token")
4242 )
4343 flag.Parse()
4444 if len(os.Args) < 4 {
5656 logger = log.NewLogfmtLogger(os.Stdout)
5757 logger = log.NewContext(logger).With("caller", log.DefaultCaller)
5858 logger = log.NewContext(logger).With("transport", *transport)
59 tracingLogger := log.NewContext(logger).With("component", "tracing")
5960
6061 // Set up OpenTracing
6162 var tracer opentracing.Tracer
6263 {
63 if len(*appdashHostport) > 0 {
64 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashHostport))
65 }
66 if len(*lightstepAccessToken) > 0 {
67 if tracer != nil {
68 panic("Attempted to configure multiple OpenTracing implementations")
69 }
64 switch {
65 case *appdashAddr != "" && *lightstepAccessToken == "":
66 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
67 case *appdashAddr == "" && *lightstepAccessToken != "":
7068 tracer = lightstep.NewTracer(lightstep.Options{
7169 AccessToken: *lightstepAccessToken,
7270 })
73 }
74 if tracer == nil {
75 tracer = opentracing.GlobalTracer() // the noop tracer
71 defer lightstep.FlushLightStepTracer(tracer)
72 case *appdashAddr == "" && *lightstepAccessToken == "":
73 tracer = opentracing.GlobalTracer() // no-op
74 default:
75 panic("specify either -appdash.addr or -lightstep.access.token, not both")
7676 }
7777 }
7878
8484 switch *transport {
8585 case "grpc":
8686 instances = strings.Split(*grpcAddrs, ",")
87 sumFactory = grpcclient.MakeSumEndpointFactory(tracer)
88 concatFactory = grpcclient.MakeConcatEndpointFactory(tracer)
87 sumFactory = grpcclient.MakeSumEndpointFactory(tracer, tracingLogger)
88 concatFactory = grpcclient.MakeConcatEndpointFactory(tracer, tracingLogger)
8989
9090 case "httpjson":
9191 instances = strings.Split(*httpAddrs, ",")
9494 instances[i] = "http://" + rawurl
9595 }
9696 }
97 sumFactory = httpjsonclient.MakeSumEndpointFactory(tracer)
98 concatFactory = httpjsonclient.MakeConcatEndpointFactory(tracer)
97 sumFactory = httpjsonclient.MakeSumEndpointFactory(tracer, tracingLogger)
98 concatFactory = httpjsonclient.MakeConcatEndpointFactory(tracer, tracingLogger)
9999
100100 case "netrpc":
101101 instances = strings.Split(*netrpcAddrs, ",")
135135 logger.Log("err", "invalid method "+method)
136136 os.Exit(1)
137137 }
138
139 if len(*lightstepAccessToken) > 0 {
140 lightstep.FlushLightStepTracer(tracer)
141 }
142138 }
143139
144140 func buildEndpoint(tracer opentracing.Tracer, operationName string, instances []string, factory loadbalancer.Factory, seed int64, logger log.Logger) endpoint.Endpoint {
55 "github.com/go-kit/kit/examples/addsvc/pb"
66 "github.com/go-kit/kit/examples/addsvc/server"
77 servergrpc "github.com/go-kit/kit/examples/addsvc/server/grpc"
8 "github.com/go-kit/kit/log"
89 kitot "github.com/go-kit/kit/tracing/opentracing"
910 "github.com/go-kit/kit/transport/grpc"
1011 "github.com/opentracing/opentracing-go"
1415 sum, concat grpc.Handler
1516 }
1617
17 func newGRPCBinding(ctx context.Context, tracer opentracing.Tracer, svc server.AddService) grpcBinding {
18 func newGRPCBinding(ctx context.Context, tracer opentracing.Tracer, svc server.AddService, tracingLogger log.Logger) grpcBinding {
1819 return grpcBinding{
1920 sum: grpc.NewServer(
2021 ctx,
2122 kitot.TraceServer(tracer, "sum")(makeSumEndpoint(svc)),
2223 servergrpc.DecodeSumRequest,
2324 servergrpc.EncodeSumResponse,
24 grpc.ServerBefore(kitot.FromGRPCRequest(tracer, "")),
25 grpc.ServerBefore(kitot.FromGRPCRequest(tracer, "", tracingLogger)),
2526 ),
2627 concat: grpc.NewServer(
2728 ctx,
2829 kitot.TraceServer(tracer, "concat")(makeConcatEndpoint(svc)),
2930 servergrpc.DecodeConcatRequest,
3031 servergrpc.EncodeConcatResponse,
31 grpc.ServerBefore(kitot.FromGRPCRequest(tracer, "")),
32 grpc.ServerBefore(kitot.FromGRPCRequest(tracer, "", tracingLogger)),
3233 ),
3334 }
3435 }
5050 thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing")
5151
5252 // Supported OpenTracing backends
53 appdashHostport = fs.String("appdash_hostport", "", "Enable Appdash tracing via an Appdash server host:port")
54 lightstepAccessToken = fs.String("lightstep_access_token", "", "Enable LightStep tracing via a LightStep access token")
53 appdashAddr = fs.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port")
54 lightstepAccessToken = fs.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token")
5555 )
5656 flag.Usage = fs.Usage // only show our flags
5757 if err := fs.Parse(os.Args[1:]); err != nil {
8686 // Set up OpenTracing
8787 var tracer opentracing.Tracer
8888 {
89 if len(*appdashHostport) > 0 {
90 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashHostport))
91 }
92 if len(*lightstepAccessToken) > 0 {
93 if tracer != nil {
94 panic("Attempted to configure multiple OpenTracing implementations")
95 }
89 switch {
90 case *appdashAddr != "" && *lightstepAccessToken == "":
91 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
92 case *appdashAddr == "" && *lightstepAccessToken != "":
9693 tracer = lightstep.NewTracer(lightstep.Options{
9794 AccessToken: *lightstepAccessToken,
9895 })
99 }
100 if tracer == nil {
101 tracer = opentracing.GlobalTracer() // the noop tracer
96 defer lightstep.FlushLightStepTracer(tracer)
97 case *appdashAddr == "" && *lightstepAccessToken == "":
98 tracer = opentracing.GlobalTracer() // no-op
99 default:
100 panic("specify either -appdash.addr or -lightstep.access.token, not both")
102101 }
103102 }
104103
130129 go func() {
131130 var (
132131 transportLogger = log.NewContext(logger).With("transport", "HTTP/JSON")
132 tracingLogger = log.NewContext(transportLogger).With("component", "tracing")
133133 mux = http.NewServeMux()
134134 sum, concat endpoint.Endpoint
135135 )
142142 server.DecodeSumRequest,
143143 server.EncodeSumResponse,
144144 httptransport.ServerErrorLogger(transportLogger),
145 httptransport.ServerBefore(kitot.FromHTTPRequest(tracer, "sum")),
145 httptransport.ServerBefore(kitot.FromHTTPRequest(tracer, "sum", tracingLogger)),
146146 ))
147147
148148 concat = makeConcatEndpoint(svc)
153153 server.DecodeConcatRequest,
154154 server.EncodeConcatResponse,
155155 httptransport.ServerErrorLogger(transportLogger),
156 httptransport.ServerBefore(kitot.FromHTTPRequest(tracer, "concat")),
156 httptransport.ServerBefore(kitot.FromHTTPRequest(tracer, "concat", tracingLogger)),
157157 ))
158158
159159 transportLogger.Log("addr", *httpAddr)
163163 // Transport: gRPC
164164 go func() {
165165 transportLogger := log.NewContext(logger).With("transport", "gRPC")
166 tracingLogger := log.NewContext(transportLogger).With("component", "tracing")
166167 ln, err := net.Listen("tcp", *grpcAddr)
167168 if err != nil {
168169 errc <- err
169170 return
170171 }
171172 s := grpc.NewServer() // uses its own, internal context
172 pb.RegisterAddServer(s, newGRPCBinding(root, tracer, svc))
173 pb.RegisterAddServer(s, newGRPCBinding(root, tracer, svc, tracingLogger))
173174 transportLogger.Log("addr", *grpcAddr)
174175 errc <- s.Serve(ln)
175176 }()
7777 factory loadbalancer.Factory
7878 }{
7979 "addsvc": {
80 {path: "/api/addsvc/concat", factory: grpc.MakeConcatEndpointFactory(opentracing.GlobalTracer())},
81 {path: "/api/addsvc/sum", factory: grpc.MakeSumEndpointFactory(opentracing.GlobalTracer())},
80 {path: "/api/addsvc/concat", factory: grpc.MakeConcatEndpointFactory(opentracing.GlobalTracer(), nil)},
81 {path: "/api/addsvc/sum", factory: grpc.MakeSumEndpointFactory(opentracing.GlobalTracer(), nil)},
8282 },
8383 "stringsvc": {
8484 {path: "/api/stringsvc/uppercase", factory: httpFactory(ctx, "GET", "uppercase/")},
2121 } else {
2222 serverSpan.SetOperationName(operationName)
2323 }
24 defer serverSpan.Finish()
2425 otext.SpanKind.Set(serverSpan, otext.SpanKindRPCServer)
2526 ctx = opentracing.ContextWithSpan(ctx, serverSpan)
26 defer serverSpan.Finish()
2727 return next(ctx, request)
2828 }
2929 }
3939 OperationName: operationName,
4040 Parent: parentSpan, // may be nil
4141 })
42 defer clientSpan.Finish()
4243 otext.SpanKind.Set(clientSpan, otext.SpanKindRPCClient)
4344 ctx = opentracing.ContextWithSpan(ctx, clientSpan)
44 defer clientSpan.Finish()
4545 return next(ctx, request)
4646 }
4747 }
00 package opentracing
11
22 import (
3 "github.com/go-kit/kit/log"
34 "github.com/opentracing/opentracing-go"
45 "golang.org/x/net/context"
56 "google.golang.org/grpc/metadata"
89 // ToGRPCRequest returns a grpc RequestFunc that injects an OpenTracing Span
910 // found in `ctx` into the grpc Metadata. If no such Span can be found, the
1011 // RequestFunc is a noop.
11 func ToGRPCRequest(tracer opentracing.Tracer) func(ctx context.Context, md *metadata.MD) context.Context {
12 //
13 // The logger is used to report errors and may be nil.
14 func ToGRPCRequest(tracer opentracing.Tracer, logger log.Logger) func(ctx context.Context, md *metadata.MD) context.Context {
1215 return func(ctx context.Context, md *metadata.MD) context.Context {
1316 if span := opentracing.SpanFromContext(ctx); span != nil {
1417 // There's nothing we can do with an error here.
15 _ = tracer.Inject(span, opentracing.TextMap, metadataReaderWriter{md})
18 err := tracer.Inject(span, opentracing.TextMap, metadataReaderWriter{md})
19 if err != nil && logger != nil {
20 logger.Log("msg", "Inject failed", "err", err)
21 }
1622 }
1723 return ctx
1824 }
2329 // `operationName` accordingly. If no trace could be found in `req`, the Span
2430 // will be a trace root. The Span is incorporated in the returned Context and
2531 // can be retrieved with opentracing.SpanFromContext(ctx).
26 func FromGRPCRequest(tracer opentracing.Tracer, operationName string) func(ctx context.Context, md *metadata.MD) context.Context {
32 //
33 // The logger is used to report errors and may be nil.
34 func FromGRPCRequest(tracer opentracing.Tracer, operationName string, logger log.Logger) func(ctx context.Context, md *metadata.MD) context.Context {
2735 return func(ctx context.Context, md *metadata.MD) context.Context {
2836 span, err := tracer.Join(operationName, opentracing.TextMap, metadataReaderWriter{md})
29 if err != nil || span == nil {
37 if err != nil && logger != nil {
38 logger.Log("msg", "Join failed", "err", err)
39 }
40 if span == nil {
3041 span = tracer.StartSpan(operationName)
3142 }
3243 return opentracing.ContextWithSpan(ctx, span)
1717 // Initialize the ctx with a Span to inject.
1818 beforeSpan := tracer.StartSpan("to_inject").(*mocktracer.MockSpan)
1919 defer beforeSpan.Finish()
20 beforeSpan.SetBaggageItem("baggage", "check")
2021 beforeCtx := opentracing.ContextWithSpan(context.Background(), beforeSpan)
2122
22 var toGRPCFunc grpc.RequestFunc = kitot.ToGRPCRequest(tracer)
23 var toGRPCFunc grpc.RequestFunc = kitot.ToGRPCRequest(tracer, nil)
2324 md := metadata.Pairs()
2425 // Call the RequestFunc.
2526 afterCtx := toGRPCFunc(beforeCtx, &md)
3637 }
3738
3839 // Use FromGRPCRequest to verify that we can join with the trace given MD.
39 var fromGRPCFunc grpc.RequestFunc = kitot.FromGRPCRequest(tracer, "joined")
40 var fromGRPCFunc grpc.RequestFunc = kitot.FromGRPCRequest(tracer, "joined", nil)
4041 joinCtx := fromGRPCFunc(afterCtx, &md)
4142 joinedSpan := opentracing.SpanFromContext(joinCtx).(*mocktracer.MockSpan)
4243
5152 if want, have := "joined", joinedSpan.OperationName; want != have {
5253 t.Errorf("Want %q, have %q", want, have)
5354 }
55 if want, have := "check", joinedSpan.BaggageItem("baggage"); want != have {
56 t.Errorf("Want %q, have %q", want, have)
57 }
5458 }
00 package opentracing
11
22 import (
3 "net"
34 "net/http"
5 "strconv"
46
7 "github.com/go-kit/kit/log"
58 kithttp "github.com/go-kit/kit/transport/http"
69 "github.com/opentracing/opentracing-go"
10 "github.com/opentracing/opentracing-go/ext"
711 "golang.org/x/net/context"
812 )
913
1014 // ToHTTPRequest returns an http RequestFunc that injects an OpenTracing Span
1115 // found in `ctx` into the http headers. If no such Span can be found, the
1216 // RequestFunc is a noop.
13 func ToHTTPRequest(tracer opentracing.Tracer) kithttp.RequestFunc {
17 //
18 // The logger is used to report errors and may be nil.
19 func ToHTTPRequest(tracer opentracing.Tracer, logger log.Logger) kithttp.RequestFunc {
1420 return func(ctx context.Context, req *http.Request) context.Context {
1521 // Try to find a Span in the Context.
1622 if span := opentracing.SpanFromContext(ctx); span != nil {
23 // Add standard OpenTracing tags.
24 ext.HTTPMethod.Set(span, req.URL.RequestURI())
25 host, portString, err := net.SplitHostPort(req.URL.Host)
26 if err == nil {
27 ext.PeerHostname.Set(span, host)
28 if port, err := strconv.Atoi(portString); err != nil {
29 ext.PeerPort.Set(span, uint16(port))
30 }
31 } else {
32 ext.PeerHostname.Set(span, req.URL.Host)
33 }
34
1735 // There's nothing we can do with any errors here.
18 _ = tracer.Inject(span, opentracing.TextMap, opentracing.HTTPHeaderTextMapCarrier(req.Header))
36 err = tracer.Inject(
37 span,
38 opentracing.TextMap,
39 opentracing.HTTPHeaderTextMapCarrier(req.Header),
40 )
41 if err != nil && logger != nil {
42 logger.Log("msg", "Join failed", "err", err)
43 }
1944 }
2045 return ctx
2146 }
2651 // `operationName` accordingly. If no trace could be found in `req`, the Span
2752 // will be a trace root. The Span is incorporated in the returned Context and
2853 // can be retrieved with opentracing.SpanFromContext(ctx).
29 func FromHTTPRequest(tracer opentracing.Tracer, operationName string) kithttp.RequestFunc {
54 //
55 // The logger is used to report errors and may be nil.
56 func FromHTTPRequest(tracer opentracing.Tracer, operationName string, logger log.Logger) kithttp.RequestFunc {
3057 return func(ctx context.Context, req *http.Request) context.Context {
3158 // Try to join to a trace propagated in `req`. There's nothing we can
3259 // do with any errors here, so we ignore them.
33 span, _ := tracer.Join(operationName, opentracing.TextMap, opentracing.HTTPHeaderTextMapCarrier(req.Header))
60 span, err := tracer.Join(
61 operationName,
62 opentracing.TextMap,
63 opentracing.HTTPHeaderTextMapCarrier(req.Header),
64 )
65 if err != nil && logger != nil {
66 logger.Log("msg", "Join failed", "err", err)
67 }
3468 if span == nil {
3569 span = opentracing.StartSpan(operationName)
3670 }
1616 // Initialize the ctx with a Span to inject.
1717 beforeSpan := tracer.StartSpan("to_inject").(*mocktracer.MockSpan)
1818 defer beforeSpan.Finish()
19 beforeSpan.SetBaggageItem("baggage", "check")
1920 beforeCtx := opentracing.ContextWithSpan(context.Background(), beforeSpan)
2021
21 var toHTTPFunc kithttp.RequestFunc = kitot.ToHTTPRequest(tracer)
22 var toHTTPFunc kithttp.RequestFunc = kitot.ToHTTPRequest(tracer, nil)
2223 req, _ := http.NewRequest("GET", "http://test.biz/url", nil)
2324 // Call the RequestFunc.
2425 afterCtx := toHTTPFunc(beforeCtx, req)
3536 }
3637
3738 // Use FromHTTPRequest to verify that we can join with the trace given a req.
38 var fromHTTPFunc kithttp.RequestFunc = kitot.FromHTTPRequest(tracer, "joined")
39 var fromHTTPFunc kithttp.RequestFunc = kitot.FromHTTPRequest(tracer, "joined", nil)
3940 joinCtx := fromHTTPFunc(afterCtx, req)
4041 joinedSpan := opentracing.SpanFromContext(joinCtx).(*mocktracer.MockSpan)
4142
5051 if want, have := "joined", joinedSpan.OperationName; want != have {
5152 t.Errorf("Want %q, have %q", want, have)
5253 }
54 if want, have := "check", joinedSpan.BaggageItem("baggage"); want != have {
55 t.Errorf("Want %q, have %q", want, have)
56 }
5357 }