Codebase list golang-github-go-kit-kit / 1e9fdf6
add support for JSONRPC opencensus tracing (#1022) Ryan Lang authored 3 years ago GitHub committed 3 years ago
6 changed file(s) with 448 addition(s) and 2 deletion(s). Raw diff Collapse all Expand all
0 package opencensus
1
2 import (
3 "context"
4 "net/http"
5
6 "go.opencensus.io/plugin/ochttp"
7 "go.opencensus.io/plugin/ochttp/propagation/b3"
8 "go.opencensus.io/trace"
9
10 kithttp "github.com/go-kit/kit/transport/http"
11 jsonrpc "github.com/go-kit/kit/transport/http/jsonrpc"
12 )
13
14 // JSONRPCClientTrace enables OpenCensus tracing of a Go kit JSONRPC transport client.
15 func JSONRPCClientTrace(options ...TracerOption) jsonrpc.ClientOption {
16 cfg := TracerOptions{}
17
18 for _, option := range options {
19 option(&cfg)
20 }
21
22 if !cfg.Public && cfg.HTTPPropagate == nil {
23 cfg.HTTPPropagate = &b3.HTTPFormat{}
24 }
25
26 clientBefore := jsonrpc.ClientBefore(
27 func(ctx context.Context, req *http.Request) context.Context {
28 var name string
29
30 if cfg.Name != "" {
31 name = cfg.Name
32 } else {
33 // OpenCensus states Path being default naming for a client span
34 name = ctx.Value(jsonrpc.ContextKeyRequestMethod).(string)
35 }
36
37 ctx, span := trace.StartSpan(
38 ctx,
39 name,
40 trace.WithSampler(cfg.Sampler),
41 trace.WithSpanKind(trace.SpanKindClient),
42 )
43
44 span.AddAttributes(
45 trace.StringAttribute(ochttp.HostAttribute, req.URL.Host),
46 trace.StringAttribute(ochttp.MethodAttribute, req.Method),
47 trace.StringAttribute(ochttp.PathAttribute, req.URL.Path),
48 trace.StringAttribute(ochttp.UserAgentAttribute, req.UserAgent()),
49 )
50
51 if !cfg.Public {
52 cfg.HTTPPropagate.SpanContextToRequest(span.SpanContext(), req)
53 }
54
55 return ctx
56 },
57 )
58
59 clientAfter := jsonrpc.ClientAfter(
60 func(ctx context.Context, res *http.Response) context.Context {
61 if span := trace.FromContext(ctx); span != nil {
62 span.SetStatus(ochttp.TraceStatus(res.StatusCode, http.StatusText(res.StatusCode)))
63 span.AddAttributes(
64 trace.Int64Attribute(ochttp.StatusCodeAttribute, int64(res.StatusCode)),
65 )
66 }
67 return ctx
68 },
69 )
70
71 clientFinalizer := jsonrpc.ClientFinalizer(
72 func(ctx context.Context, err error) {
73 if span := trace.FromContext(ctx); span != nil {
74 if err != nil {
75 span.SetStatus(trace.Status{
76 Code: trace.StatusCodeUnknown,
77 Message: err.Error(),
78 })
79 }
80 span.End()
81 }
82 },
83 )
84
85 return func(c *jsonrpc.Client) {
86 clientBefore(c)
87 clientAfter(c)
88 clientFinalizer(c)
89 }
90 }
91
92 // JSONRPCServerTrace enables OpenCensus tracing of a Go kit JSONRPC transport server.
93 func JSONRPCServerTrace(options ...TracerOption) jsonrpc.ServerOption {
94 cfg := TracerOptions{}
95
96 for _, option := range options {
97 option(&cfg)
98 }
99
100 if !cfg.Public && cfg.HTTPPropagate == nil {
101 cfg.HTTPPropagate = &b3.HTTPFormat{}
102 }
103
104 serverBeforeCodec := jsonrpc.ServerBeforeCodec(
105 func(ctx context.Context, httpReq *http.Request, req jsonrpc.Request) context.Context {
106 var (
107 spanContext trace.SpanContext
108 span *trace.Span
109 name string
110 ok bool
111 )
112
113 if cfg.Name != "" {
114 name = cfg.Name
115 } else {
116 name = ctx.Value(jsonrpc.ContextKeyRequestMethod).(string)
117 if name == "" {
118 // we can't find the rpc method. probably the
119 // unaryInterceptor was not wired up.
120 name = "unknown jsonrpc method"
121 }
122 }
123
124 spanContext, ok = cfg.HTTPPropagate.SpanContextFromRequest(httpReq)
125 if ok && !cfg.Public {
126 ctx, span = trace.StartSpanWithRemoteParent(
127 ctx,
128 name,
129 spanContext,
130 trace.WithSpanKind(trace.SpanKindServer),
131 trace.WithSampler(cfg.Sampler),
132 )
133 } else {
134 ctx, span = trace.StartSpan(
135 ctx,
136 name,
137 trace.WithSpanKind(trace.SpanKindServer),
138 trace.WithSampler(cfg.Sampler),
139 )
140 if ok {
141 span.AddLink(trace.Link{
142 TraceID: spanContext.TraceID,
143 SpanID: spanContext.SpanID,
144 Type: trace.LinkTypeChild,
145 Attributes: nil,
146 })
147 }
148 }
149
150 span.AddAttributes(
151 trace.StringAttribute(ochttp.MethodAttribute, httpReq.Method),
152 trace.StringAttribute(ochttp.PathAttribute, httpReq.URL.Path),
153 )
154
155 return ctx
156 },
157 )
158
159 serverFinalizer := jsonrpc.ServerFinalizer(
160 func(ctx context.Context, code int, r *http.Request) {
161 if span := trace.FromContext(ctx); span != nil {
162 span.SetStatus(ochttp.TraceStatus(code, http.StatusText(code)))
163
164 if rs, ok := ctx.Value(kithttp.ContextKeyResponseSize).(int64); ok {
165 span.AddAttributes(
166 trace.Int64Attribute("http.response_size", rs),
167 )
168 }
169
170 span.End()
171 }
172 },
173 )
174
175 return func(s *jsonrpc.Server) {
176 serverBeforeCodec(s)
177 serverFinalizer(s)
178 }
179 }
0 package opencensus_test
1
2 import (
3 "bytes"
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "net/http"
9 "net/http/httptest"
10 "net/url"
11 "testing"
12
13 "go.opencensus.io/plugin/ochttp"
14 "go.opencensus.io/plugin/ochttp/propagation/b3"
15 "go.opencensus.io/plugin/ochttp/propagation/tracecontext"
16 "go.opencensus.io/trace"
17 "go.opencensus.io/trace/propagation"
18
19 "github.com/go-kit/kit/endpoint"
20 ockit "github.com/go-kit/kit/tracing/opencensus"
21 jsonrpc "github.com/go-kit/kit/transport/http/jsonrpc"
22 )
23
24 func TestJSONRPCClientTrace(t *testing.T) {
25 var (
26 err error
27 rec = &recordingExporter{}
28 rURL, _ = url.Parse("https://httpbin.org/anything")
29 endpointName = "DummyEndpoint"
30 )
31
32 trace.RegisterExporter(rec)
33
34 traces := []struct {
35 name string
36 err error
37 }{
38 {"", nil},
39 {"CustomName", nil},
40 {"", errors.New("dummy-error")},
41 }
42
43 for _, tr := range traces {
44 clientTracer := ockit.JSONRPCClientTrace(
45 ockit.WithName(tr.name),
46 ockit.WithSampler(trace.AlwaysSample()),
47 )
48 ep := jsonrpc.NewClient(
49 rURL,
50 endpointName,
51 jsonrpc.ClientRequestEncoder(func(ctx context.Context, i interface{}) (json.RawMessage, error) {
52 return json.RawMessage(`{}`), nil
53 }),
54 jsonrpc.ClientResponseDecoder(func(ctx context.Context, r jsonrpc.Response) (response interface{}, err error) {
55 return nil, tr.err
56 }),
57 clientTracer,
58 ).Endpoint()
59
60 ctx, parentSpan := trace.StartSpan(context.Background(), "test")
61
62 _, err = ep(ctx, nil)
63 if want, have := tr.err, err; want != have {
64 t.Fatalf("unexpected error, want %s, have %s", tr.err.Error(), err.Error())
65 }
66
67 spans := rec.Flush()
68 if want, have := 1, len(spans); want != have {
69 t.Fatalf("incorrect number of spans, want %d, have %d", want, have)
70 }
71
72 span := spans[0]
73 if want, have := parentSpan.SpanContext().SpanID, span.ParentSpanID; want != have {
74 t.Errorf("incorrect parent ID, want %s, have %s", want, have)
75 }
76
77 if want, have := tr.name, span.Name; want != have && want != "" {
78 t.Errorf("incorrect span name, want %s, have %s", want, have)
79 }
80
81 if want, have := endpointName, span.Name; want != have && tr.name == "" {
82 t.Errorf("incorrect span name, want %s, have %s", want, have)
83 }
84
85 code := trace.StatusCodeOK
86 if tr.err != nil {
87 code = trace.StatusCodeUnknown
88
89 if want, have := err.Error(), span.Status.Message; want != have {
90 t.Errorf("incorrect span status msg, want %s, have %s", want, have)
91 }
92 }
93
94 if want, have := int32(code), span.Status.Code; want != have {
95 t.Errorf("incorrect span status code, want %d, have %d", want, have)
96 }
97 }
98 }
99
100 func TestJSONRPCServerTrace(t *testing.T) {
101 var (
102 endpointName = "DummyEndpoint"
103 rec = &recordingExporter{}
104 )
105
106 trace.RegisterExporter(rec)
107
108 traces := []struct {
109 useParent bool
110 name string
111 err error
112 propagation propagation.HTTPFormat
113 }{
114 {false, "", nil, nil},
115 {true, "", nil, nil},
116 {true, "CustomName", nil, &b3.HTTPFormat{}},
117 {true, "", errors.New("dummy-error"), &tracecontext.HTTPFormat{}},
118 }
119
120 for _, tr := range traces {
121 var client http.Client
122
123 handler := jsonrpc.NewServer(
124 jsonrpc.EndpointCodecMap{
125 endpointName: jsonrpc.EndpointCodec{
126 Endpoint: endpoint.Nop,
127 Decode: func(context.Context, json.RawMessage) (interface{}, error) { return nil, nil },
128 Encode: func(context.Context, interface{}) (json.RawMessage, error) { return nil, tr.err },
129 },
130 },
131 ockit.JSONRPCServerTrace(
132 ockit.WithName(tr.name),
133 ockit.WithSampler(trace.AlwaysSample()),
134 ockit.WithHTTPPropagation(tr.propagation),
135 ),
136 )
137
138 server := httptest.NewServer(handler)
139 defer server.Close()
140
141 jsonStr := []byte(fmt.Sprintf(`{"method":"%s"}`, endpointName))
142 req, err := http.NewRequest("POST", server.URL, bytes.NewBuffer(jsonStr))
143 if err != nil {
144 t.Fatalf("unable to create JSONRPC request: %s", err.Error())
145 }
146
147 if tr.useParent {
148 client = http.Client{
149 Transport: &ochttp.Transport{
150 StartOptions: trace.StartOptions{
151 Sampler: trace.AlwaysSample(),
152 },
153 Propagation: tr.propagation,
154 },
155 }
156 }
157
158 resp, err := client.Do(req.WithContext(context.Background()))
159 if err != nil {
160 t.Fatalf("unable to send JSONRPC request: %s", err.Error())
161 }
162 resp.Body.Close()
163
164 spans := rec.Flush()
165
166 expectedSpans := 1
167 if tr.useParent {
168 expectedSpans++
169 }
170
171 if want, have := expectedSpans, len(spans); want != have {
172 t.Fatalf("incorrect number of spans, want %d, have %d", want, have)
173 }
174
175 if tr.useParent {
176 if want, have := spans[1].TraceID, spans[0].TraceID; want != have {
177 t.Errorf("incorrect trace ID, want %s, have %s", want, have)
178 }
179
180 if want, have := spans[1].SpanID, spans[0].ParentSpanID; want != have {
181 t.Errorf("incorrect span ID, want %s, have %s", want, have)
182 }
183 }
184
185 if want, have := tr.name, spans[0].Name; want != have && want != "" {
186 t.Errorf("incorrect span name, want %s, have %s", want, have)
187 }
188
189 if want, have := endpointName, spans[0].Name; want != have && tr.name == "" {
190 t.Errorf("incorrect span name, want %s, have %s", want, have)
191 }
192 }
193 }
158158 }()
159159 }
160160
161 ctx = context.WithValue(ctx, ContextKeyRequestMethod, c.method)
162
161163 var params json.RawMessage
162164 if params, err = c.enc(ctx, request); err != nil {
163165 return nil, err
206208 return nil, err
207209 }
208210
209 return c.dec(ctx, rpcRes)
211 response, err := c.dec(ctx, rpcRes)
212 if err != nil {
213 return nil, err
214 }
215
216 return response, nil
210217 }
211218 }
212219
00 package jsonrpc
11
2 import "encoding/json"
2 import (
3 "context"
4 "encoding/json"
5 "net/http"
6 )
37
48 // Request defines a JSON RPC request from the spec
59 // http://www.jsonrpc.org/specification#request_object
2428 stringValue string
2529 stringError error
2630 }
31
32 // RequestFunc may take information from decoded json body and place in
33 // request context. In Servers, RequestFuncs are executed after json is parsed
34 // but prior to invoking the codec
35 type RequestFunc func(context.Context, *http.Request, Request) context.Context
2736
2837 // UnmarshalJSON satisfies json.Unmarshaler
2938 func (id *RequestID) UnmarshalJSON(b []byte) error {
7887 // ContentType defines the content type to be served.
7988 ContentType string = "application/json; charset=utf-8"
8089 )
90
91 type contextKey int
92
93 const (
94 ContextKeyRequestMethod contextKey = iota
95 )
1818 type Server struct {
1919 ecm EndpointCodecMap
2020 before []httptransport.RequestFunc
21 beforeCodec []RequestFunc
2122 after []httptransport.ServerResponseFunc
2223 errorEncoder httptransport.ErrorEncoder
2324 finalizer httptransport.ServerFinalizerFunc
4950 return func(s *Server) { s.before = append(s.before, before...) }
5051 }
5152
53 // ServerBeforeCodec functions are executed after the JSON request body has been
54 // decoded, but before the method's decoder is called. This provides an opportunity
55 // for middleware to inspect the contents of the rpc request before being passed
56 // to the codec.
57 func ServerBeforeCodec(beforeCodec ...RequestFunc) ServerOption {
58 return func(s *Server) { s.beforeCodec = append(s.beforeCodec, beforeCodec...) }
59 }
60
5261 // ServerAfter functions are executed on the HTTP response writer after the
5362 // endpoint is invoked, but before anything is written to the client.
5463 func ServerAfter(after ...httptransport.ServerResponseFunc) ServerOption {
109118 }
110119
111120 ctx = context.WithValue(ctx, requestIDKey, req.ID)
121 ctx = context.WithValue(ctx, ContextKeyRequestMethod, req.Method)
122
123 for _, f := range s.beforeCodec {
124 ctx = f(ctx, r, req)
125 }
112126
113127 // Get the endpoint and codecs from the map using the method
114128 // defined in the JSON object
231231 }
232232 if r.Error != nil {
233233 t.Fatalf("Unxpected error on response: %s", buf)
234 }
235 }
236
237 func TestMultipleServerBeforeCodec(t *testing.T) {
238 var done = make(chan struct{})
239 ecm := jsonrpc.EndpointCodecMap{
240 "add": jsonrpc.EndpointCodec{
241 Endpoint: endpoint.Nop,
242 Decode: nopDecoder,
243 Encode: nopEncoder,
244 },
245 }
246 handler := jsonrpc.NewServer(
247 ecm,
248 jsonrpc.ServerBeforeCodec(func(ctx context.Context, r *http.Request, req jsonrpc.Request) context.Context {
249 ctx = context.WithValue(ctx, "one", 1)
250
251 return ctx
252 }),
253 jsonrpc.ServerBeforeCodec(func(ctx context.Context, r *http.Request, req jsonrpc.Request) context.Context {
254 if _, ok := ctx.Value("one").(int); !ok {
255 t.Error("Value was not set properly when multiple ServerBeforeCodecs are used")
256 }
257
258 close(done)
259 return ctx
260 }),
261 )
262 server := httptest.NewServer(handler)
263 defer server.Close()
264 http.Post(server.URL, "application/json", addBody()) // nolint
265
266 select {
267 case <-done:
268 case <-time.After(time.Second):
269 t.Fatal("timeout waiting for finalizer")
234270 }
235271 }
236272