diff --git a/examples/addsvc/client/grpc/factory.go b/examples/addsvc/client/grpc/factory.go index b898d96..0bde30e 100644 --- a/examples/addsvc/client/grpc/factory.go +++ b/examples/addsvc/client/grpc/factory.go @@ -7,33 +7,42 @@ "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/examples/addsvc/pb" + "github.com/go-kit/kit/loadbalancer" + kitot "github.com/go-kit/kit/tracing/opentracing" grpctransport "github.com/go-kit/kit/transport/grpc" + "github.com/opentracing/opentracing-go" ) // SumEndpointFactory transforms GRPC host:port strings into Endpoints that call the Sum method on a GRPC server // at that address. -func SumEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { - cc, err := grpc.Dial(instance, grpc.WithInsecure()) - return grpctransport.NewClient( - cc, - "Add", - "Sum", - encodeSumRequest, - decodeSumResponse, - pb.SumReply{}, - ).Endpoint(), cc, err +func NewSumEndpointFactory(tracer opentracing.Tracer) loadbalancer.Factory { + return func(instance string) (endpoint.Endpoint, io.Closer, error) { + cc, err := grpc.Dial(instance, grpc.WithInsecure()) + return grpctransport.NewClient( + cc, + "Add", + "Sum", + encodeSumRequest, + decodeSumResponse, + pb.SumReply{}, + grpctransport.SetClientBefore(kitot.ToGRPCRequest(tracer)), + ).Endpoint(), cc, err + } } // ConcatEndpointFactory transforms GRPC host:port strings into Endpoints that call the Concat method on a GRPC server // at that address. -func ConcatEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { - cc, err := grpc.Dial(instance, grpc.WithInsecure()) - return grpctransport.NewClient( - cc, - "Add", - "Concat", - encodeConcatRequest, - decodeConcatResponse, - pb.ConcatReply{}, - ).Endpoint(), cc, err +func NewConcatEndpointFactory(tracer opentracing.Tracer) loadbalancer.Factory { + return func(instance string) (endpoint.Endpoint, io.Closer, error) { + cc, err := grpc.Dial(instance, grpc.WithInsecure()) + return grpctransport.NewClient( + cc, + "Add", + "Concat", + encodeConcatRequest, + decodeConcatResponse, + pb.ConcatReply{}, + grpctransport.SetClientBefore(kitot.ToGRPCRequest(tracer)), + ).Endpoint(), cc, err + } } diff --git a/examples/addsvc/client/httpjson/factory.go b/examples/addsvc/client/httpjson/factory.go index 43f3148..10b8564 100644 --- a/examples/addsvc/client/httpjson/factory.go +++ b/examples/addsvc/client/httpjson/factory.go @@ -6,45 +6,58 @@ "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/examples/addsvc/server" + "github.com/go-kit/kit/loadbalancer" + kitot "github.com/go-kit/kit/tracing/opentracing" httptransport "github.com/go-kit/kit/transport/http" + "github.com/opentracing/opentracing-go" ) -// SumEndpointFactory transforms a http url into an Endpoint. +// SumEndpointFactory generates a Factory that transforms an http url into an +// Endpoint. +// // The path of the url is reset to /sum. -func SumEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { - sumURL, err := url.Parse(instance) - if err != nil { - return nil, nil, err +func NewSumEndpointFactory(tracer opentracing.Tracer) loadbalancer.Factory { + return func(instance string) (endpoint.Endpoint, io.Closer, error) { + sumURL, err := url.Parse(instance) + if err != nil { + return nil, nil, err + } + sumURL.Path = "/sum" + + client := httptransport.NewClient( + "GET", + sumURL, + server.EncodeSumRequest, + server.DecodeSumResponse, + httptransport.SetClient(nil), + httptransport.SetClientBefore(kitot.ToHTTPRequest(tracer)), + ) + + return client.Endpoint(), nil, nil } - sumURL.Path = "/sum" - - client := httptransport.NewClient( - "GET", - sumURL, - server.EncodeSumRequest, - server.DecodeSumResponse, - httptransport.SetClient(nil), - ) - - return client.Endpoint(), nil, nil } -// ConcatEndpointFactory transforms a http url into an Endpoint. +// NewConcatEndpointFactory generates a Factory that transforms an http url +// into an Endpoint. +// // The path of the url is reset to /concat. -func ConcatEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { - concatURL, err := url.Parse(instance) - if err != nil { - return nil, nil, err +func NewConcatEndpointFactory(tracer opentracing.Tracer) loadbalancer.Factory { + return func(instance string) (endpoint.Endpoint, io.Closer, error) { + concatURL, err := url.Parse(instance) + if err != nil { + return nil, nil, err + } + concatURL.Path = "/concat" + + client := httptransport.NewClient( + "GET", + concatURL, + server.EncodeConcatRequest, + server.DecodeConcatResponse, + httptransport.SetClient(nil), + httptransport.SetClientBefore(kitot.ToHTTPRequest(tracer)), + ) + + return client.Endpoint(), nil, nil } - concatURL.Path = "/concat" - - client := httptransport.NewClient( - "GET", - concatURL, - server.EncodeConcatRequest, - server.DecodeConcatResponse, - httptransport.SetClient(nil), - ) - - return client.Endpoint(), nil, nil } diff --git a/examples/addsvc/client/main.go b/examples/addsvc/client/main.go index 95bb16e..72b65c2 100644 --- a/examples/addsvc/client/main.go +++ b/examples/addsvc/client/main.go @@ -19,6 +19,11 @@ "github.com/go-kit/kit/loadbalancer" "github.com/go-kit/kit/loadbalancer/static" "github.com/go-kit/kit/log" + kitot "github.com/go-kit/kit/tracing/opentracing" + "github.com/lightstep/lightstep-tracer-go" + "github.com/opentracing/opentracing-go" + appdashot "github.com/sourcegraph/appdash/opentracing" + "sourcegraph.com/sourcegraph/appdash" ) func main() { @@ -31,6 +36,10 @@ thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson") thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered") thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing") + + // Two OpenTracing backends (to demonstrate how they can be interchanged): + appdashHostport = flag.String("appdash_hostport", "", "Enable Appdash tracing via an Appdash server host:port") + lightstepAccessToken = flag.String("lightstep_access_token", "", "Enable LightStep tracing via a LightStep access token") ) flag.Parse() if len(os.Args) < 4 { @@ -49,6 +58,25 @@ logger = log.NewContext(logger).With("caller", log.DefaultCaller) logger = log.NewContext(logger).With("transport", *transport) + // Set up OpenTracing + var tracer opentracing.Tracer + { + if len(*appdashHostport) > 0 { + tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashHostport)) + } + if len(*lightstepAccessToken) > 0 { + if tracer != nil { + panic("Attempted to configure multiple OpenTracing implementations") + } + tracer = lightstep.NewTracer(lightstep.Options{ + AccessToken: *lightstepAccessToken, + }) + } + if tracer == nil { + tracer = opentracing.GlobalTracer() // the noop tracer + } + } + var ( instances []string sumFactory, concatFactory loadbalancer.Factory @@ -57,8 +85,8 @@ switch *transport { case "grpc": instances = strings.Split(*grpcAddrs, ",") - sumFactory = grpcclient.SumEndpointFactory - concatFactory = grpcclient.ConcatEndpointFactory + sumFactory = grpcclient.NewSumEndpointFactory(tracer) + concatFactory = grpcclient.NewConcatEndpointFactory(tracer) case "httpjson": instances = strings.Split(*httpAddrs, ",") @@ -67,8 +95,8 @@ instances[i] = "http://" + rawurl } } - sumFactory = httpjsonclient.SumEndpointFactory - concatFactory = httpjsonclient.ConcatEndpointFactory + sumFactory = httpjsonclient.NewSumEndpointFactory(tracer) + concatFactory = httpjsonclient.NewConcatEndpointFactory(tracer) case "netrpc": instances = strings.Split(*netrpcAddrs, ",") @@ -86,8 +114,8 @@ os.Exit(1) } - sum := buildEndpoint(instances, sumFactory, randomSeed, logger) - concat := buildEndpoint(instances, concatFactory, randomSeed, logger) + sum := buildEndpoint(tracer, "sum", instances, sumFactory, randomSeed, logger) + concat := buildEndpoint(tracer, "concat", instances, concatFactory, randomSeed, logger) svc := newClient(root, sum, concat, logger) @@ -108,10 +136,15 @@ logger.Log("err", "invalid method "+method) os.Exit(1) } + + if len(*lightstepAccessToken) > 0 { + lightstep.FlushLightStepTracer(tracer) + } } -func buildEndpoint(instances []string, factory loadbalancer.Factory, seed int64, logger log.Logger) endpoint.Endpoint { +func buildEndpoint(tracer opentracing.Tracer, operationName string, instances []string, factory loadbalancer.Factory, seed int64, logger log.Logger) endpoint.Endpoint { publisher := static.NewPublisher(instances, factory, logger) random := loadbalancer.NewRandom(publisher, seed) - return loadbalancer.Retry(10, 10*time.Second, random) + endpoint := loadbalancer.Retry(10, 10*time.Second, random) + return kitot.TraceClient(tracer, operationName)(endpoint) } diff --git a/examples/addsvc/grpc_binding.go b/examples/addsvc/grpc_binding.go index 42cd849..ec904c0 100644 --- a/examples/addsvc/grpc_binding.go +++ b/examples/addsvc/grpc_binding.go @@ -6,17 +6,31 @@ "github.com/go-kit/kit/examples/addsvc/pb" "github.com/go-kit/kit/examples/addsvc/server" servergrpc "github.com/go-kit/kit/examples/addsvc/server/grpc" + kitot "github.com/go-kit/kit/tracing/opentracing" "github.com/go-kit/kit/transport/grpc" + "github.com/opentracing/opentracing-go" ) type grpcBinding struct { sum, concat grpc.Handler } -func newGRPCBinding(ctx context.Context, svc server.AddService) grpcBinding { +func newGRPCBinding(ctx context.Context, tracer opentracing.Tracer, svc server.AddService) grpcBinding { return grpcBinding{ - sum: grpc.NewServer(ctx, makeSumEndpoint(svc), servergrpc.DecodeSumRequest, servergrpc.EncodeSumResponse), - concat: grpc.NewServer(ctx, makeConcatEndpoint(svc), servergrpc.DecodeConcatRequest, servergrpc.EncodeConcatResponse), + sum: grpc.NewServer( + ctx, + kitot.TraceServer(tracer, "sum")(makeSumEndpoint(svc)), + servergrpc.DecodeSumRequest, + servergrpc.EncodeSumResponse, + grpc.ServerBefore(kitot.FromGRPCRequest(tracer, "")), + ), + concat: grpc.NewServer( + ctx, + kitot.TraceServer(tracer, "concat")(makeConcatEndpoint(svc)), + servergrpc.DecodeConcatRequest, + servergrpc.EncodeConcatResponse, + grpc.ServerBefore(kitot.FromGRPCRequest(tracer, "")), + ), } } diff --git a/examples/addsvc/main.go b/examples/addsvc/main.go index d504043..0b82bac 100644 --- a/examples/addsvc/main.go +++ b/examples/addsvc/main.go @@ -15,9 +15,14 @@ "time" "github.com/apache/thrift/lib/go/thrift" + kitot "github.com/go-kit/kit/tracing/opentracing" + "github.com/lightstep/lightstep-tracer-go" + "github.com/opentracing/opentracing-go" stdprometheus "github.com/prometheus/client_golang/prometheus" + appdashot "github.com/sourcegraph/appdash/opentracing" "golang.org/x/net/context" "google.golang.org/grpc" + "sourcegraph.com/sourcegraph/appdash" "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/examples/addsvc/pb" @@ -36,17 +41,18 @@ // of glog. So, we define a new flag set, to keep those domains distinct. fs := flag.NewFlagSet("", flag.ExitOnError) var ( - debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server") - httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server") - grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server") - netrpcAddr = fs.String("netrpc.addr", ":8003", "Address for net/rpc server") - thriftAddr = fs.String("thrift.addr", ":8004", "Address for Thrift server") - thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson") - thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered") - thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing") - zipkinHostPort = fs.String("zipkin.host.port", "my.service.domain:12345", "Zipkin host:port") - zipkinServiceName = fs.String("zipkin.service.name", "addsvc", "Zipkin service name") - zipkinCollectorAddr = fs.String("zipkin.collector.addr", "", "Zipkin Kafka collector address (empty will log spans)") + debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server") + httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server") + grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server") + netrpcAddr = fs.String("netrpc.addr", ":8003", "Address for net/rpc server") + thriftAddr = fs.String("thrift.addr", ":8004", "Address for Thrift server") + thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson") + thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered") + thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing") + + // Supported OpenTracing backends + appdashHostport = fs.String("appdash_hostport", "", "Enable Appdash tracing via an Appdash server host:port") + lightstepAccessToken = fs.String("lightstep_access_token", "", "Enable LightStep tracing via a LightStep access token") ) flag.Usage = fs.Usage // only show our flags if err := fs.Parse(os.Args[1:]); err != nil { @@ -78,20 +84,22 @@ )) } - // package tracing - var collector zipkin.Collector - { - zipkinLogger := log.NewContext(logger).With("component", "zipkin") - collector = loggingCollector{zipkinLogger} // TODO(pb) - if *zipkinCollectorAddr != "" { - var err error - if collector, err = zipkin.NewKafkaCollector( - []string{*zipkinCollectorAddr}, - zipkin.KafkaLogger(zipkinLogger), - ); err != nil { - zipkinLogger.Log("err", err) - os.Exit(1) + // Set up OpenTracing + var tracer opentracing.Tracer + { + if len(*appdashHostport) > 0 { + tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashHostport)) + } + if len(*lightstepAccessToken) > 0 { + if tracer != nil { + panic("Attempted to configure multiple OpenTracing implementations") } + tracer = lightstep.NewTracer(lightstep.Options{ + AccessToken: *lightstepAccessToken, + }) + } + if tracer == nil { + tracer = opentracing.GlobalTracer() // the noop tracer } } @@ -123,35 +131,30 @@ go func() { var ( transportLogger = log.NewContext(logger).With("transport", "HTTP/JSON") - tracingLogger = log.NewContext(transportLogger).With("component", "tracing") - newSumSpan = zipkin.MakeNewSpanFunc(*zipkinHostPort, *zipkinServiceName, "sum") - newConcatSpan = zipkin.MakeNewSpanFunc(*zipkinHostPort, *zipkinServiceName, "concat") - traceSum = zipkin.ToContext(newSumSpan, tracingLogger) - traceConcat = zipkin.ToContext(newConcatSpan, tracingLogger) mux = http.NewServeMux() sum, concat endpoint.Endpoint ) sum = makeSumEndpoint(svc) - sum = zipkin.AnnotateServer(newSumSpan, collector)(sum) + sum = kitot.TraceServer(tracer, "sum")(sum) mux.Handle("/sum", httptransport.NewServer( root, sum, server.DecodeSumRequest, server.EncodeSumResponse, - httptransport.ServerBefore(traceSum), httptransport.ServerErrorLogger(transportLogger), + httptransport.ServerBefore(kitot.FromHTTPRequest(tracer, "sum")), )) concat = makeConcatEndpoint(svc) - concat = zipkin.AnnotateServer(newConcatSpan, collector)(concat) + concat = kitot.TraceServer(tracer, "concat")(concat) mux.Handle("/concat", httptransport.NewServer( root, concat, server.DecodeConcatRequest, server.EncodeConcatResponse, - httptransport.ServerBefore(traceConcat), httptransport.ServerErrorLogger(transportLogger), + httptransport.ServerBefore(kitot.FromHTTPRequest(tracer, "concat")), )) transportLogger.Log("addr", *httpAddr) @@ -167,7 +170,7 @@ return } s := grpc.NewServer() // uses its own, internal context - pb.RegisterAddServer(s, newGRPCBinding(root, svc)) + pb.RegisterAddServer(s, newGRPCBinding(root, tracer, svc)) transportLogger.Log("addr", *grpcAddr) errc <- s.Serve(ln) }() diff --git a/examples/addsvc/service.go b/examples/addsvc/service.go index f0531a2..258da49 100644 --- a/examples/addsvc/service.go +++ b/examples/addsvc/service.go @@ -10,7 +10,9 @@ type pureAddService struct{} -func (pureAddService) Sum(a, b int) int { return a + b } +func (pureAddService) Sum(a, b int) int { + return a + b +} func (pureAddService) Concat(a, b string) string { return a + b } diff --git a/examples/apigateway/main.go b/examples/apigateway/main.go index e6ba593..c881122 100644 --- a/examples/apigateway/main.go +++ b/examples/apigateway/main.go @@ -17,6 +17,7 @@ "github.com/gorilla/mux" "github.com/hashicorp/consul/api" + "github.com/opentracing/opentracing-go" "golang.org/x/net/context" "github.com/go-kit/kit/endpoint" @@ -77,8 +78,8 @@ factory loadbalancer.Factory }{ "addsvc": { - {path: "/api/addsvc/concat", factory: grpc.ConcatEndpointFactory}, - {path: "/api/addsvc/sum", factory: grpc.SumEndpointFactory}, + {path: "/api/addsvc/concat", factory: grpc.NewConcatEndpointFactory(opentracing.GlobalTracer())}, + {path: "/api/addsvc/sum", factory: grpc.NewSumEndpointFactory(opentracing.GlobalTracer())}, }, "stringsvc": { {path: "/api/stringsvc/uppercase", factory: httpFactory(ctx, "GET", "uppercase/")},