diff --git a/examples/addsvc/client/main.go b/examples/addsvc/client/main.go index e1307cb..e602998 100644 --- a/examples/addsvc/client/main.go +++ b/examples/addsvc/client/main.go @@ -9,6 +9,7 @@ "strings" "time" + zipkin "github.com/basvanbeek/zipkin-go-opentracing" "github.com/lightstep/lightstep-tracer-go" "github.com/opentracing/opentracing-go" appdashot "github.com/sourcegraph/appdash/opentracing" @@ -37,7 +38,8 @@ thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered") thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing") - // Two OpenTracing backends (to demonstrate how they can be interchanged): + // Three OpenTracing backends (to demonstrate how they can be interchanged): + zipkinAddr = flag.String("zipkin.kafka.addr", "", "Enable Zipkin tracing via a Kafka Collector host:port") appdashAddr = flag.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port") lightstepAccessToken = flag.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token") ) @@ -63,17 +65,33 @@ var tracer opentracing.Tracer { switch { - case *appdashAddr != "" && *lightstepAccessToken == "": + case *appdashAddr != "" && *lightstepAccessToken == "" && *zipkinAddr == "": tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) - case *appdashAddr == "" && *lightstepAccessToken != "": + case *appdashAddr == "" && *lightstepAccessToken != "" && *zipkinAddr == "": tracer = lightstep.NewTracer(lightstep.Options{ AccessToken: *lightstepAccessToken, }) defer lightstep.FlushLightStepTracer(tracer) - case *appdashAddr == "" && *lightstepAccessToken == "": + case *appdashAddr == "" && *lightstepAccessToken == "" && *zipkinAddr != "": + collector, err := zipkin.NewKafkaCollector( + strings.Split(*zipkinAddr, ","), + zipkin.KafkaLogger(tracingLogger), + ) + if err != nil { + tracingLogger.Log("err", "unable to create kafka collector") + os.Exit(1) + } + tracer, err = zipkin.NewTracer( + zipkin.NewRecorder(collector, false, "localhost:8000", "addsvc-client"), + ) + if err != nil { + tracingLogger.Log("err", "unable to create zipkin tracer") + os.Exit(1) + } + case *appdashAddr == "" && *lightstepAccessToken == "" && *zipkinAddr == "": tracer = opentracing.GlobalTracer() // no-op default: - panic("specify either -appdash.addr or -lightstep.access.token, not both") + panic("specify a single -appdash.addr, -lightstep.access.token or -zipkin.kafka.addr") } } @@ -136,6 +154,8 @@ logger.Log("err", "invalid method "+method) os.Exit(1) } + // wait for collector + time.Sleep(2 * time.Second) } func buildEndpoint(tracer opentracing.Tracer, operationName string, instances []string, factory loadbalancer.Factory, seed int64, logger log.Logger) endpoint.Endpoint { diff --git a/examples/addsvc/main.go b/examples/addsvc/main.go index 28c4088..6856a2f 100644 --- a/examples/addsvc/main.go +++ b/examples/addsvc/main.go @@ -15,6 +15,7 @@ "time" "github.com/apache/thrift/lib/go/thrift" + zipkin "github.com/basvanbeek/zipkin-go-opentracing" "github.com/lightstep/lightstep-tracer-go" "github.com/opentracing/opentracing-go" stdprometheus "github.com/prometheus/client_golang/prometheus" @@ -32,7 +33,6 @@ "github.com/go-kit/kit/metrics/expvar" "github.com/go-kit/kit/metrics/prometheus" kitot "github.com/go-kit/kit/tracing/opentracing" - "github.com/go-kit/kit/tracing/zipkin" httptransport "github.com/go-kit/kit/transport/http" ) @@ -51,6 +51,7 @@ thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing") // Supported OpenTracing backends + zipkinAddr = fs.String("zipkin.kafka.addr", "", "Enable Zipkin tracing via a Kafka server host:port") appdashAddr = fs.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port") lightstepAccessToken = fs.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token") ) @@ -88,17 +89,33 @@ var tracer opentracing.Tracer { switch { - case *appdashAddr != "" && *lightstepAccessToken == "": + case *appdashAddr != "" && *lightstepAccessToken == "" && *zipkinAddr == "": tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) - case *appdashAddr == "" && *lightstepAccessToken != "": + case *appdashAddr == "" && *lightstepAccessToken != "" && *zipkinAddr == "": tracer = lightstep.NewTracer(lightstep.Options{ AccessToken: *lightstepAccessToken, }) defer lightstep.FlushLightStepTracer(tracer) - case *appdashAddr == "" && *lightstepAccessToken == "": + case *appdashAddr == "" && *lightstepAccessToken == "" && *zipkinAddr != "": + collector, err := zipkin.NewKafkaCollector( + strings.Split(*zipkinAddr, ","), + zipkin.KafkaLogger(logger), + ) + if err != nil { + logger.Log("err", "unable to create kafka collector") + os.Exit(1) + } + tracer, err = zipkin.NewTracer( + zipkin.NewRecorder(collector, false, "localhost:80", "addsvc"), + ) + if err != nil { + logger.Log("err", "unable to create zipkin tracer") + os.Exit(1) + } + case *appdashAddr == "" && *lightstepAccessToken == "" && *zipkinAddr == "": tracer = opentracing.GlobalTracer() // no-op default: - panic("specify either -appdash.addr or -lightstep.access.token, not both") + panic("specify a single -appdash.addr, -lightstep.access.token or -zipkin.kafka.addr") } } @@ -237,24 +254,3 @@ signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) return fmt.Errorf("%s", <-c) } - -type loggingCollector struct{ log.Logger } - -func (c loggingCollector) Collect(s *zipkin.Span) error { - annotations := s.Encode().GetAnnotations() - values := make([]string, len(annotations)) - for i, a := range annotations { - values[i] = a.Value - } - c.Logger.Log( - "trace_id", s.TraceID(), - "span_id", s.SpanID(), - "parent_span_id", s.ParentSpanID(), - "annotations", strings.Join(values, " "), - ) - return nil -} - -func (c loggingCollector) ShouldSample(*zipkin.Span) bool { return true } - -func (c loggingCollector) Close() error { return nil } diff --git a/tracing/opentracing/grpc.go b/tracing/opentracing/grpc.go index 5466c7b..fee54d7 100644 --- a/tracing/opentracing/grpc.go +++ b/tracing/opentracing/grpc.go @@ -39,11 +39,11 @@ func FromGRPCRequest(tracer opentracing.Tracer, operationName string, logger log.Logger) func(ctx context.Context, md *metadata.MD) context.Context { return func(ctx context.Context, md *metadata.MD) context.Context { span, err := tracer.Join(operationName, opentracing.TextMap, metadataReaderWriter{md}) - if err != nil && logger != nil { - logger.Log("msg", "Join failed", "err", err) - } - if span == nil { + if err != nil { span = tracer.StartSpan(operationName) + if logger != nil { + logger.Log("msg", "Join failed", "err", err) + } } return opentracing.ContextWithSpan(ctx, span) } diff --git a/tracing/opentracing/http.go b/tracing/opentracing/http.go index 606516d..fe03c7f 100644 --- a/tracing/opentracing/http.go +++ b/tracing/opentracing/http.go @@ -57,18 +57,17 @@ // The logger is used to report errors and may be nil. func FromHTTPRequest(tracer opentracing.Tracer, operationName string, logger log.Logger) kithttp.RequestFunc { return func(ctx context.Context, req *http.Request) context.Context { - // Try to join to a trace propagated in `req`. There's nothing we can - // do with any errors here, so we ignore them. + // Try to join to a trace propagated in `req`. span, err := tracer.Join( operationName, opentracing.TextMap, opentracing.HTTPHeaderTextMapCarrier(req.Header), ) - if err != nil && logger != nil { - logger.Log("msg", "Join failed", "err", err) - } - if span == nil { - span = opentracing.StartSpan(operationName) + if err != nil { + span = tracer.StartSpan(operationName) + if logger != nil { + logger.Log("msg", "Join failed", "err", err) + } } return opentracing.ContextWithSpan(ctx, span) }