Codebase list golang-github-go-kit-kit / 4431eb8
OpenCensus tracing middleware Adds OpenCencus tracing middleware for the HTTP and gRPC transports as well as a generic Go kit endpoint middleware Bas van Beek 5 years ago
12 changed file(s) with 1101 addition(s) and 9 deletion(s). Raw diff Collapse all Expand all
2525 return outer(next)
2626 }
2727 }
28
29 // Failer is an interface that should be implemented by response types that
30 // hold error properties as to separate business errors from transport errors.
31 // If the response type can hold business errors it is highly advised to
32 // implement Failer.
33 // Response encoders can check if responses are Failer, and if so if they've
34 // failed encode them using a separate write path based on the error.
35 // Endpoint middlewares can test if a response type failed and also act or
36 // report upon it.
37 //
38 // The addsvc example shows Failer's intended usage.
39 type Failer interface {
40 Failed() error
41 }
9797 }
9898 }
9999
100 // Failer is an interface that should be implemented by response types.
101 // Response encoders can check if responses are Failer, and if so if they've
102 // failed, and if so encode them using a separate write path based on the error.
103 type Failer interface {
104 Failed() error
105 }
100 // compile time assertions for our response types implementing endpoint.Failer.
101 var (
102 _ endpoint.Failer = SumResponse{}
103 _ endpoint.Failer = ConcatResponse{}
104 )
106105
107106 // SumRequest collects the request parameters for the Sum method.
108107 type SumRequest struct {
115114 Err error `json:"-"` // should be intercepted by Failed/errorEncoder
116115 }
117116
118 // Failed implements Failer.
117 // Failed implements endpoint.Failer.
119118 func (r SumResponse) Failed() error { return r.Err }
120119
121120 // ConcatRequest collects the request parameters for the Concat method.
129128 Err error `json:"-"`
130129 }
131130
132 // Failed implements Failer.
131 // Failed implements endpoint.Failer.
133132 func (r ConcatResponse) Failed() error { return r.Err }
234234 // encodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that encodes
235235 // the response as JSON to the response writer. Primarily useful in a server.
236236 func encodeHTTPGenericResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
237 if f, ok := response.(addendpoint.Failer); ok && f.Failed() != nil {
237 if f, ok := response.(endpoint.Failer); ok && f.Failed() != nil {
238238 errorEncoder(ctx, f.Failed(), w)
239239 return nil
240240 }
0 package opencensus
1
2 import (
3 "context"
4 "strconv"
5
6 "go.opencensus.io/trace"
7
8 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/sd/lb"
10 )
11
12 // TraceEndpointDefaultName is the default endpoint span name to use.
13 const TraceEndpointDefaultName = "gokit/endpoint"
14
15 // TraceEndpoint returns an Endpoint middleware, tracing a Go kit endpoint.
16 // This endpoint tracer should be used in combination with a Go kit Transport
17 // tracing middleware, generic OpenCensus transport middleware or custom before
18 // and after transport functions as service propagation of SpanContext is not
19 // provided in this middleware.
20 func TraceEndpoint(name string, options ...EndpointOption) endpoint.Middleware {
21 if name == "" {
22 name = TraceEndpointDefaultName
23 }
24
25 cfg := &EndpointOptions{}
26
27 for _, o := range options {
28 o(cfg)
29 }
30
31 return func(next endpoint.Endpoint) endpoint.Endpoint {
32 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
33 ctx, span := trace.StartSpan(ctx, name)
34 if len(cfg.Attributes) > 0 {
35 span.AddAttributes(cfg.Attributes...)
36 }
37 defer span.End()
38
39 defer func() {
40 if err != nil {
41 if lberr, ok := err.(lb.RetryError); ok {
42 // handle errors originating from lb.Retry
43 attrs := make([]trace.Attribute, 0, len(lberr.RawErrors))
44 for idx, rawErr := range lberr.RawErrors {
45 attrs = append(attrs, trace.StringAttribute(
46 "gokit.retry.error."+strconv.Itoa(idx+1), rawErr.Error(),
47 ))
48 }
49 span.AddAttributes(attrs...)
50 span.SetStatus(trace.Status{
51 Code: trace.StatusCodeUnknown,
52 Message: lberr.Final.Error(),
53 })
54 return
55 }
56 // generic error
57 span.SetStatus(trace.Status{
58 Code: trace.StatusCodeUnknown,
59 Message: err.Error(),
60 })
61 return
62 }
63
64 // test for business error
65 if res, ok := response.(endpoint.Failer); ok && res.Failed() != nil {
66 span.AddAttributes(
67 trace.StringAttribute("gokit.business.error", res.Failed().Error()),
68 )
69 if cfg.IgnoreBusinessError {
70 span.SetStatus(trace.Status{Code: trace.StatusCodeOK})
71 return
72 }
73 // treating business error as real error in span.
74 span.SetStatus(trace.Status{
75 Code: trace.StatusCodeUnknown,
76 Message: res.Failed().Error(),
77 })
78 return
79 }
80
81 // no errors identified
82 span.SetStatus(trace.Status{Code: trace.StatusCodeOK})
83 }()
84 response, err = next(ctx, request)
85 return
86 }
87 }
88 }
0 package opencensus
1
2 import "go.opencensus.io/trace"
3
4 // EndpointOptions holds the options for tracing an endpoint
5 type EndpointOptions struct {
6 // IgnoreBusinessError if set to true will not treat a business error
7 // identified trough the endpoint.Failer interface as a span error.
8 IgnoreBusinessError bool
9
10 // Attributes holds the default attributes which will be set on span
11 // creation by our Endpoint middleware.
12 Attributes []trace.Attribute
13 }
14
15 // EndpointOption allows for functional options to our OpenCensus endpoint
16 // tracing middleware.
17 type EndpointOption func(*EndpointOptions)
18
19 // WithEndpointConfig sets all configuration options at once by use of the
20 // EndpointOptions struct.
21 func WithEndpointConfig(options EndpointOptions) EndpointOption {
22 return func(o *EndpointOptions) {
23 *o = options
24 }
25 }
26
27 // WithEndpointAttributes sets the default attributes for the spans created by
28 // the Endpoint tracer.
29 func WithEndpointAttributes(attrs ...trace.Attribute) EndpointOption {
30 return func(o *EndpointOptions) {
31 o.Attributes = attrs
32 }
33 }
34
35 // WithIgnoreBusinessError if set to true will not treat a business error
36 // identified through the endpoint.Failer interface as a span error.
37 func WithIgnoreBusinessError(val bool) EndpointOption {
38 return func(o *EndpointOptions) {
39 o.IgnoreBusinessError = val
40 }
41 }
0 package opencensus_test
1
2 import (
3 "context"
4 "errors"
5 "testing"
6 "time"
7
8 "go.opencensus.io/trace"
9
10 "github.com/go-kit/kit/endpoint"
11 "github.com/go-kit/kit/sd"
12 "github.com/go-kit/kit/sd/lb"
13 "github.com/go-kit/kit/tracing/opencensus"
14 )
15
16 const (
17 span1 = ""
18 span2 = "SPAN-2"
19 span3 = "SPAN-3"
20 span4 = "SPAN-4"
21 span5 = "SPAN-5"
22 )
23
24 var (
25 err1 = errors.New("someError")
26 err2 = errors.New("otherError")
27 err3 = errors.New("someBusinessError")
28 err4 = errors.New("otherBusinessError")
29 )
30
31 // compile time assertion
32 var _ endpoint.Failer = failedResponse{}
33
34 type failedResponse struct {
35 err error
36 }
37
38 func (r failedResponse) Failed() error { return r.err }
39
40 func passEndpoint(_ context.Context, req interface{}) (interface{}, error) {
41 if err, _ := req.(error); err != nil {
42 return nil, err
43 }
44 return req, nil
45 }
46
47 func TestTraceEndpoint(t *testing.T) {
48 ctx := context.Background()
49
50 e := &recordingExporter{}
51 trace.RegisterExporter(e)
52 trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
53
54 // span 1
55 span1Attrs := []trace.Attribute{
56 trace.StringAttribute("string", "value"),
57 trace.Int64Attribute("int64", 42),
58 }
59 mw := opencensus.TraceEndpoint(
60 span1, opencensus.WithEndpointAttributes(span1Attrs...),
61 )
62 mw(endpoint.Nop)(ctx, nil)
63
64 // span 2
65 opts := opencensus.EndpointOptions{}
66 mw = opencensus.TraceEndpoint(span2, opencensus.WithEndpointConfig(opts))
67 mw(passEndpoint)(ctx, err1)
68
69 // span3
70 mw = opencensus.TraceEndpoint(span3)
71 ep := lb.Retry(5, 1*time.Second, lb.NewRoundRobin(sd.FixedEndpointer{passEndpoint}))
72 mw(ep)(ctx, err2)
73
74 // span4
75 mw = opencensus.TraceEndpoint(span4)
76 mw(passEndpoint)(ctx, failedResponse{err: err3})
77
78 // span4
79 mw = opencensus.TraceEndpoint(span5, opencensus.WithIgnoreBusinessError(true))
80 mw(passEndpoint)(ctx, failedResponse{err: err4})
81
82 // check span count
83 spans := e.Flush()
84 if want, have := 5, len(spans); want != have {
85 t.Fatalf("incorrected number of spans, wanted %d, got %d", want, have)
86 }
87
88 // test span 1
89 span := spans[0]
90 if want, have := int32(trace.StatusCodeOK), span.Code; want != have {
91 t.Errorf("incorrect status code, wanted %d, got %d", want, have)
92 }
93
94 if want, have := opencensus.TraceEndpointDefaultName, span.Name; want != have {
95 t.Errorf("incorrect span name, wanted %q, got %q", want, have)
96 }
97
98 if want, have := 2, len(span.Attributes); want != have {
99 t.Fatalf("incorrect attribute count, wanted %d, got %d", want, have)
100 }
101
102 // test span 2
103 span = spans[1]
104 if want, have := int32(trace.StatusCodeUnknown), span.Code; want != have {
105 t.Errorf("incorrect status code, wanted %d, got %d", want, have)
106 }
107
108 if want, have := span2, span.Name; want != have {
109 t.Errorf("incorrect span name, wanted %q, got %q", want, have)
110 }
111
112 if want, have := 0, len(span.Attributes); want != have {
113 t.Fatalf("incorrect attribute count, wanted %d, got %d", want, have)
114 }
115
116 // test span 3
117 span = spans[2]
118 if want, have := int32(trace.StatusCodeUnknown), span.Code; want != have {
119 t.Errorf("incorrect status code, wanted %d, got %d", want, have)
120 }
121
122 if want, have := span3, span.Name; want != have {
123 t.Errorf("incorrect span name, wanted %q, got %q", want, have)
124 }
125
126 if want, have := 5, len(span.Attributes); want != have {
127 t.Fatalf("incorrect attribute count, wanted %d, got %d", want, have)
128 }
129
130 // test span 4
131 span = spans[3]
132 if want, have := int32(trace.StatusCodeUnknown), span.Code; want != have {
133 t.Errorf("incorrect status code, wanted %d, got %d", want, have)
134 }
135
136 if want, have := span4, span.Name; want != have {
137 t.Errorf("incorrect span name, wanted %q, got %q", want, have)
138 }
139
140 if want, have := 1, len(span.Attributes); want != have {
141 t.Fatalf("incorrect attribute count, wanted %d, got %d", want, have)
142 }
143
144 // test span 5
145 span = spans[4]
146 if want, have := int32(trace.StatusCodeOK), span.Code; want != have {
147 t.Errorf("incorrect status code, wanted %d, got %d", want, have)
148 }
149
150 if want, have := span5, span.Name; want != have {
151 t.Errorf("incorrect span name, wanted %q, got %q", want, have)
152 }
153
154 if want, have := 1, len(span.Attributes); want != have {
155 t.Fatalf("incorrect attribute count, wanted %d, got %d", want, have)
156 }
157
158 }
0 package opencensus
1
2 import (
3 "context"
4
5 "go.opencensus.io/trace"
6 "go.opencensus.io/trace/propagation"
7 "google.golang.org/grpc/codes"
8 "google.golang.org/grpc/metadata"
9 "google.golang.org/grpc/status"
10
11 kitgrpc "github.com/go-kit/kit/transport/grpc"
12 )
13
14 const propagationKey = "grpc-trace-bin"
15
16 // GRPCClientTrace enables OpenCensus tracing of a Go kit gRPC transport client.
17 func GRPCClientTrace(options ...TracerOption) kitgrpc.ClientOption {
18 cfg := TracerOptions{
19 sampler: trace.AlwaysSample(),
20 }
21
22 for _, option := range options {
23 option(&cfg)
24 }
25
26 clientBefore := kitgrpc.ClientBefore(
27 func(ctx context.Context, md *metadata.MD) context.Context {
28 var name string
29
30 if cfg.name != "" {
31 name = cfg.name
32 } else {
33 name = ctx.Value(kitgrpc.ContextKeyRequestMethod).(string)
34 }
35
36 span := trace.NewSpan(
37 name,
38 trace.FromContext(ctx),
39 trace.StartOptions{
40 Sampler: cfg.sampler,
41 SpanKind: trace.SpanKindClient,
42 },
43 )
44
45 if !cfg.public {
46 traceContextBinary := string(propagation.Binary(span.SpanContext()))
47 (*md)[propagationKey] = append((*md)[propagationKey], traceContextBinary)
48 }
49
50 return trace.NewContext(ctx, span)
51 },
52 )
53
54 clientFinalizer := kitgrpc.ClientFinalizer(
55 func(ctx context.Context, err error) {
56 if span := trace.FromContext(ctx); span != nil {
57 s, ok := status.FromError(err)
58 if ok {
59 span.SetStatus(trace.Status{Code: int32(s.Code()), Message: s.Message()})
60 } else {
61 span.SetStatus(trace.Status{Code: int32(codes.Unknown), Message: err.Error()})
62 }
63 span.End()
64 }
65 },
66 )
67
68 return func(c *kitgrpc.Client) {
69 clientBefore(c)
70 clientFinalizer(c)
71 }
72
73 }
74
75 // GRPCServerTrace enables OpenCensus tracing of a Go kit gRPC transport server.
76 func GRPCServerTrace(options ...TracerOption) kitgrpc.ServerOption {
77 cfg := TracerOptions{}
78
79 for _, option := range options {
80 option(&cfg)
81 }
82
83 serverBefore := kitgrpc.ServerBefore(
84 func(ctx context.Context, md metadata.MD) context.Context {
85 var (
86 spanContext trace.SpanContext
87 ok bool
88 name string
89 )
90
91 if cfg.name != "" {
92 name = cfg.name
93 } else {
94 name, ok = ctx.Value(kitgrpc.ContextKeyRequestMethod).(string)
95 if !ok || name == "" {
96 // we can't find the gRPC method. probably the
97 // unaryInterceptor was not wired up.
98 name = "unknown grpc method"
99 }
100 }
101
102 traceContext := md[propagationKey]
103
104 if len(traceContext) > 0 {
105 traceContextBinary := []byte(traceContext[0])
106 spanContext, ok = propagation.FromBinary(traceContextBinary)
107 if ok && !cfg.public {
108 ctx, _ = trace.StartSpanWithRemoteParent(
109 ctx,
110 name,
111 spanContext,
112 trace.WithSpanKind(trace.SpanKindServer),
113 trace.WithSampler(cfg.sampler),
114 )
115 return ctx
116 }
117 }
118 ctx, span := trace.StartSpan(
119 ctx,
120 name,
121 trace.WithSpanKind(trace.SpanKindServer),
122 trace.WithSampler(cfg.sampler),
123 )
124 if ok {
125 span.AddLink(
126 trace.Link{
127 TraceID: spanContext.TraceID,
128 SpanID: spanContext.SpanID,
129 Type: trace.LinkTypeChild,
130 },
131 )
132 }
133 return ctx
134 },
135 )
136
137 serverFinalizer := kitgrpc.ServerFinalizer(
138 func(ctx context.Context, err error) {
139 if span := trace.FromContext(ctx); span != nil {
140 s, ok := status.FromError(err)
141 if ok {
142 span.SetStatus(trace.Status{Code: int32(s.Code()), Message: s.Message()})
143 } else {
144 span.SetStatus(trace.Status{Code: int32(codes.Internal), Message: err.Error()})
145 }
146 span.End()
147 }
148 },
149 )
150
151 return func(s *kitgrpc.Server) {
152 serverBefore(s)
153 serverFinalizer(s)
154 }
155 }
0 package opencensus_test
1
2 import (
3 "context"
4 "errors"
5 "testing"
6
7 "go.opencensus.io/trace"
8 "go.opencensus.io/trace/propagation"
9 "google.golang.org/grpc"
10 "google.golang.org/grpc/codes"
11 "google.golang.org/grpc/metadata"
12
13 "github.com/go-kit/kit/endpoint"
14 ockit "github.com/go-kit/kit/tracing/opencensus"
15 grpctransport "github.com/go-kit/kit/transport/grpc"
16 )
17
18 type dummy struct{}
19
20 const traceContextKey = "grpc-trace-bin"
21
22 func unaryInterceptor(
23 ctx context.Context, method string, req, reply interface{},
24 cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
25 ) error {
26 return nil
27 }
28
29 func TestGRPCClientTrace(t *testing.T) {
30 rec := &recordingExporter{}
31
32 trace.RegisterExporter(rec)
33
34 cc, err := grpc.Dial(
35 "",
36 grpc.WithUnaryInterceptor(unaryInterceptor),
37 grpc.WithInsecure(),
38 )
39 if err != nil {
40 t.Fatalf("unable to create gRPC dialer: %s", err.Error())
41 }
42
43 traces := []struct {
44 name string
45 err error
46 }{
47 {"", nil},
48 {"CustomName", nil},
49 {"", errors.New("dummy-error")},
50 }
51
52 for _, tr := range traces {
53 clientTracer := ockit.GRPCClientTrace(ockit.WithName(tr.name))
54
55 ep := grpctransport.NewClient(
56 cc,
57 "dummyService",
58 "dummyMethod",
59 func(context.Context, interface{}) (interface{}, error) {
60 return nil, nil
61 },
62 func(context.Context, interface{}) (interface{}, error) {
63 return nil, tr.err
64 },
65 dummy{},
66 clientTracer,
67 ).Endpoint()
68
69 ctx, parentSpan := trace.StartSpan(context.Background(), "test")
70
71 _, err = ep(ctx, nil)
72 if want, have := tr.err, err; want != have {
73 t.Fatalf("unexpected error, want %s, have %s", tr.err.Error(), err.Error())
74 }
75
76 spans := rec.Flush()
77 if want, have := 1, len(spans); want != have {
78 t.Fatalf("incorrect number of spans, want %d, have %d", want, have)
79 }
80 span := spans[0]
81 if want, have := parentSpan.SpanContext().SpanID, span.ParentSpanID; want != have {
82 t.Errorf("incorrect parent ID, want %s, have %s", want, have)
83 }
84
85 if want, have := tr.name, span.Name; want != have && want != "" {
86 t.Errorf("incorrect span name, want %s, have %s", want, have)
87 }
88
89 if want, have := "/dummyService/dummyMethod", span.Name; want != have && tr.name == "" {
90 t.Errorf("incorrect span name, want %s, have %s", want, have)
91 }
92
93 code := trace.StatusCodeOK
94 if tr.err != nil {
95 code = trace.StatusCodeUnknown
96
97 if want, have := err.Error(), span.Status.Message; want != have {
98 t.Errorf("incorrect span status msg, want %s, have %s", want, have)
99 }
100 }
101
102 if want, have := int32(code), span.Status.Code; want != have {
103 t.Errorf("incorrect span status code, want %d, have %d", want, have)
104 }
105 }
106 }
107
108 func TestGRPCServerTrace(t *testing.T) {
109 rec := &recordingExporter{}
110
111 trace.RegisterExporter(rec)
112
113 traces := []struct {
114 useParent bool
115 name string
116 err error
117 }{
118 {false, "", nil},
119 {true, "", nil},
120 {true, "CustomName", nil},
121 {true, "", errors.New("dummy-error")},
122 }
123
124 for _, tr := range traces {
125 var (
126 ctx = context.Background()
127 parentSpan *trace.Span
128 )
129
130 server := grpctransport.NewServer(
131 endpoint.Nop,
132 func(context.Context, interface{}) (interface{}, error) {
133 return nil, nil
134 },
135 func(context.Context, interface{}) (interface{}, error) {
136 return nil, tr.err
137 },
138 ockit.GRPCServerTrace(ockit.WithName(tr.name)),
139 )
140
141 if tr.useParent {
142 _, parentSpan = trace.StartSpan(context.Background(), "test")
143 traceContextBinary := propagation.Binary(parentSpan.SpanContext())
144
145 md := metadata.MD{}
146 md.Set(traceContextKey, string(traceContextBinary))
147 ctx = metadata.NewIncomingContext(ctx, md)
148 }
149
150 server.ServeGRPC(ctx, nil)
151
152 spans := rec.Flush()
153
154 if want, have := 1, len(spans); want != have {
155 t.Fatalf("incorrect number of spans, want %d, have %d", want, have)
156 }
157
158 if tr.useParent {
159 if want, have := parentSpan.SpanContext().TraceID, spans[0].TraceID; want != have {
160 t.Errorf("incorrect trace ID, want %s, have %s", want, have)
161 }
162
163 if want, have := parentSpan.SpanContext().SpanID, spans[0].ParentSpanID; want != have {
164 t.Errorf("incorrect span ID, want %s, have %s", want, have)
165 }
166 }
167
168 if want, have := tr.name, spans[0].Name; want != have && want != "" {
169 t.Errorf("incorrect span name, want %s, have %s", want, have)
170 }
171
172 if tr.err != nil {
173 if want, have := int32(codes.Internal), spans[0].Status.Code; want != have {
174 t.Errorf("incorrect span status code, want %d, have %d", want, have)
175 }
176
177 if want, have := tr.err.Error(), spans[0].Status.Message; want != have {
178 t.Errorf("incorrect span status message, want %s, have %s", want, have)
179 }
180 }
181 }
182 }
0 package opencensus
1
2 import (
3 "context"
4 "net/http"
5
6 "go.opencensus.io/plugin/ochttp"
7 "go.opencensus.io/plugin/ochttp/propagation/b3"
8 "go.opencensus.io/trace"
9
10 kithttp "github.com/go-kit/kit/transport/http"
11 )
12
13 // HTTPClientTrace enables OpenCensus tracing of a Go kit HTTP transport client.
14 func HTTPClientTrace(options ...TracerOption) kithttp.ClientOption {
15 cfg := TracerOptions{
16 sampler: trace.AlwaysSample(),
17 httpPropagate: &b3.HTTPFormat{},
18 }
19
20 for _, option := range options {
21 option(&cfg)
22 }
23
24 clientBefore := kithttp.ClientBefore(
25 func(ctx context.Context, req *http.Request) context.Context {
26 var name string
27
28 if cfg.name != "" {
29 name = cfg.name
30 } else {
31 // OpenCensus states Path being default naming for a client span
32 name = req.Method + " " + req.URL.Path
33 }
34
35 span := trace.NewSpan(
36 name,
37 trace.FromContext(ctx),
38 trace.StartOptions{
39 Sampler: cfg.sampler,
40 SpanKind: trace.SpanKindClient,
41 },
42 )
43
44 span.AddAttributes(
45 trace.StringAttribute(ochttp.HostAttribute, req.URL.Host),
46 trace.StringAttribute(ochttp.MethodAttribute, req.Method),
47 trace.StringAttribute(ochttp.PathAttribute, req.URL.Path),
48 trace.StringAttribute(ochttp.UserAgentAttribute, req.UserAgent()),
49 )
50
51 if !cfg.public {
52 cfg.httpPropagate.SpanContextToRequest(span.SpanContext(), req)
53 }
54
55 return trace.NewContext(ctx, span)
56 },
57 )
58
59 clientAfter := kithttp.ClientAfter(
60 func(ctx context.Context, res *http.Response) context.Context {
61 if span := trace.FromContext(ctx); span != nil {
62 span.SetStatus(ochttp.TraceStatus(res.StatusCode, http.StatusText(res.StatusCode)))
63 span.AddAttributes(
64 trace.Int64Attribute(ochttp.StatusCodeAttribute, int64(res.StatusCode)),
65 )
66 }
67 return ctx
68 },
69 )
70
71 clientFinalizer := kithttp.ClientFinalizer(
72 func(ctx context.Context, err error) {
73 if span := trace.FromContext(ctx); span != nil {
74 if err != nil {
75 span.SetStatus(trace.Status{
76 Code: trace.StatusCodeUnknown,
77 Message: err.Error(),
78 })
79 }
80 span.End()
81 }
82 },
83 )
84
85 return func(c *kithttp.Client) {
86 clientBefore(c)
87 clientAfter(c)
88 clientFinalizer(c)
89 }
90 }
91
92 // HTTPServerTrace enables OpenCensus tracing of a Go kit HTTP transport server.
93 func HTTPServerTrace(options ...TracerOption) kithttp.ServerOption {
94 cfg := TracerOptions{
95 sampler: trace.AlwaysSample(),
96 httpPropagate: &b3.HTTPFormat{},
97 }
98
99 for _, option := range options {
100 option(&cfg)
101 }
102
103 serverBefore := kithttp.ServerBefore(
104 func(ctx context.Context, req *http.Request) context.Context {
105 var (
106 spanContext trace.SpanContext
107 span *trace.Span
108 name string
109 ok bool
110 )
111
112 if cfg.name != "" {
113 name = cfg.name
114 } else {
115 name = req.Method + " " + req.URL.Path
116 }
117
118 spanContext, ok = cfg.httpPropagate.SpanContextFromRequest(req)
119 if ok && !cfg.public {
120 ctx, span = trace.StartSpanWithRemoteParent(
121 ctx,
122 name,
123 spanContext,
124 trace.WithSpanKind(trace.SpanKindServer),
125 trace.WithSampler(cfg.sampler),
126 )
127 } else {
128 ctx, span = trace.StartSpan(
129 ctx,
130 name,
131 trace.WithSpanKind(trace.SpanKindServer),
132 trace.WithSampler(cfg.sampler),
133 )
134 if ok {
135 span.AddLink(trace.Link{
136 TraceID: spanContext.TraceID,
137 SpanID: spanContext.SpanID,
138 Type: trace.LinkTypeChild,
139 Attributes: nil,
140 })
141 }
142 }
143
144 span.AddAttributes(
145 trace.StringAttribute(ochttp.MethodAttribute, req.Method),
146 trace.StringAttribute(ochttp.PathAttribute, req.URL.Path),
147 )
148
149 return ctx
150 },
151 )
152
153 serverFinalizer := kithttp.ServerFinalizer(
154 func(ctx context.Context, code int, r *http.Request) {
155 if span := trace.FromContext(ctx); span != nil {
156 span.SetStatus(ochttp.TraceStatus(code, http.StatusText(code)))
157
158 if rs, ok := ctx.Value(kithttp.ContextKeyResponseSize).(int64); ok {
159 span.AddAttributes(
160 trace.Int64Attribute("http.response_size", rs),
161 )
162 }
163
164 span.End()
165 }
166 },
167 )
168
169 return func(s *kithttp.Server) {
170 serverBefore(s)
171 serverFinalizer(s)
172 }
173 }
0 package opencensus_test
1
2 import (
3 "context"
4 "errors"
5 "net/http"
6 "net/http/httptest"
7 "net/url"
8 "testing"
9
10 "go.opencensus.io/plugin/ochttp"
11 "go.opencensus.io/plugin/ochttp/propagation/b3"
12 "go.opencensus.io/plugin/ochttp/propagation/tracecontext"
13 "go.opencensus.io/trace"
14 "go.opencensus.io/trace/propagation"
15
16 "github.com/go-kit/kit/endpoint"
17 ockit "github.com/go-kit/kit/tracing/opencensus"
18 kithttp "github.com/go-kit/kit/transport/http"
19 )
20
21 func TestHttpClientTrace(t *testing.T) {
22 var (
23 err error
24 rec = &recordingExporter{}
25 rURL, _ = url.Parse("http://test.com/dummy/path")
26 )
27
28 trace.RegisterExporter(rec)
29
30 traces := []struct {
31 name string
32 err error
33 }{
34 {"", nil},
35 {"CustomName", nil},
36 {"", errors.New("dummy-error")},
37 }
38
39 for _, tr := range traces {
40
41 clientTracer := ockit.HTTPClientTrace(ockit.WithName(tr.name))
42 ep := kithttp.NewClient(
43 "GET",
44 rURL,
45 func(ctx context.Context, r *http.Request, i interface{}) error {
46 return nil
47 },
48 func(ctx context.Context, r *http.Response) (response interface{}, err error) {
49 return nil, tr.err
50 },
51 clientTracer,
52 ).Endpoint()
53
54 ctx, parentSpan := trace.StartSpan(context.Background(), "test")
55
56 _, err = ep(ctx, nil)
57 if want, have := tr.err, err; want != have {
58 t.Fatalf("unexpected error, want %s, have %s", tr.err.Error(), err.Error())
59 }
60
61 spans := rec.Flush()
62 if want, have := 1, len(spans); want != have {
63 t.Fatalf("incorrect number of spans, want %d, have %d", want, have)
64 }
65
66 span := spans[0]
67 if want, have := parentSpan.SpanContext().SpanID, span.ParentSpanID; want != have {
68 t.Errorf("incorrect parent ID, want %s, have %s", want, have)
69 }
70
71 if want, have := tr.name, span.Name; want != have && want != "" {
72 t.Errorf("incorrect span name, want %s, have %s", want, have)
73 }
74
75 if want, have := "GET /dummy/path", span.Name; want != have && tr.name == "" {
76 t.Errorf("incorrect span name, want %s, have %s", want, have)
77 }
78
79 code := trace.StatusCodeOK
80 if tr.err != nil {
81 code = trace.StatusCodeUnknown
82
83 if want, have := err.Error(), span.Status.Message; want != have {
84 t.Errorf("incorrect span status msg, want %s, have %s", want, have)
85 }
86 }
87
88 if want, have := int32(code), span.Status.Code; want != have {
89 t.Errorf("incorrect span status code, want %d, have %d", want, have)
90 }
91 }
92 }
93
94 func TestHTTPServerTrace(t *testing.T) {
95 rec := &recordingExporter{}
96
97 trace.RegisterExporter(rec)
98
99 traces := []struct {
100 useParent bool
101 name string
102 err error
103 propagation propagation.HTTPFormat
104 }{
105 {false, "", nil, nil},
106 {true, "", nil, nil},
107 {true, "CustomName", nil, &b3.HTTPFormat{}},
108 {true, "", errors.New("dummy-error"), &tracecontext.HTTPFormat{}},
109 }
110
111 for _, tr := range traces {
112 var client http.Client
113
114 handler := kithttp.NewServer(
115 endpoint.Nop,
116 func(context.Context, *http.Request) (interface{}, error) { return nil, nil },
117 func(context.Context, http.ResponseWriter, interface{}) error { return errors.New("dummy") },
118 ockit.HTTPServerTrace(
119 ockit.WithName(tr.name),
120 ockit.WithHTTPPropagation(tr.propagation),
121 ),
122 )
123
124 server := httptest.NewServer(handler)
125 defer server.Close()
126
127 const httpMethod = "GET"
128
129 req, err := http.NewRequest(httpMethod, server.URL, nil)
130 if err != nil {
131 t.Fatalf("unable to create HTTP request: %s", err.Error())
132 }
133
134 if tr.useParent {
135 client = http.Client{
136 Transport: &ochttp.Transport{
137 Propagation: tr.propagation,
138 },
139 }
140 }
141
142 resp, err := client.Do(req.WithContext(context.Background()))
143 if err != nil {
144 t.Fatalf("unable to send HTTP request: %s", err.Error())
145 }
146 resp.Body.Close()
147
148 spans := rec.Flush()
149
150 expectedSpans := 1
151 if tr.useParent {
152 expectedSpans++
153 }
154
155 if want, have := expectedSpans, len(spans); want != have {
156 t.Fatalf("incorrect number of spans, want %d, have %d", want, have)
157 }
158
159 if tr.useParent {
160 if want, have := spans[1].TraceID, spans[0].TraceID; want != have {
161 t.Errorf("incorrect trace ID, want %s, have %s", want, have)
162 }
163
164 if want, have := spans[1].SpanID, spans[0].ParentSpanID; want != have {
165 t.Errorf("incorrect span ID, want %s, have %s", want, have)
166 }
167 }
168
169 if want, have := tr.name, spans[0].Name; want != have && want != "" {
170 t.Errorf("incorrect span name, want %s, have %s", want, have)
171 }
172
173 if want, have := "GET /", spans[0].Name; want != have && tr.name == "" {
174 t.Errorf("incorrect span name, want %s, have %s", want, have)
175 }
176 }
177 }
0 package opencensus_test
1
2 import (
3 "sync"
4
5 "go.opencensus.io/trace"
6 )
7
8 type recordingExporter struct {
9 mu sync.Mutex
10 data []*trace.SpanData
11 }
12
13 func (e *recordingExporter) ExportSpan(d *trace.SpanData) {
14 e.mu.Lock()
15 defer e.mu.Unlock()
16
17 e.data = append(e.data, d)
18 }
19
20 func (e *recordingExporter) Flush() (data []*trace.SpanData) {
21 e.mu.Lock()
22 defer e.mu.Unlock()
23
24 data = e.data
25 e.data = nil
26 return
27 }
0 package opencensus
1
2 import (
3 "go.opencensus.io/plugin/ochttp/propagation/b3"
4 "go.opencensus.io/trace"
5 "go.opencensus.io/trace/propagation"
6 )
7
8 // defaultHTTPPropagate holds OpenCensus' default HTTP propagation format which
9 // currently is Zipkin's B3.
10 var defaultHTTPPropagate propagation.HTTPFormat = &b3.HTTPFormat{}
11
12 // TracerOption allows for functional options to our OpenCensus tracing
13 // middleware.
14 type TracerOption func(o *TracerOptions)
15
16 // WithTracerConfig sets all configuration options at once.
17 func WithTracerConfig(options TracerOptions) TracerOption {
18 return func(o *TracerOptions) {
19 *o = options
20 }
21 }
22
23 // WithSampler sets the sampler to use by our OpenCensus Tracer.
24 func WithSampler(sampler trace.Sampler) TracerOption {
25 return func(o *TracerOptions) {
26 o.sampler = sampler
27 }
28 }
29
30 // WithName sets the name for an instrumented transport endpoint. If name is omitted
31 // at tracing middleware creation, the method of the transport or transport rpc
32 // name is used.
33 func WithName(name string) TracerOption {
34 return func(o *TracerOptions) {
35 o.name = name
36 }
37 }
38
39 // IsPublic should be set to true for publicly accessible servers and for
40 // clients that should not propagate their current trace metadata.
41 // On the server side a new trace will always be started regardless of any
42 // trace metadata being found in the incoming request. If any trace metadata
43 // is found, it will be added as a linked trace instead.
44 func IsPublic(isPublic bool) TracerOption {
45 return func(o *TracerOptions) {
46 o.public = isPublic
47 }
48 }
49
50 // WithHTTPPropagation sets the propagation handlers for the HTTP transport
51 // middlewares. If used on a non HTTP transport this is a noop.
52 func WithHTTPPropagation(p propagation.HTTPFormat) TracerOption {
53 return func(o *TracerOptions) {
54 if p == nil {
55 // reset to default OC HTTP format
56 o.httpPropagate = defaultHTTPPropagate
57 return
58 }
59 o.httpPropagate = p
60 }
61 }
62
63 // TracerOptions holds configuration for our tracing middlewares
64 type TracerOptions struct {
65 sampler trace.Sampler
66 name string
67 public bool
68 httpPropagate propagation.HTTPFormat
69 }