Codebase list golang-github-go-kit-kit / 5458306
Merge branch 'master' of github.com:go-kit/kit into provider JP Robinson 7 years ago
20 changed file(s) with 798 addition(s) and 211 deletion(s). Raw diff Collapse all Expand all
2929 _testmain.go
3030
3131 *.exe
32
33 # https://github.com/github/gitignore/blob/master/Global/Vim.gitignore
34 # swap
35 [._]*.s[a-w][a-z]
36 [._]s[a-w][a-z]
37 # session
38 Session.vim
39 # temporary
40 .netrwhist
41 *~
42 # auto-generated tag files
43 tags
44
639639
640640 ## Advanced topics
641641
642 ### Threading a context
643
644 The context object is used to carry information across conceptual boundaries in the scope of a single request.
645 In our example, we haven't yet threaded the context through our business logic.
646 But that's almost always a good idea.
647 It allows you to pass request-scoped information between business logic and middlewares,
648 and is necessary for more sophisticated tasks like granular distributed tracing annotations.
649
650 Concretely, this means your business logic interfaces will look like
651
652 ```go
653 type MyService interface {
654 Foo(context.Context, string, int) (string, error)
655 Bar(context.Context, string) error
656 Baz(context.Context) (int, error)
657 }
658 ```
659
642660 ### Request tracing
643661
644662 Once your infrastructure grows beyond a certain size, it becomes important to trace requests through multiple services, so you can identify and troubleshoot hotspots.
648666
649667 It's possible to use Go kit to create a client package to your service, to make consuming your service easier from other Go programs.
650668 Effectively, your client package will provide an implementation of your service interface, which invokes a remote service instance using a specific transport.
651 An example is in the works. Stay tuned.
652
653 ### Threading a context
654
655 The context object is used to carry information across conceptual boundaries in the scope of a single request.
656 In our example, we haven't threaded the context through our business logic.
657 But it may be useful to do so, to get access to request-scoped information like trace IDs.
658 It may also be possible to pass things like loggers and metrics objects through the context, although this is not currently recommended.
669 See [package addsvc/client](https://github.com/go-kit/kit/tree/master/examples/addsvc/client) for an example.
659670
660671 ## Other examples
661672
662 ### Transport-specific
663
664 We plan to have small example services that bind to each of our supported transports.
665 Stay tuned.
666
667673 ### addsvc
668674
669675 [addsvc](https://github.com/go-kit/kit/blob/master/examples/addsvc) was the original example application.
670 It exposes a set of operations over all supported transports.
676 It exposes a set of operations over **all supported transports**.
671677 It's fully logged, instrumented, and uses Zipkin request tracing.
672678 It also demonstrates how to create and use client packages.
673679 It's a good example of a fully-featured Go kit service.
675681 ### profilesvc
676682
677683 [profilesvc](https://github.com/go-kit/kit/blob/master/examples/profilesvc)
678 demonstrates how to use Go kit to build a REST-ish microservice.
684 demonstrates how to use Go kit to build a REST-ish microservice.
679685
680686 ### apigateway
681
682 Track [issue #202](https://github.com/go-kit/kit/issues/202) for progress.
687 [apigateway](https://github.com/go-kit/kit/blob/master/examples/apigateway/main.go)
688 demonstrates how to implement the API gateway pattern,
689 backed by a Consul service discovery system.
690
691 ### shipping
692
693 [shipping](https://github.com/go-kit/kit/tree/master/examples/shipping)
694 is a complete, "real-world" application composed of multiple microservices,
695 based on Domain Driven Design principles.
22 import (
33 "io"
44
5 kitot "github.com/go-kit/kit/tracing/opentracing"
6 "github.com/opentracing/opentracing-go"
57 "google.golang.org/grpc"
68
79 "github.com/go-kit/kit/endpoint"
810 "github.com/go-kit/kit/examples/addsvc/pb"
11 "github.com/go-kit/kit/loadbalancer"
12 "github.com/go-kit/kit/log"
913 grpctransport "github.com/go-kit/kit/transport/grpc"
1014 )
1115
12 // SumEndpointFactory transforms GRPC host:port strings into Endpoints that call the Sum method on a GRPC server
16 // MakeSumEndpointFactory returns a loadbalancer.Factory that transforms GRPC
17 // host:port strings into Endpoints that call the Sum method on a GRPC server
1318 // at that address.
14 func SumEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) {
15 cc, err := grpc.Dial(instance, grpc.WithInsecure())
16 return grpctransport.NewClient(
17 cc,
18 "Add",
19 "Sum",
20 encodeSumRequest,
21 decodeSumResponse,
22 pb.SumReply{},
23 ).Endpoint(), cc, err
19 func MakeSumEndpointFactory(tracer opentracing.Tracer, tracingLogger log.Logger) loadbalancer.Factory {
20 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
21 cc, err := grpc.Dial(instance, grpc.WithInsecure())
22 return grpctransport.NewClient(
23 cc,
24 "Add",
25 "Sum",
26 encodeSumRequest,
27 decodeSumResponse,
28 pb.SumReply{},
29 grpctransport.SetClientBefore(kitot.ToGRPCRequest(tracer, tracingLogger)),
30 ).Endpoint(), cc, err
31 }
2432 }
2533
26 // ConcatEndpointFactory transforms GRPC host:port strings into Endpoints that call the Concat method on a GRPC server
27 // at that address.
28 func ConcatEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) {
29 cc, err := grpc.Dial(instance, grpc.WithInsecure())
30 return grpctransport.NewClient(
31 cc,
32 "Add",
33 "Concat",
34 encodeConcatRequest,
35 decodeConcatResponse,
36 pb.ConcatReply{},
37 ).Endpoint(), cc, err
34 // MakeConcatEndpointFactory returns a loadbalancer.Factory that transforms
35 // GRPC host:port strings into Endpoints that call the Concat method on a GRPC
36 // server at that address.
37 func MakeConcatEndpointFactory(tracer opentracing.Tracer, tracingLogger log.Logger) loadbalancer.Factory {
38 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
39 cc, err := grpc.Dial(instance, grpc.WithInsecure())
40 return grpctransport.NewClient(
41 cc,
42 "Add",
43 "Concat",
44 encodeConcatRequest,
45 decodeConcatResponse,
46 pb.ConcatReply{},
47 grpctransport.SetClientBefore(kitot.ToGRPCRequest(tracer, tracingLogger)),
48 ).Endpoint(), cc, err
49 }
3850 }
33 "io"
44 "net/url"
55
6 "github.com/opentracing/opentracing-go"
7
68 "github.com/go-kit/kit/endpoint"
79 "github.com/go-kit/kit/examples/addsvc/server"
10 "github.com/go-kit/kit/loadbalancer"
11 "github.com/go-kit/kit/log"
12 kitot "github.com/go-kit/kit/tracing/opentracing"
813 httptransport "github.com/go-kit/kit/transport/http"
914 )
1015
11 // SumEndpointFactory transforms a http url into an Endpoint.
16 // MakeSumEndpointFactory generates a Factory that transforms an http url into
17 // an Endpoint.
18 //
1219 // The path of the url is reset to /sum.
13 func SumEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) {
14 sumURL, err := url.Parse(instance)
15 if err != nil {
16 return nil, nil, err
20 func MakeSumEndpointFactory(tracer opentracing.Tracer, tracingLogger log.Logger) loadbalancer.Factory {
21 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
22 sumURL, err := url.Parse(instance)
23 if err != nil {
24 return nil, nil, err
25 }
26 sumURL.Path = "/sum"
27
28 client := httptransport.NewClient(
29 "GET",
30 sumURL,
31 server.EncodeSumRequest,
32 server.DecodeSumResponse,
33 httptransport.SetClient(nil),
34 httptransport.SetClientBefore(kitot.ToHTTPRequest(tracer, tracingLogger)),
35 )
36
37 return client.Endpoint(), nil, nil
1738 }
18 sumURL.Path = "/sum"
19
20 client := httptransport.NewClient(
21 "GET",
22 sumURL,
23 server.EncodeSumRequest,
24 server.DecodeSumResponse,
25 httptransport.SetClient(nil),
26 )
27
28 return client.Endpoint(), nil, nil
2939 }
3040
31 // ConcatEndpointFactory transforms a http url into an Endpoint.
41 // MakeConcatEndpointFactory generates a Factory that transforms an http url
42 // into an Endpoint.
43 //
3244 // The path of the url is reset to /concat.
33 func ConcatEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) {
34 concatURL, err := url.Parse(instance)
35 if err != nil {
36 return nil, nil, err
45 func MakeConcatEndpointFactory(tracer opentracing.Tracer, tracingLogger log.Logger) loadbalancer.Factory {
46 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
47 concatURL, err := url.Parse(instance)
48 if err != nil {
49 return nil, nil, err
50 }
51 concatURL.Path = "/concat"
52
53 client := httptransport.NewClient(
54 "GET",
55 concatURL,
56 server.EncodeConcatRequest,
57 server.DecodeConcatResponse,
58 httptransport.SetClient(nil),
59 httptransport.SetClientBefore(kitot.ToHTTPRequest(tracer, tracingLogger)),
60 )
61
62 return client.Endpoint(), nil, nil
3763 }
38 concatURL.Path = "/concat"
39
40 client := httptransport.NewClient(
41 "GET",
42 concatURL,
43 server.EncodeConcatRequest,
44 server.DecodeConcatResponse,
45 httptransport.SetClient(nil),
46 )
47
48 return client.Endpoint(), nil, nil
4964 }
88 "strings"
99 "time"
1010
11 "github.com/lightstep/lightstep-tracer-go"
12 "github.com/opentracing/opentracing-go"
13 appdashot "github.com/sourcegraph/appdash/opentracing"
1114 "golang.org/x/net/context"
15 "sourcegraph.com/sourcegraph/appdash"
1216
1317 "github.com/go-kit/kit/endpoint"
1418 grpcclient "github.com/go-kit/kit/examples/addsvc/client/grpc"
1822 "github.com/go-kit/kit/loadbalancer"
1923 "github.com/go-kit/kit/loadbalancer/static"
2024 "github.com/go-kit/kit/log"
25 kitot "github.com/go-kit/kit/tracing/opentracing"
2126 )
2227
2328 func main() {
3035 thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
3136 thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered")
3237 thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing")
38
39 // Two OpenTracing backends (to demonstrate how they can be interchanged):
40 appdashAddr = flag.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port")
41 lightstepAccessToken = flag.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token")
3342 )
3443 flag.Parse()
3544 if len(os.Args) < 4 {
4756 logger = log.NewLogfmtLogger(os.Stdout)
4857 logger = log.NewContext(logger).With("caller", log.DefaultCaller)
4958 logger = log.NewContext(logger).With("transport", *transport)
59 tracingLogger := log.NewContext(logger).With("component", "tracing")
60
61 // Set up OpenTracing
62 var tracer opentracing.Tracer
63 {
64 switch {
65 case *appdashAddr != "" && *lightstepAccessToken == "":
66 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
67 case *appdashAddr == "" && *lightstepAccessToken != "":
68 tracer = lightstep.NewTracer(lightstep.Options{
69 AccessToken: *lightstepAccessToken,
70 })
71 defer lightstep.FlushLightStepTracer(tracer)
72 case *appdashAddr == "" && *lightstepAccessToken == "":
73 tracer = opentracing.GlobalTracer() // no-op
74 default:
75 panic("specify either -appdash.addr or -lightstep.access.token, not both")
76 }
77 }
5078
5179 var (
5280 instances []string
5684 switch *transport {
5785 case "grpc":
5886 instances = strings.Split(*grpcAddrs, ",")
59 sumFactory = grpcclient.SumEndpointFactory
60 concatFactory = grpcclient.ConcatEndpointFactory
87 sumFactory = grpcclient.MakeSumEndpointFactory(tracer, tracingLogger)
88 concatFactory = grpcclient.MakeConcatEndpointFactory(tracer, tracingLogger)
6189
6290 case "httpjson":
6391 instances = strings.Split(*httpAddrs, ",")
6694 instances[i] = "http://" + rawurl
6795 }
6896 }
69 sumFactory = httpjsonclient.SumEndpointFactory
70 concatFactory = httpjsonclient.ConcatEndpointFactory
97 sumFactory = httpjsonclient.MakeSumEndpointFactory(tracer, tracingLogger)
98 concatFactory = httpjsonclient.MakeConcatEndpointFactory(tracer, tracingLogger)
7199
72100 case "netrpc":
73101 instances = strings.Split(*netrpcAddrs, ",")
85113 os.Exit(1)
86114 }
87115
88 sum := buildEndpoint(instances, sumFactory, randomSeed, logger)
89 concat := buildEndpoint(instances, concatFactory, randomSeed, logger)
116 sum := buildEndpoint(tracer, "sum", instances, sumFactory, randomSeed, logger)
117 concat := buildEndpoint(tracer, "concat", instances, concatFactory, randomSeed, logger)
90118
91119 svc := newClient(root, sum, concat, logger)
92120
109137 }
110138 }
111139
112 func buildEndpoint(instances []string, factory loadbalancer.Factory, seed int64, logger log.Logger) endpoint.Endpoint {
140 func buildEndpoint(tracer opentracing.Tracer, operationName string, instances []string, factory loadbalancer.Factory, seed int64, logger log.Logger) endpoint.Endpoint {
113141 publisher := static.NewPublisher(instances, factory, logger)
114142 random := loadbalancer.NewRandom(publisher, seed)
115 return loadbalancer.Retry(10, 10*time.Second, random)
143 endpoint := loadbalancer.Retry(10, 10*time.Second, random)
144 return kitot.TraceClient(tracer, operationName)(endpoint)
116145 }
22 import (
33 "golang.org/x/net/context"
44
5 "github.com/opentracing/opentracing-go"
6
57 "github.com/go-kit/kit/examples/addsvc/pb"
68 "github.com/go-kit/kit/examples/addsvc/server"
79 servergrpc "github.com/go-kit/kit/examples/addsvc/server/grpc"
10 "github.com/go-kit/kit/log"
11 kitot "github.com/go-kit/kit/tracing/opentracing"
812 "github.com/go-kit/kit/transport/grpc"
913 )
1014
1216 sum, concat grpc.Handler
1317 }
1418
15 func newGRPCBinding(ctx context.Context, svc server.AddService) grpcBinding {
19 func newGRPCBinding(ctx context.Context, tracer opentracing.Tracer, svc server.AddService, tracingLogger log.Logger) grpcBinding {
1620 return grpcBinding{
17 sum: grpc.NewServer(ctx, makeSumEndpoint(svc), servergrpc.DecodeSumRequest, servergrpc.EncodeSumResponse),
18 concat: grpc.NewServer(ctx, makeConcatEndpoint(svc), servergrpc.DecodeConcatRequest, servergrpc.EncodeConcatResponse),
21 sum: grpc.NewServer(
22 ctx,
23 kitot.TraceServer(tracer, "sum")(makeSumEndpoint(svc)),
24 servergrpc.DecodeSumRequest,
25 servergrpc.EncodeSumResponse,
26 grpc.ServerBefore(kitot.FromGRPCRequest(tracer, "", tracingLogger)),
27 ),
28 concat: grpc.NewServer(
29 ctx,
30 kitot.TraceServer(tracer, "concat")(makeConcatEndpoint(svc)),
31 servergrpc.DecodeConcatRequest,
32 servergrpc.EncodeConcatResponse,
33 grpc.ServerBefore(kitot.FromGRPCRequest(tracer, "", tracingLogger)),
34 ),
1935 }
2036 }
2137
1414 "time"
1515
1616 "github.com/apache/thrift/lib/go/thrift"
17 "github.com/lightstep/lightstep-tracer-go"
18 "github.com/opentracing/opentracing-go"
1719 stdprometheus "github.com/prometheus/client_golang/prometheus"
20 appdashot "github.com/sourcegraph/appdash/opentracing"
1821 "golang.org/x/net/context"
1922 "google.golang.org/grpc"
23 "sourcegraph.com/sourcegraph/appdash"
2024
2125 "github.com/go-kit/kit/endpoint"
2226 "github.com/go-kit/kit/examples/addsvc/pb"
2630 "github.com/go-kit/kit/metrics"
2731 "github.com/go-kit/kit/metrics/expvar"
2832 "github.com/go-kit/kit/metrics/prometheus"
33 kitot "github.com/go-kit/kit/tracing/opentracing"
2934 "github.com/go-kit/kit/tracing/zipkin"
3035 httptransport "github.com/go-kit/kit/transport/http"
3136 )
3540 // of glog. So, we define a new flag set, to keep those domains distinct.
3641 fs := flag.NewFlagSet("", flag.ExitOnError)
3742 var (
38 debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server")
39 httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server")
40 grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server")
41 netrpcAddr = fs.String("netrpc.addr", ":8003", "Address for net/rpc server")
42 thriftAddr = fs.String("thrift.addr", ":8004", "Address for Thrift server")
43 thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
44 thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered")
45 thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing")
46 zipkinHostPort = fs.String("zipkin.host.port", "my.service.domain:12345", "Zipkin host:port")
47 zipkinServiceName = fs.String("zipkin.service.name", "addsvc", "Zipkin service name")
48 zipkinCollectorAddr = fs.String("zipkin.collector.addr", "", "Zipkin Kafka collector address (empty will log spans)")
43 debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server")
44 httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server")
45 grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server")
46 netrpcAddr = fs.String("netrpc.addr", ":8003", "Address for net/rpc server")
47 thriftAddr = fs.String("thrift.addr", ":8004", "Address for Thrift server")
48 thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
49 thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered")
50 thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing")
51
52 // Supported OpenTracing backends
53 appdashAddr = fs.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port")
54 lightstepAccessToken = fs.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token")
4955 )
5056 flag.Usage = fs.Usage // only show our flags
5157 if err := fs.Parse(os.Args[1:]); err != nil {
7783 ))
7884 }
7985
80 // package tracing
81 var collector zipkin.Collector
82 {
83 zipkinLogger := log.NewContext(logger).With("component", "zipkin")
84 collector = loggingCollector{zipkinLogger} // TODO(pb)
85 if *zipkinCollectorAddr != "" {
86 var err error
87 if collector, err = zipkin.NewKafkaCollector(
88 []string{*zipkinCollectorAddr},
89 zipkin.KafkaLogger(zipkinLogger),
90 ); err != nil {
91 zipkinLogger.Log("err", err)
92 os.Exit(1)
93 }
86 // Set up OpenTracing
87 var tracer opentracing.Tracer
88 {
89 switch {
90 case *appdashAddr != "" && *lightstepAccessToken == "":
91 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
92 case *appdashAddr == "" && *lightstepAccessToken != "":
93 tracer = lightstep.NewTracer(lightstep.Options{
94 AccessToken: *lightstepAccessToken,
95 })
96 defer lightstep.FlushLightStepTracer(tracer)
97 case *appdashAddr == "" && *lightstepAccessToken == "":
98 tracer = opentracing.GlobalTracer() // no-op
99 default:
100 panic("specify either -appdash.addr or -lightstep.access.token, not both")
94101 }
95102 }
96103
123130 var (
124131 transportLogger = log.NewContext(logger).With("transport", "HTTP/JSON")
125132 tracingLogger = log.NewContext(transportLogger).With("component", "tracing")
126 newSumSpan = zipkin.MakeNewSpanFunc(*zipkinHostPort, *zipkinServiceName, "sum")
127 newConcatSpan = zipkin.MakeNewSpanFunc(*zipkinHostPort, *zipkinServiceName, "concat")
128 traceSum = zipkin.ToContext(newSumSpan, tracingLogger)
129 traceConcat = zipkin.ToContext(newConcatSpan, tracingLogger)
130133 mux = http.NewServeMux()
131134 sum, concat endpoint.Endpoint
132135 )
133136
134137 sum = makeSumEndpoint(svc)
135 sum = zipkin.AnnotateServer(newSumSpan, collector)(sum)
138 sum = kitot.TraceServer(tracer, "sum")(sum)
136139 mux.Handle("/sum", httptransport.NewServer(
137140 root,
138141 sum,
139142 server.DecodeSumRequest,
140143 server.EncodeSumResponse,
141 httptransport.ServerBefore(traceSum),
142144 httptransport.ServerErrorLogger(transportLogger),
145 httptransport.ServerBefore(kitot.FromHTTPRequest(tracer, "sum", tracingLogger)),
143146 ))
144147
145148 concat = makeConcatEndpoint(svc)
146 concat = zipkin.AnnotateServer(newConcatSpan, collector)(concat)
149 concat = kitot.TraceServer(tracer, "concat")(concat)
147150 mux.Handle("/concat", httptransport.NewServer(
148151 root,
149152 concat,
150153 server.DecodeConcatRequest,
151154 server.EncodeConcatResponse,
152 httptransport.ServerBefore(traceConcat),
153155 httptransport.ServerErrorLogger(transportLogger),
156 httptransport.ServerBefore(kitot.FromHTTPRequest(tracer, "concat", tracingLogger)),
154157 ))
155158
156159 transportLogger.Log("addr", *httpAddr)
160163 // Transport: gRPC
161164 go func() {
162165 transportLogger := log.NewContext(logger).With("transport", "gRPC")
166 tracingLogger := log.NewContext(transportLogger).With("component", "tracing")
163167 ln, err := net.Listen("tcp", *grpcAddr)
164168 if err != nil {
165169 errc <- err
166170 return
167171 }
168172 s := grpc.NewServer() // uses its own, internal context
169 pb.RegisterAddServer(s, newGRPCBinding(root, svc))
173 pb.RegisterAddServer(s, newGRPCBinding(root, tracer, svc, tracingLogger))
170174 transportLogger.Log("addr", *grpcAddr)
171175 errc <- s.Serve(ln)
172176 }()
1616
1717 "github.com/gorilla/mux"
1818 "github.com/hashicorp/consul/api"
19 "github.com/opentracing/opentracing-go"
1920 "golang.org/x/net/context"
2021
2122 "github.com/go-kit/kit/endpoint"
7677 factory loadbalancer.Factory
7778 }{
7879 "addsvc": {
79 {path: "/api/addsvc/concat", factory: grpc.ConcatEndpointFactory},
80 {path: "/api/addsvc/sum", factory: grpc.SumEndpointFactory},
80 {path: "/api/addsvc/concat", factory: grpc.MakeConcatEndpointFactory(opentracing.GlobalTracer(), nil)},
81 {path: "/api/addsvc/sum", factory: grpc.MakeSumEndpointFactory(opentracing.GlobalTracer(), nil)},
8182 },
8283 "stringsvc": {
8384 {path: "/api/stringsvc/uppercase", factory: httpFactory(ctx, "GET", "uppercase/")},
00 package log
11
22 import (
3 "bytes"
34 "io"
5 "sync"
46
57 "github.com/go-logfmt/logfmt"
68 )
9
10 type logfmtEncoder struct {
11 *logfmt.Encoder
12 buf bytes.Buffer
13 }
14
15 func (l *logfmtEncoder) Reset() {
16 l.Encoder.Reset()
17 l.buf.Reset()
18 }
19
20 var logfmtEncoderPool = sync.Pool{
21 New: func() interface{} {
22 var enc logfmtEncoder
23 enc.Encoder = logfmt.NewEncoder(&enc.buf)
24 return &enc
25 },
26 }
727
828 type logfmtLogger struct {
929 w io.Writer
1737 }
1838
1939 func (l logfmtLogger) Log(keyvals ...interface{}) error {
40 enc := logfmtEncoderPool.Get().(*logfmtEncoder)
41 enc.Reset()
42 defer logfmtEncoderPool.Put(enc)
43
44 if err := enc.EncodeKeyvals(keyvals...); err != nil {
45 return err
46 }
47
48 // Add newline to the end of the buffer
49 if err := enc.EndRecord(); err != nil {
50 return err
51 }
52
2053 // The Logger interface requires implementations to be safe for concurrent
2154 // use by multiple goroutines. For this implementation that means making
22 // only one call to l.w.Write() for each call to Log. We first collect all
23 // of the bytes into b, and then call l.w.Write(b).
24 b, err := logfmt.MarshalKeyvals(keyvals...)
25 if err != nil {
26 return err
27 }
28 b = append(b, '\n')
29 if _, err := l.w.Write(b); err != nil {
55 // only one call to l.w.Write() for each call to Log.
56 if _, err := l.w.Write(enc.buf.Bytes()); err != nil {
3057 return err
3158 }
3259 return nil
0 // Package discard implements a backend for package metrics that succeeds
1 // without doing anything.
2 package discard
3
4 import "github.com/go-kit/kit/metrics"
5
6 type counter struct {
7 name string
8 }
9
10 // NewCounter returns a Counter that does nothing.
11 func NewCounter(name string) metrics.Counter { return &counter{name} }
12
13 func (c *counter) Name() string { return c.name }
14 func (c *counter) With(metrics.Field) metrics.Counter { return c }
15 func (c *counter) Add(delta uint64) {}
16
17 type gauge struct {
18 name string
19 }
20
21 // NewGauge returns a Gauge that does nothing.
22 func NewGauge(name string) metrics.Gauge { return &gauge{name} }
23
24 func (g *gauge) Name() string { return g.name }
25 func (g *gauge) With(metrics.Field) metrics.Gauge { return g }
26 func (g *gauge) Set(value float64) {}
27 func (g *gauge) Add(delta float64) {}
28 func (g *gauge) Get() float64 { return 0 }
29
30 type histogram struct {
31 name string
32 }
33
34 // NewHistogram returns a Histogram that does nothing.
35 func NewHistogram(name string) metrics.Histogram { return &histogram{name} }
36
37 func (h *histogram) Name() string { return h.name }
38 func (h *histogram) With(metrics.Field) metrics.Histogram { return h }
39 func (h *histogram) Observe(value int64) {}
40 func (h *histogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
41 return []metrics.Bucket{}, []metrics.Quantile{}
42 }
2323
2424 const maxBufferSize = 1400 // bytes
2525
26 type dogstatsdCounter struct {
26 type counter struct {
2727 key string
2828 c chan string
2929 tags []metrics.Field
4141 // NewCounterTick is the same as NewCounter, but allows the user to pass in a
4242 // ticker channel instead of invoking time.Tick.
4343 func NewCounterTick(w io.Writer, key string, reportTicker <-chan time.Time, tags []metrics.Field) metrics.Counter {
44 c := &dogstatsdCounter{
44 c := &counter{
4545 key: key,
4646 c: make(chan string),
4747 tags: tags,
5050 return c
5151 }
5252
53 func (c *dogstatsdCounter) Name() string { return c.key }
54
55 func (c *dogstatsdCounter) With(f metrics.Field) metrics.Counter {
56 return &dogstatsdCounter{
53 func (c *counter) Name() string { return c.key }
54
55 func (c *counter) With(f metrics.Field) metrics.Counter {
56 return &counter{
5757 key: c.key,
5858 c: c.c,
5959 tags: append(c.tags, f),
6060 }
6161 }
6262
63 func (c *dogstatsdCounter) Add(delta uint64) { c.c <- applyTags(fmt.Sprintf("%d|c", delta), c.tags) }
64
65 type dogstatsdGauge struct {
63 func (c *counter) Add(delta uint64) { c.c <- applyTags(fmt.Sprintf("%d|c", delta), c.tags) }
64
65 type gauge struct {
6666 key string
6767 lastValue uint64 // math.Float64frombits
6868 g chan string
8181 // NewGaugeTick is the same as NewGauge, but allows the user to pass in a ticker
8282 // channel instead of invoking time.Tick.
8383 func NewGaugeTick(w io.Writer, key string, reportTicker <-chan time.Time, tags []metrics.Field) metrics.Gauge {
84 g := &dogstatsdGauge{
84 g := &gauge{
8585 key: key,
8686 g: make(chan string),
8787 tags: tags,
9090 return g
9191 }
9292
93 func (g *dogstatsdGauge) Name() string { return g.key }
94
95 func (g *dogstatsdGauge) With(f metrics.Field) metrics.Gauge {
96 return &dogstatsdGauge{
93 func (g *gauge) Name() string { return g.key }
94
95 func (g *gauge) With(f metrics.Field) metrics.Gauge {
96 return &gauge{
9797 key: g.key,
9898 lastValue: g.lastValue,
9999 g: g.g,
101101 }
102102 }
103103
104 func (g *dogstatsdGauge) Add(delta float64) {
104 func (g *gauge) Add(delta float64) {
105105 // https://github.com/etsy/statsd/blob/master/docs/metric_types.md#gauges
106106 sign := "+"
107107 if delta < 0 {
110110 g.g <- applyTags(fmt.Sprintf("%s%f|g", sign, delta), g.tags)
111111 }
112112
113 func (g *dogstatsdGauge) Set(value float64) {
113 func (g *gauge) Set(value float64) {
114114 atomic.StoreUint64(&g.lastValue, math.Float64bits(value))
115115 g.g <- applyTags(fmt.Sprintf("%f|g", value), g.tags)
116116 }
117117
118 func (g *dogstatsdGauge) Get() float64 {
118 func (g *gauge) Get() float64 {
119119 return math.Float64frombits(atomic.LoadUint64(&g.lastValue))
120120 }
121121
146146 return c
147147 }
148148
149 type dogstatsdHistogram struct {
149 type histogram struct {
150150 key string
151151 h chan string
152152 tags []metrics.Field
176176 // NewHistogramTick is the same as NewHistogram, but allows the user to pass a
177177 // ticker channel instead of invoking time.Tick.
178178 func NewHistogramTick(w io.Writer, key string, reportTicker <-chan time.Time, tags []metrics.Field) metrics.Histogram {
179 h := &dogstatsdHistogram{
179 h := &histogram{
180180 key: key,
181181 h: make(chan string),
182182 tags: tags,
185185 return h
186186 }
187187
188 func (h *dogstatsdHistogram) Name() string { return h.key }
189
190 func (h *dogstatsdHistogram) With(f metrics.Field) metrics.Histogram {
191 return &dogstatsdHistogram{
188 func (h *histogram) Name() string { return h.key }
189
190 func (h *histogram) With(f metrics.Field) metrics.Histogram {
191 return &histogram{
192192 key: h.key,
193193 h: h.h,
194194 tags: append(h.tags, f),
195195 }
196196 }
197197
198 func (h *dogstatsdHistogram) Observe(value int64) {
198 func (h *histogram) Observe(value int64) {
199199 h.h <- applyTags(fmt.Sprintf("%d|ms", value), h.tags)
200200 }
201201
202 func (h *dogstatsdHistogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
202 func (h *histogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
203203 // TODO(pb): no way to do this without introducing e.g. codahale/hdrhistogram
204204 return []metrics.Bucket{}, []metrics.Quantile{}
205205 }
1313 // PrometheusLabelValueUnknown.
1414 var PrometheusLabelValueUnknown = "unknown"
1515
16 type prometheusCounter struct {
16 type counter struct {
1717 *prometheus.CounterVec
1818 name string
1919 Pairs map[string]string
2828 for _, fieldName := range fieldKeys {
2929 p[fieldName] = PrometheusLabelValueUnknown
3030 }
31 return prometheusCounter{
31 return counter{
3232 CounterVec: m,
3333 name: opts.Name,
3434 Pairs: p,
3535 }
3636 }
3737
38 func (c prometheusCounter) Name() string { return c.name }
39
40 func (c prometheusCounter) With(f metrics.Field) metrics.Counter {
41 return prometheusCounter{
38 func (c counter) Name() string { return c.name }
39
40 func (c counter) With(f metrics.Field) metrics.Counter {
41 return counter{
4242 CounterVec: c.CounterVec,
4343 name: c.name,
4444 Pairs: merge(c.Pairs, f),
4545 }
4646 }
4747
48 func (c prometheusCounter) Add(delta uint64) {
48 func (c counter) Add(delta uint64) {
4949 c.CounterVec.With(prometheus.Labels(c.Pairs)).Add(float64(delta))
5050 }
5151
52 type prometheusGauge struct {
52 type gauge struct {
5353 *prometheus.GaugeVec
5454 name string
5555 Pairs map[string]string
6060 func NewGauge(opts prometheus.GaugeOpts, fieldKeys []string) metrics.Gauge {
6161 m := prometheus.NewGaugeVec(opts, fieldKeys)
6262 prometheus.MustRegister(m)
63 return prometheusGauge{
63 return gauge{
6464 GaugeVec: m,
6565 name: opts.Name,
6666 Pairs: pairsFrom(fieldKeys),
6767 }
6868 }
6969
70 func (g prometheusGauge) Name() string { return g.name }
71
72 func (g prometheusGauge) With(f metrics.Field) metrics.Gauge {
73 return prometheusGauge{
70 func (g gauge) Name() string { return g.name }
71
72 func (g gauge) With(f metrics.Field) metrics.Gauge {
73 return gauge{
7474 GaugeVec: g.GaugeVec,
7575 name: g.name,
7676 Pairs: merge(g.Pairs, f),
7777 }
7878 }
7979
80 func (g prometheusGauge) Set(value float64) {
80 func (g gauge) Set(value float64) {
8181 g.GaugeVec.With(prometheus.Labels(g.Pairs)).Set(value)
8282 }
8383
84 func (g prometheusGauge) Add(delta float64) {
84 func (g gauge) Add(delta float64) {
8585 g.GaugeVec.With(prometheus.Labels(g.Pairs)).Add(delta)
8686 }
8787
88 func (g prometheusGauge) Get() float64 {
88 func (g gauge) Get() float64 {
8989 // TODO(pb): see https://github.com/prometheus/client_golang/issues/58
9090 return 0.0
9191 }
9898 prometheus.MustRegister(prometheus.NewGaugeFunc(opts, callback))
9999 }
100100
101 type prometheusSummary struct {
101 type summary struct {
102102 *prometheus.SummaryVec
103103 name string
104104 Pairs map[string]string
112112 func NewSummary(opts prometheus.SummaryOpts, fieldKeys []string) metrics.Histogram {
113113 m := prometheus.NewSummaryVec(opts, fieldKeys)
114114 prometheus.MustRegister(m)
115 return prometheusSummary{
115 return summary{
116116 SummaryVec: m,
117117 name: opts.Name,
118118 Pairs: pairsFrom(fieldKeys),
119119 }
120120 }
121121
122 func (s prometheusSummary) Name() string { return s.name }
123
124 func (s prometheusSummary) With(f metrics.Field) metrics.Histogram {
125 return prometheusSummary{
122 func (s summary) Name() string { return s.name }
123
124 func (s summary) With(f metrics.Field) metrics.Histogram {
125 return summary{
126126 SummaryVec: s.SummaryVec,
127127 name: s.name,
128128 Pairs: merge(s.Pairs, f),
129129 }
130130 }
131131
132 func (s prometheusSummary) Observe(value int64) {
132 func (s summary) Observe(value int64) {
133133 s.SummaryVec.With(prometheus.Labels(s.Pairs)).Observe(float64(value))
134134 }
135135
136 func (s prometheusSummary) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
136 func (s summary) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
137137 // TODO(pb): see https://github.com/prometheus/client_golang/issues/58
138138 return []metrics.Bucket{}, []metrics.Quantile{}
139139 }
140140
141 type prometheusHistogram struct {
141 type histogram struct {
142142 *prometheus.HistogramVec
143143 name string
144144 Pairs map[string]string
152152 func NewHistogram(opts prometheus.HistogramOpts, fieldKeys []string) metrics.Histogram {
153153 m := prometheus.NewHistogramVec(opts, fieldKeys)
154154 prometheus.MustRegister(m)
155 return prometheusHistogram{
155 return histogram{
156156 HistogramVec: m,
157157 name: opts.Name,
158158 Pairs: pairsFrom(fieldKeys),
159159 }
160160 }
161161
162 func (h prometheusHistogram) Name() string { return h.name }
163
164 func (h prometheusHistogram) With(f metrics.Field) metrics.Histogram {
165 return prometheusHistogram{
162 func (h histogram) Name() string { return h.name }
163
164 func (h histogram) With(f metrics.Field) metrics.Histogram {
165 return histogram{
166166 HistogramVec: h.HistogramVec,
167167 name: h.name,
168168 Pairs: merge(h.Pairs, f),
169169 }
170170 }
171171
172 func (h prometheusHistogram) Observe(value int64) {
172 func (h histogram) Observe(value int64) {
173173 h.HistogramVec.With(prometheus.Labels(h.Pairs)).Observe(float64(value))
174174 }
175175
176 func (h prometheusHistogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
176 func (h histogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
177177 // TODO(pb): see https://github.com/prometheus/client_golang/issues/58
178178 return []metrics.Bucket{}, []metrics.Quantile{}
179179 }
2929
3030 const maxBufferSize = 1400 // bytes
3131
32 type statsdCounter struct {
32 type counter struct {
3333 key string
3434 c chan string
3535 }
4747 // NewCounterTick is the same as NewCounter, but allows the user to pass in a
4848 // ticker channel instead of invoking time.Tick.
4949 func NewCounterTick(w io.Writer, key string, reportTicker <-chan time.Time) metrics.Counter {
50 c := &statsdCounter{
50 c := &counter{
5151 key: key,
5252 c: make(chan string),
5353 }
5555 return c
5656 }
5757
58 func (c *statsdCounter) Name() string { return c.key }
59
60 func (c *statsdCounter) With(metrics.Field) metrics.Counter { return c }
61
62 func (c *statsdCounter) Add(delta uint64) { c.c <- fmt.Sprintf("%d|c", delta) }
63
64 type statsdGauge struct {
58 func (c *counter) Name() string { return c.key }
59
60 func (c *counter) With(metrics.Field) metrics.Counter { return c }
61
62 func (c *counter) Add(delta uint64) { c.c <- fmt.Sprintf("%d|c", delta) }
63
64 type gauge struct {
6565 key string
6666 lastValue uint64 // math.Float64frombits
6767 g chan string
8080 // NewGaugeTick is the same as NewGauge, but allows the user to pass in a ticker
8181 // channel instead of invoking time.Tick.
8282 func NewGaugeTick(w io.Writer, key string, reportTicker <-chan time.Time) metrics.Gauge {
83 g := &statsdGauge{
83 g := &gauge{
8484 key: key,
8585 g: make(chan string),
8686 }
8888 return g
8989 }
9090
91 func (g *statsdGauge) Name() string { return g.key }
92
93 func (g *statsdGauge) With(metrics.Field) metrics.Gauge { return g }
94
95 func (g *statsdGauge) Add(delta float64) {
91 func (g *gauge) Name() string { return g.key }
92
93 func (g *gauge) With(metrics.Field) metrics.Gauge { return g }
94
95 func (g *gauge) Add(delta float64) {
9696 // https://github.com/etsy/statsd/blob/master/docs/metric_types.md#gauges
9797 sign := "+"
9898 if delta < 0 {
101101 g.g <- fmt.Sprintf("%s%f|g", sign, delta)
102102 }
103103
104 func (g *statsdGauge) Set(value float64) {
104 func (g *gauge) Set(value float64) {
105105 atomic.StoreUint64(&g.lastValue, math.Float64bits(value))
106106 g.g <- fmt.Sprintf("%f|g", value)
107107 }
108108
109 func (g *statsdGauge) Get() float64 {
109 func (g *gauge) Get() float64 {
110110 return math.Float64frombits(atomic.LoadUint64(&g.lastValue))
111111 }
112112
137137 return c
138138 }
139139
140 type statsdHistogram struct {
140 type histogram struct {
141141 key string
142142 h chan string
143143 }
166166 // NewHistogramTick is the same as NewHistogram, but allows the user to pass a
167167 // ticker channel instead of invoking time.Tick.
168168 func NewHistogramTick(w io.Writer, key string, reportTicker <-chan time.Time) metrics.Histogram {
169 h := &statsdHistogram{
169 h := &histogram{
170170 key: key,
171171 h: make(chan string),
172172 }
174174 return h
175175 }
176176
177 func (h *statsdHistogram) Name() string { return h.key }
178
179 func (h *statsdHistogram) With(metrics.Field) metrics.Histogram { return h }
180
181 func (h *statsdHistogram) Observe(value int64) {
177 func (h *histogram) Name() string { return h.key }
178
179 func (h *histogram) With(metrics.Field) metrics.Histogram { return h }
180
181 func (h *histogram) Observe(value int64) {
182182 h.h <- fmt.Sprintf("%d|ms", value)
183183 }
184184
185 func (h *statsdHistogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
185 func (h *histogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
186186 // TODO(pb): no way to do this without introducing e.g. codahale/hdrhistogram
187187 return []metrics.Bucket{}, []metrics.Quantile{}
188188 }
0 package opentracing
1
2 import (
3 "github.com/opentracing/opentracing-go"
4 otext "github.com/opentracing/opentracing-go/ext"
5 "golang.org/x/net/context"
6
7 "github.com/go-kit/kit/endpoint"
8 )
9
10 // TraceServer returns a Middleware that wraps the `next` Endpoint in an
11 // OpenTracing Span called `operationName`.
12 //
13 // If `ctx` already has a Span, it is re-used and the operation name is
14 // overwritten. If `ctx` does not yet have a Span, one is created here.
15 func TraceServer(tracer opentracing.Tracer, operationName string) endpoint.Middleware {
16 return func(next endpoint.Endpoint) endpoint.Endpoint {
17 return func(ctx context.Context, request interface{}) (interface{}, error) {
18 serverSpan := opentracing.SpanFromContext(ctx)
19 if serverSpan == nil {
20 // All we can do is create a new root span.
21 serverSpan = tracer.StartSpan(operationName)
22 } else {
23 serverSpan.SetOperationName(operationName)
24 }
25 defer serverSpan.Finish()
26 otext.SpanKind.Set(serverSpan, otext.SpanKindRPCServer)
27 ctx = opentracing.ContextWithSpan(ctx, serverSpan)
28 return next(ctx, request)
29 }
30 }
31 }
32
33 // TraceClient returns a Middleware that wraps the `next` Endpoint in an
34 // OpenTracing Span called `operationName`.
35 func TraceClient(tracer opentracing.Tracer, operationName string) endpoint.Middleware {
36 return func(next endpoint.Endpoint) endpoint.Endpoint {
37 return func(ctx context.Context, request interface{}) (interface{}, error) {
38 parentSpan := opentracing.SpanFromContext(ctx)
39 clientSpan := tracer.StartSpanWithOptions(opentracing.StartSpanOptions{
40 OperationName: operationName,
41 Parent: parentSpan, // may be nil
42 })
43 defer clientSpan.Finish()
44 otext.SpanKind.Set(clientSpan, otext.SpanKindRPCClient)
45 ctx = opentracing.ContextWithSpan(ctx, clientSpan)
46 return next(ctx, request)
47 }
48 }
49 }
0 package opentracing_test
1
2 import (
3 "testing"
4
5 "github.com/opentracing/opentracing-go"
6 "github.com/opentracing/opentracing-go/mocktracer"
7 "golang.org/x/net/context"
8
9 "github.com/go-kit/kit/endpoint"
10 kitot "github.com/go-kit/kit/tracing/opentracing"
11 )
12
13 func TestTraceServer(t *testing.T) {
14 tracer := mocktracer.New()
15
16 // Initialize the ctx with a nameless Span.
17 contextSpan := tracer.StartSpan("").(*mocktracer.MockSpan)
18 ctx := opentracing.ContextWithSpan(context.Background(), contextSpan)
19
20 var innerEndpoint endpoint.Endpoint
21 innerEndpoint = func(context.Context, interface{}) (interface{}, error) {
22 return struct{}{}, nil
23 }
24 tracedEndpoint := kitot.TraceServer(tracer, "testOp")(innerEndpoint)
25 if _, err := tracedEndpoint(ctx, struct{}{}); err != nil {
26 t.Fatal(err)
27 }
28 if want, have := 1, len(tracer.FinishedSpans); want != have {
29 t.Fatalf("Want %v span(s), found %v", want, have)
30 }
31
32 endpointSpan := tracer.FinishedSpans[0]
33 // Test that the op name is updated
34 if want, have := "testOp", endpointSpan.OperationName; want != have {
35 t.Fatalf("Want %q, have %q", want, have)
36 }
37 // ... and that the ID is unmodified.
38 if want, have := contextSpan.SpanID, endpointSpan.SpanID; want != have {
39 t.Errorf("Want SpanID %q, have %q", want, have)
40 }
41 }
42
43 func TestTraceServerNoContextSpan(t *testing.T) {
44 tracer := mocktracer.New()
45
46 var innerEndpoint endpoint.Endpoint
47 innerEndpoint = func(context.Context, interface{}) (interface{}, error) {
48 return struct{}{}, nil
49 }
50 tracedEndpoint := kitot.TraceServer(tracer, "testOp")(innerEndpoint)
51 // Empty/background context:
52 if _, err := tracedEndpoint(context.Background(), struct{}{}); err != nil {
53 t.Fatal(err)
54 }
55 // tracedEndpoint created a new Span:
56 if want, have := 1, len(tracer.FinishedSpans); want != have {
57 t.Fatalf("Want %v span(s), found %v", want, have)
58 }
59
60 endpointSpan := tracer.FinishedSpans[0]
61 if want, have := "testOp", endpointSpan.OperationName; want != have {
62 t.Fatalf("Want %q, have %q", want, have)
63 }
64 }
65
66 func TestTraceClient(t *testing.T) {
67 tracer := mocktracer.New()
68
69 // Initialize the ctx with a parent Span.
70 parentSpan := tracer.StartSpan("parent").(*mocktracer.MockSpan)
71 defer parentSpan.Finish()
72 ctx := opentracing.ContextWithSpan(context.Background(), parentSpan)
73
74 var innerEndpoint endpoint.Endpoint
75 innerEndpoint = func(context.Context, interface{}) (interface{}, error) {
76 return struct{}{}, nil
77 }
78 tracedEndpoint := kitot.TraceClient(tracer, "testOp")(innerEndpoint)
79 if _, err := tracedEndpoint(ctx, struct{}{}); err != nil {
80 t.Fatal(err)
81 }
82 // tracedEndpoint created a new Span:
83 if want, have := 1, len(tracer.FinishedSpans); want != have {
84 t.Fatalf("Want %v span(s), found %v", want, have)
85 }
86
87 endpointSpan := tracer.FinishedSpans[0]
88 if want, have := "testOp", endpointSpan.OperationName; want != have {
89 t.Fatalf("Want %q, have %q", want, have)
90 }
91 // ... and that the parent ID is set appropriately.
92 if want, have := parentSpan.SpanID, endpointSpan.ParentID; want != have {
93 t.Errorf("Want ParentID %q, have %q", want, have)
94 }
95 }
0 package opentracing
1
2 import (
3 "encoding/base64"
4 "strings"
5
6 "github.com/opentracing/opentracing-go"
7 "golang.org/x/net/context"
8 "google.golang.org/grpc/metadata"
9
10 "github.com/go-kit/kit/log"
11 )
12
13 // ToGRPCRequest returns a grpc RequestFunc that injects an OpenTracing Span
14 // found in `ctx` into the grpc Metadata. If no such Span can be found, the
15 // RequestFunc is a noop.
16 //
17 // The logger is used to report errors and may be nil.
18 func ToGRPCRequest(tracer opentracing.Tracer, logger log.Logger) func(ctx context.Context, md *metadata.MD) context.Context {
19 return func(ctx context.Context, md *metadata.MD) context.Context {
20 if span := opentracing.SpanFromContext(ctx); span != nil {
21 // There's nothing we can do with an error here.
22 err := tracer.Inject(span, opentracing.TextMap, metadataReaderWriter{md})
23 if err != nil && logger != nil {
24 logger.Log("msg", "Inject failed", "err", err)
25 }
26 }
27 return ctx
28 }
29 }
30
31 // FromGRPCRequest returns a grpc RequestFunc that tries to join with an
32 // OpenTracing trace found in `req` and starts a new Span called
33 // `operationName` accordingly. If no trace could be found in `req`, the Span
34 // will be a trace root. The Span is incorporated in the returned Context and
35 // can be retrieved with opentracing.SpanFromContext(ctx).
36 //
37 // The logger is used to report errors and may be nil.
38 func FromGRPCRequest(tracer opentracing.Tracer, operationName string, logger log.Logger) func(ctx context.Context, md *metadata.MD) context.Context {
39 return func(ctx context.Context, md *metadata.MD) context.Context {
40 span, err := tracer.Join(operationName, opentracing.TextMap, metadataReaderWriter{md})
41 if err != nil && logger != nil {
42 logger.Log("msg", "Join failed", "err", err)
43 }
44 if span == nil {
45 span = tracer.StartSpan(operationName)
46 }
47 return opentracing.ContextWithSpan(ctx, span)
48 }
49 }
50
51 // A type that conforms to opentracing.TextMapReader and
52 // opentracing.TextMapWriter.
53 type metadataReaderWriter struct {
54 *metadata.MD
55 }
56
57 func (w metadataReaderWriter) Set(key, val string) {
58 key = strings.ToLower(key)
59 if strings.HasSuffix(key, "-bin") {
60 val = string(base64.StdEncoding.EncodeToString([]byte(val)))
61 }
62 (*w.MD)[key] = append((*w.MD)[key], val)
63 }
64
65 func (w metadataReaderWriter) ForeachKey(handler func(key, val string) error) error {
66 for k, vals := range *w.MD {
67 for _, v := range vals {
68 if err := handler(k, v); err != nil {
69 return err
70 }
71 }
72 }
73 return nil
74 }
0 package opentracing_test
1
2 import (
3 "testing"
4
5 "github.com/opentracing/opentracing-go"
6 "github.com/opentracing/opentracing-go/mocktracer"
7 "golang.org/x/net/context"
8 "google.golang.org/grpc/metadata"
9
10 kitot "github.com/go-kit/kit/tracing/opentracing"
11 "github.com/go-kit/kit/transport/grpc"
12 )
13
14 func TestTraceGRPCRequestRoundtrip(t *testing.T) {
15 tracer := mocktracer.New()
16
17 // Initialize the ctx with a Span to inject.
18 beforeSpan := tracer.StartSpan("to_inject").(*mocktracer.MockSpan)
19 defer beforeSpan.Finish()
20 beforeSpan.SetBaggageItem("baggage", "check")
21 beforeCtx := opentracing.ContextWithSpan(context.Background(), beforeSpan)
22
23 var toGRPCFunc grpc.RequestFunc = kitot.ToGRPCRequest(tracer, nil)
24 md := metadata.Pairs()
25 // Call the RequestFunc.
26 afterCtx := toGRPCFunc(beforeCtx, &md)
27
28 // The Span should not have changed.
29 afterSpan := opentracing.SpanFromContext(afterCtx)
30 if beforeSpan != afterSpan {
31 t.Errorf("Should not swap in a new span")
32 }
33
34 // No spans should have finished yet.
35 if want, have := 0, len(tracer.FinishedSpans); want != have {
36 t.Errorf("Want %v span(s), found %v", want, have)
37 }
38
39 // Use FromGRPCRequest to verify that we can join with the trace given MD.
40 var fromGRPCFunc grpc.RequestFunc = kitot.FromGRPCRequest(tracer, "joined", nil)
41 joinCtx := fromGRPCFunc(afterCtx, &md)
42 joinedSpan := opentracing.SpanFromContext(joinCtx).(*mocktracer.MockSpan)
43
44 if joinedSpan.SpanID == beforeSpan.SpanID {
45 t.Error("SpanID should have changed", joinedSpan.SpanID, beforeSpan.SpanID)
46 }
47
48 // Check that the parent/child relationship is as expected for the joined span.
49 if want, have := beforeSpan.SpanID, joinedSpan.ParentID; want != have {
50 t.Errorf("Want ParentID %q, have %q", want, have)
51 }
52 if want, have := "joined", joinedSpan.OperationName; want != have {
53 t.Errorf("Want %q, have %q", want, have)
54 }
55 if want, have := "check", joinedSpan.BaggageItem("baggage"); want != have {
56 t.Errorf("Want %q, have %q", want, have)
57 }
58 }
0 package opentracing
1
2 import (
3 "net"
4 "net/http"
5 "strconv"
6
7 "github.com/opentracing/opentracing-go"
8 "github.com/opentracing/opentracing-go/ext"
9 "golang.org/x/net/context"
10
11 "github.com/go-kit/kit/log"
12 kithttp "github.com/go-kit/kit/transport/http"
13 )
14
15 // ToHTTPRequest returns an http RequestFunc that injects an OpenTracing Span
16 // found in `ctx` into the http headers. If no such Span can be found, the
17 // RequestFunc is a noop.
18 //
19 // The logger is used to report errors and may be nil.
20 func ToHTTPRequest(tracer opentracing.Tracer, logger log.Logger) kithttp.RequestFunc {
21 return func(ctx context.Context, req *http.Request) context.Context {
22 // Try to find a Span in the Context.
23 if span := opentracing.SpanFromContext(ctx); span != nil {
24 // Add standard OpenTracing tags.
25 ext.HTTPMethod.Set(span, req.URL.RequestURI())
26 host, portString, err := net.SplitHostPort(req.URL.Host)
27 if err == nil {
28 ext.PeerHostname.Set(span, host)
29 if port, err := strconv.Atoi(portString); err != nil {
30 ext.PeerPort.Set(span, uint16(port))
31 }
32 } else {
33 ext.PeerHostname.Set(span, req.URL.Host)
34 }
35
36 // There's nothing we can do with any errors here.
37 err = tracer.Inject(
38 span,
39 opentracing.TextMap,
40 opentracing.HTTPHeaderTextMapCarrier(req.Header),
41 )
42 if err != nil && logger != nil {
43 logger.Log("msg", "Join failed", "err", err)
44 }
45 }
46 return ctx
47 }
48 }
49
50 // FromHTTPRequest returns an http RequestFunc that tries to join with an
51 // OpenTracing trace found in `req` and starts a new Span called
52 // `operationName` accordingly. If no trace could be found in `req`, the Span
53 // will be a trace root. The Span is incorporated in the returned Context and
54 // can be retrieved with opentracing.SpanFromContext(ctx).
55 //
56 // The logger is used to report errors and may be nil.
57 func FromHTTPRequest(tracer opentracing.Tracer, operationName string, logger log.Logger) kithttp.RequestFunc {
58 return func(ctx context.Context, req *http.Request) context.Context {
59 // Try to join to a trace propagated in `req`. There's nothing we can
60 // do with any errors here, so we ignore them.
61 span, err := tracer.Join(
62 operationName,
63 opentracing.TextMap,
64 opentracing.HTTPHeaderTextMapCarrier(req.Header),
65 )
66 if err != nil && logger != nil {
67 logger.Log("msg", "Join failed", "err", err)
68 }
69 if span == nil {
70 span = opentracing.StartSpan(operationName)
71 }
72 return opentracing.ContextWithSpan(ctx, span)
73 }
74 }
0 package opentracing_test
1
2 import (
3 "net/http"
4 "testing"
5
6 "github.com/opentracing/opentracing-go"
7 "github.com/opentracing/opentracing-go/mocktracer"
8 "golang.org/x/net/context"
9
10 kitot "github.com/go-kit/kit/tracing/opentracing"
11 kithttp "github.com/go-kit/kit/transport/http"
12 )
13
14 func TestTraceHTTPRequestRoundtrip(t *testing.T) {
15 tracer := mocktracer.New()
16
17 // Initialize the ctx with a Span to inject.
18 beforeSpan := tracer.StartSpan("to_inject").(*mocktracer.MockSpan)
19 defer beforeSpan.Finish()
20 beforeSpan.SetBaggageItem("baggage", "check")
21 beforeCtx := opentracing.ContextWithSpan(context.Background(), beforeSpan)
22
23 var toHTTPFunc kithttp.RequestFunc = kitot.ToHTTPRequest(tracer, nil)
24 req, _ := http.NewRequest("GET", "http://test.biz/url", nil)
25 // Call the RequestFunc.
26 afterCtx := toHTTPFunc(beforeCtx, req)
27
28 // The Span should not have changed.
29 afterSpan := opentracing.SpanFromContext(afterCtx)
30 if beforeSpan != afterSpan {
31 t.Errorf("Should not swap in a new span")
32 }
33
34 // No spans should have finished yet.
35 if want, have := 0, len(tracer.FinishedSpans); want != have {
36 t.Errorf("Want %v span(s), found %v", want, have)
37 }
38
39 // Use FromHTTPRequest to verify that we can join with the trace given a req.
40 var fromHTTPFunc kithttp.RequestFunc = kitot.FromHTTPRequest(tracer, "joined", nil)
41 joinCtx := fromHTTPFunc(afterCtx, req)
42 joinedSpan := opentracing.SpanFromContext(joinCtx).(*mocktracer.MockSpan)
43
44 if joinedSpan.SpanID == beforeSpan.SpanID {
45 t.Error("SpanID should have changed", joinedSpan.SpanID, beforeSpan.SpanID)
46 }
47
48 // Check that the parent/child relationship is as expected for the joined span.
49 if want, have := beforeSpan.SpanID, joinedSpan.ParentID; want != have {
50 t.Errorf("Want ParentID %q, have %q", want, have)
51 }
52 if want, have := "joined", joinedSpan.OperationName; want != have {
53 t.Errorf("Want %q, have %q", want, have)
54 }
55 if want, have := "check", joinedSpan.BaggageItem("baggage"); want != have {
56 t.Errorf("Want %q, have %q", want, have)
57 }
58 }
66 )
77
88 // RequestFunc may take information from an HTTP request and put it into a
9 // request context. In Servers, BeforeFuncs are executed prior to invoking the
10 // endpoint. In Clients, BeforeFuncs are executed after creating the request
9 // request context. In Servers, RequestFuncs are executed prior to invoking the
10 // endpoint. In Clients, RequestFuncs are executed after creating the request
1111 // but prior to invoking the HTTP client.
1212 type RequestFunc func(context.Context, *http.Request) context.Context
1313