Codebase list golang-github-go-kit-kit / 7b60fe7
Demonstrate the OpenTracing bindings via addsvc Rather than support both Zipkin and OpenTracing, this diff replaces the former with the latter. It would be feasible to support both, though. Ben Sigelman 8 years ago
7 changed file(s) with 175 addition(s) and 100 deletion(s). Raw diff Collapse all Expand all
66
77 "github.com/go-kit/kit/endpoint"
88 "github.com/go-kit/kit/examples/addsvc/pb"
9 "github.com/go-kit/kit/loadbalancer"
10 kitot "github.com/go-kit/kit/tracing/opentracing"
911 grpctransport "github.com/go-kit/kit/transport/grpc"
12 "github.com/opentracing/opentracing-go"
1013 )
1114
1215 // SumEndpointFactory transforms GRPC host:port strings into Endpoints that call the Sum method on a GRPC server
1316 // at that address.
14 func SumEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) {
15 cc, err := grpc.Dial(instance, grpc.WithInsecure())
16 return grpctransport.NewClient(
17 cc,
18 "Add",
19 "Sum",
20 encodeSumRequest,
21 decodeSumResponse,
22 pb.SumReply{},
23 ).Endpoint(), cc, err
17 func NewSumEndpointFactory(tracer opentracing.Tracer) loadbalancer.Factory {
18 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
19 cc, err := grpc.Dial(instance, grpc.WithInsecure())
20 return grpctransport.NewClient(
21 cc,
22 "Add",
23 "Sum",
24 encodeSumRequest,
25 decodeSumResponse,
26 pb.SumReply{},
27 grpctransport.SetClientBefore(kitot.ToGRPCRequest(tracer)),
28 ).Endpoint(), cc, err
29 }
2430 }
2531
2632 // ConcatEndpointFactory transforms GRPC host:port strings into Endpoints that call the Concat method on a GRPC server
2733 // at that address.
28 func ConcatEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) {
29 cc, err := grpc.Dial(instance, grpc.WithInsecure())
30 return grpctransport.NewClient(
31 cc,
32 "Add",
33 "Concat",
34 encodeConcatRequest,
35 decodeConcatResponse,
36 pb.ConcatReply{},
37 ).Endpoint(), cc, err
34 func NewConcatEndpointFactory(tracer opentracing.Tracer) loadbalancer.Factory {
35 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
36 cc, err := grpc.Dial(instance, grpc.WithInsecure())
37 return grpctransport.NewClient(
38 cc,
39 "Add",
40 "Concat",
41 encodeConcatRequest,
42 decodeConcatResponse,
43 pb.ConcatReply{},
44 grpctransport.SetClientBefore(kitot.ToGRPCRequest(tracer)),
45 ).Endpoint(), cc, err
46 }
3847 }
55
66 "github.com/go-kit/kit/endpoint"
77 "github.com/go-kit/kit/examples/addsvc/server"
8 "github.com/go-kit/kit/loadbalancer"
9 kitot "github.com/go-kit/kit/tracing/opentracing"
810 httptransport "github.com/go-kit/kit/transport/http"
11 "github.com/opentracing/opentracing-go"
912 )
1013
11 // SumEndpointFactory transforms a http url into an Endpoint.
14 // SumEndpointFactory generates a Factory that transforms an http url into an
15 // Endpoint.
16 //
1217 // The path of the url is reset to /sum.
13 func SumEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) {
14 sumURL, err := url.Parse(instance)
15 if err != nil {
16 return nil, nil, err
18 func NewSumEndpointFactory(tracer opentracing.Tracer) loadbalancer.Factory {
19 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
20 sumURL, err := url.Parse(instance)
21 if err != nil {
22 return nil, nil, err
23 }
24 sumURL.Path = "/sum"
25
26 client := httptransport.NewClient(
27 "GET",
28 sumURL,
29 server.EncodeSumRequest,
30 server.DecodeSumResponse,
31 httptransport.SetClient(nil),
32 httptransport.SetClientBefore(kitot.ToHTTPRequest(tracer)),
33 )
34
35 return client.Endpoint(), nil, nil
1736 }
18 sumURL.Path = "/sum"
19
20 client := httptransport.NewClient(
21 "GET",
22 sumURL,
23 server.EncodeSumRequest,
24 server.DecodeSumResponse,
25 httptransport.SetClient(nil),
26 )
27
28 return client.Endpoint(), nil, nil
2937 }
3038
31 // ConcatEndpointFactory transforms a http url into an Endpoint.
39 // NewConcatEndpointFactory generates a Factory that transforms an http url
40 // into an Endpoint.
41 //
3242 // The path of the url is reset to /concat.
33 func ConcatEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) {
34 concatURL, err := url.Parse(instance)
35 if err != nil {
36 return nil, nil, err
43 func NewConcatEndpointFactory(tracer opentracing.Tracer) loadbalancer.Factory {
44 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
45 concatURL, err := url.Parse(instance)
46 if err != nil {
47 return nil, nil, err
48 }
49 concatURL.Path = "/concat"
50
51 client := httptransport.NewClient(
52 "GET",
53 concatURL,
54 server.EncodeConcatRequest,
55 server.DecodeConcatResponse,
56 httptransport.SetClient(nil),
57 httptransport.SetClientBefore(kitot.ToHTTPRequest(tracer)),
58 )
59
60 return client.Endpoint(), nil, nil
3761 }
38 concatURL.Path = "/concat"
39
40 client := httptransport.NewClient(
41 "GET",
42 concatURL,
43 server.EncodeConcatRequest,
44 server.DecodeConcatResponse,
45 httptransport.SetClient(nil),
46 )
47
48 return client.Endpoint(), nil, nil
4962 }
1818 "github.com/go-kit/kit/loadbalancer"
1919 "github.com/go-kit/kit/loadbalancer/static"
2020 "github.com/go-kit/kit/log"
21 kitot "github.com/go-kit/kit/tracing/opentracing"
22 "github.com/lightstep/lightstep-tracer-go"
23 "github.com/opentracing/opentracing-go"
24 appdashot "github.com/sourcegraph/appdash/opentracing"
25 "sourcegraph.com/sourcegraph/appdash"
2126 )
2227
2328 func main() {
3035 thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
3136 thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered")
3237 thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing")
38
39 // Two OpenTracing backends (to demonstrate how they can be interchanged):
40 appdashHostport = flag.String("appdash_hostport", "", "Enable Appdash tracing via an Appdash server host:port")
41 lightstepAccessToken = flag.String("lightstep_access_token", "", "Enable LightStep tracing via a LightStep access token")
3342 )
3443 flag.Parse()
3544 if len(os.Args) < 4 {
4857 logger = log.NewContext(logger).With("caller", log.DefaultCaller)
4958 logger = log.NewContext(logger).With("transport", *transport)
5059
60 // Set up OpenTracing
61 var tracer opentracing.Tracer
62 {
63 if len(*appdashHostport) > 0 {
64 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashHostport))
65 }
66 if len(*lightstepAccessToken) > 0 {
67 if tracer != nil {
68 panic("Attempted to configure multiple OpenTracing implementations")
69 }
70 tracer = lightstep.NewTracer(lightstep.Options{
71 AccessToken: *lightstepAccessToken,
72 })
73 }
74 if tracer == nil {
75 tracer = opentracing.GlobalTracer() // the noop tracer
76 }
77 }
78
5179 var (
5280 instances []string
5381 sumFactory, concatFactory loadbalancer.Factory
5684 switch *transport {
5785 case "grpc":
5886 instances = strings.Split(*grpcAddrs, ",")
59 sumFactory = grpcclient.SumEndpointFactory
60 concatFactory = grpcclient.ConcatEndpointFactory
87 sumFactory = grpcclient.NewSumEndpointFactory(tracer)
88 concatFactory = grpcclient.NewConcatEndpointFactory(tracer)
6189
6290 case "httpjson":
6391 instances = strings.Split(*httpAddrs, ",")
6694 instances[i] = "http://" + rawurl
6795 }
6896 }
69 sumFactory = httpjsonclient.SumEndpointFactory
70 concatFactory = httpjsonclient.ConcatEndpointFactory
97 sumFactory = httpjsonclient.NewSumEndpointFactory(tracer)
98 concatFactory = httpjsonclient.NewConcatEndpointFactory(tracer)
7199
72100 case "netrpc":
73101 instances = strings.Split(*netrpcAddrs, ",")
85113 os.Exit(1)
86114 }
87115
88 sum := buildEndpoint(instances, sumFactory, randomSeed, logger)
89 concat := buildEndpoint(instances, concatFactory, randomSeed, logger)
116 sum := buildEndpoint(tracer, "sum", instances, sumFactory, randomSeed, logger)
117 concat := buildEndpoint(tracer, "concat", instances, concatFactory, randomSeed, logger)
90118
91119 svc := newClient(root, sum, concat, logger)
92120
107135 logger.Log("err", "invalid method "+method)
108136 os.Exit(1)
109137 }
138
139 if len(*lightstepAccessToken) > 0 {
140 lightstep.FlushLightStepTracer(tracer)
141 }
110142 }
111143
112 func buildEndpoint(instances []string, factory loadbalancer.Factory, seed int64, logger log.Logger) endpoint.Endpoint {
144 func buildEndpoint(tracer opentracing.Tracer, operationName string, instances []string, factory loadbalancer.Factory, seed int64, logger log.Logger) endpoint.Endpoint {
113145 publisher := static.NewPublisher(instances, factory, logger)
114146 random := loadbalancer.NewRandom(publisher, seed)
115 return loadbalancer.Retry(10, 10*time.Second, random)
147 endpoint := loadbalancer.Retry(10, 10*time.Second, random)
148 return kitot.TraceClient(tracer, operationName)(endpoint)
116149 }
55 "github.com/go-kit/kit/examples/addsvc/pb"
66 "github.com/go-kit/kit/examples/addsvc/server"
77 servergrpc "github.com/go-kit/kit/examples/addsvc/server/grpc"
8 kitot "github.com/go-kit/kit/tracing/opentracing"
89 "github.com/go-kit/kit/transport/grpc"
10 "github.com/opentracing/opentracing-go"
911 )
1012
1113 type grpcBinding struct {
1214 sum, concat grpc.Handler
1315 }
1416
15 func newGRPCBinding(ctx context.Context, svc server.AddService) grpcBinding {
17 func newGRPCBinding(ctx context.Context, tracer opentracing.Tracer, svc server.AddService) grpcBinding {
1618 return grpcBinding{
17 sum: grpc.NewServer(ctx, makeSumEndpoint(svc), servergrpc.DecodeSumRequest, servergrpc.EncodeSumResponse),
18 concat: grpc.NewServer(ctx, makeConcatEndpoint(svc), servergrpc.DecodeConcatRequest, servergrpc.EncodeConcatResponse),
19 sum: grpc.NewServer(
20 ctx,
21 kitot.TraceServer(tracer, "sum")(makeSumEndpoint(svc)),
22 servergrpc.DecodeSumRequest,
23 servergrpc.EncodeSumResponse,
24 grpc.ServerBefore(kitot.FromGRPCRequest(tracer, "")),
25 ),
26 concat: grpc.NewServer(
27 ctx,
28 kitot.TraceServer(tracer, "concat")(makeConcatEndpoint(svc)),
29 servergrpc.DecodeConcatRequest,
30 servergrpc.EncodeConcatResponse,
31 grpc.ServerBefore(kitot.FromGRPCRequest(tracer, "")),
32 ),
1933 }
2034 }
2135
1414 "time"
1515
1616 "github.com/apache/thrift/lib/go/thrift"
17 kitot "github.com/go-kit/kit/tracing/opentracing"
18 "github.com/lightstep/lightstep-tracer-go"
19 "github.com/opentracing/opentracing-go"
1720 stdprometheus "github.com/prometheus/client_golang/prometheus"
21 appdashot "github.com/sourcegraph/appdash/opentracing"
1822 "golang.org/x/net/context"
1923 "google.golang.org/grpc"
24 "sourcegraph.com/sourcegraph/appdash"
2025
2126 "github.com/go-kit/kit/endpoint"
2227 "github.com/go-kit/kit/examples/addsvc/pb"
3540 // of glog. So, we define a new flag set, to keep those domains distinct.
3641 fs := flag.NewFlagSet("", flag.ExitOnError)
3742 var (
38 debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server")
39 httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server")
40 grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server")
41 netrpcAddr = fs.String("netrpc.addr", ":8003", "Address for net/rpc server")
42 thriftAddr = fs.String("thrift.addr", ":8004", "Address for Thrift server")
43 thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
44 thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered")
45 thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing")
46 zipkinHostPort = fs.String("zipkin.host.port", "my.service.domain:12345", "Zipkin host:port")
47 zipkinServiceName = fs.String("zipkin.service.name", "addsvc", "Zipkin service name")
48 zipkinCollectorAddr = fs.String("zipkin.collector.addr", "", "Zipkin Kafka collector address (empty will log spans)")
43 debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server")
44 httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server")
45 grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server")
46 netrpcAddr = fs.String("netrpc.addr", ":8003", "Address for net/rpc server")
47 thriftAddr = fs.String("thrift.addr", ":8004", "Address for Thrift server")
48 thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
49 thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered")
50 thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing")
51
52 // Supported OpenTracing backends
53 appdashHostport = fs.String("appdash_hostport", "", "Enable Appdash tracing via an Appdash server host:port")
54 lightstepAccessToken = fs.String("lightstep_access_token", "", "Enable LightStep tracing via a LightStep access token")
4955 )
5056 flag.Usage = fs.Usage // only show our flags
5157 if err := fs.Parse(os.Args[1:]); err != nil {
7783 ))
7884 }
7985
80 // package tracing
81 var collector zipkin.Collector
82 {
83 zipkinLogger := log.NewContext(logger).With("component", "zipkin")
84 collector = loggingCollector{zipkinLogger} // TODO(pb)
85 if *zipkinCollectorAddr != "" {
86 var err error
87 if collector, err = zipkin.NewKafkaCollector(
88 []string{*zipkinCollectorAddr},
89 zipkin.KafkaLogger(zipkinLogger),
90 ); err != nil {
91 zipkinLogger.Log("err", err)
92 os.Exit(1)
86 // Set up OpenTracing
87 var tracer opentracing.Tracer
88 {
89 if len(*appdashHostport) > 0 {
90 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashHostport))
91 }
92 if len(*lightstepAccessToken) > 0 {
93 if tracer != nil {
94 panic("Attempted to configure multiple OpenTracing implementations")
9395 }
96 tracer = lightstep.NewTracer(lightstep.Options{
97 AccessToken: *lightstepAccessToken,
98 })
99 }
100 if tracer == nil {
101 tracer = opentracing.GlobalTracer() // the noop tracer
94102 }
95103 }
96104
122130 go func() {
123131 var (
124132 transportLogger = log.NewContext(logger).With("transport", "HTTP/JSON")
125 tracingLogger = log.NewContext(transportLogger).With("component", "tracing")
126 newSumSpan = zipkin.MakeNewSpanFunc(*zipkinHostPort, *zipkinServiceName, "sum")
127 newConcatSpan = zipkin.MakeNewSpanFunc(*zipkinHostPort, *zipkinServiceName, "concat")
128 traceSum = zipkin.ToContext(newSumSpan, tracingLogger)
129 traceConcat = zipkin.ToContext(newConcatSpan, tracingLogger)
130133 mux = http.NewServeMux()
131134 sum, concat endpoint.Endpoint
132135 )
133136
134137 sum = makeSumEndpoint(svc)
135 sum = zipkin.AnnotateServer(newSumSpan, collector)(sum)
138 sum = kitot.TraceServer(tracer, "sum")(sum)
136139 mux.Handle("/sum", httptransport.NewServer(
137140 root,
138141 sum,
139142 server.DecodeSumRequest,
140143 server.EncodeSumResponse,
141 httptransport.ServerBefore(traceSum),
142144 httptransport.ServerErrorLogger(transportLogger),
145 httptransport.ServerBefore(kitot.FromHTTPRequest(tracer, "sum")),
143146 ))
144147
145148 concat = makeConcatEndpoint(svc)
146 concat = zipkin.AnnotateServer(newConcatSpan, collector)(concat)
149 concat = kitot.TraceServer(tracer, "concat")(concat)
147150 mux.Handle("/concat", httptransport.NewServer(
148151 root,
149152 concat,
150153 server.DecodeConcatRequest,
151154 server.EncodeConcatResponse,
152 httptransport.ServerBefore(traceConcat),
153155 httptransport.ServerErrorLogger(transportLogger),
156 httptransport.ServerBefore(kitot.FromHTTPRequest(tracer, "concat")),
154157 ))
155158
156159 transportLogger.Log("addr", *httpAddr)
166169 return
167170 }
168171 s := grpc.NewServer() // uses its own, internal context
169 pb.RegisterAddServer(s, newGRPCBinding(root, svc))
172 pb.RegisterAddServer(s, newGRPCBinding(root, tracer, svc))
170173 transportLogger.Log("addr", *grpcAddr)
171174 errc <- s.Serve(ln)
172175 }()
99
1010 type pureAddService struct{}
1111
12 func (pureAddService) Sum(a, b int) int { return a + b }
12 func (pureAddService) Sum(a, b int) int {
13 return a + b
14 }
1315
1416 func (pureAddService) Concat(a, b string) string { return a + b }
1517
1616
1717 "github.com/gorilla/mux"
1818 "github.com/hashicorp/consul/api"
19 "github.com/opentracing/opentracing-go"
1920 "golang.org/x/net/context"
2021
2122 "github.com/go-kit/kit/endpoint"
7677 factory loadbalancer.Factory
7778 }{
7879 "addsvc": {
79 {path: "/api/addsvc/concat", factory: grpc.ConcatEndpointFactory},
80 {path: "/api/addsvc/sum", factory: grpc.SumEndpointFactory},
80 {path: "/api/addsvc/concat", factory: grpc.NewConcatEndpointFactory(opentracing.GlobalTracer())},
81 {path: "/api/addsvc/sum", factory: grpc.NewSumEndpointFactory(opentracing.GlobalTracer())},
8182 },
8283 "stringsvc": {
8384 {path: "/api/stringsvc/uppercase", factory: httpFactory(ctx, "GET", "uppercase/")},