Toward a modular (HTTP) client
Peter Bourgon
8 years ago
1 | 1 | |
2 | 2 | import ( |
3 | 3 | "bytes" |
4 | "encoding/json" | |
5 | 4 | "net/http" |
6 | 5 | "net/url" |
7 | 6 | "strings" |
8 | "time" | |
7 | ||
8 | "github.com/go-kit/kit/log" | |
9 | 9 | |
10 | 10 | "golang.org/x/net/context" |
11 | 11 | |
12 | "github.com/go-kit/kit/log" | |
13 | "github.com/go-kit/kit/tracing/zipkin" | |
12 | "github.com/go-kit/kit/client" | |
13 | "github.com/go-kit/kit/transport/codec" | |
14 | httptransport "github.com/go-kit/kit/transport/http" | |
14 | 15 | ) |
15 | 16 | |
16 | 17 | // Add is the abstract definition of what this service does. It could easily |
17 | // be an interface type with multiple methods. Each method would be an | |
18 | // endpoint. | |
18 | // be an interface type with multiple methods, in which case each method would | |
19 | // be an endpoint. | |
19 | 20 | type Add func(context.Context, int64, int64) int64 |
20 | 21 | |
21 | func pureAdd(_ context.Context, a, b int64) int64 { time.Sleep(34 * time.Millisecond); return a + b } | |
22 | func pureAdd(_ context.Context, a, b int64) int64 { return a + b } | |
22 | 23 | |
23 | func addViaHTTP(addr string, newSpan zipkin.NewSpanFunc, c zipkin.Collector) Add { | |
24 | // TODO make & use addsvc HTTP client | |
24 | func proxyAdd(e client.Endpoint) Add { | |
25 | return func(ctx context.Context, a, b int64) int64 { | |
26 | resp, err := e(ctx, &addRequest{a, b}) | |
27 | if err != nil { | |
28 | log.DefaultLogger.Log("err", err) | |
29 | return 0 | |
30 | } | |
31 | addResp, ok := resp.(*addResponse) | |
32 | if !ok { | |
33 | log.DefaultLogger.Log("err", client.ErrBadCast) | |
34 | return 0 | |
35 | } | |
36 | return addResp.V | |
37 | } | |
38 | } | |
25 | 39 | |
40 | type httpClient struct { | |
41 | *url.URL | |
42 | codec.Codec | |
43 | method string | |
44 | *http.Client | |
45 | before []httptransport.BeforeFunc | |
46 | makeResponse func() interface{} | |
47 | } | |
48 | ||
49 | // TODO this needs to go to package client | |
50 | func newHTTPClient(addr string, cdc codec.Codec, makeResponse func() interface{}, options ...httpClientOption) client.Endpoint { | |
26 | 51 | if !strings.HasPrefix(addr, "http") { |
27 | 52 | addr = "http://" + addr |
28 | 53 | } |
30 | 55 | if err != nil { |
31 | 56 | panic(err) |
32 | 57 | } |
33 | u.Path = "/add" | |
34 | 58 | |
35 | return func(ctx context.Context, a, b int64) int64 { | |
36 | var buf bytes.Buffer | |
37 | if err := json.NewEncoder(&buf).Encode(map[string]interface{}{"a": a, "b": b}); err != nil { | |
38 | log.DefaultLogger.Log("err", err) | |
39 | return 0 | |
40 | } | |
59 | c := httpClient{ | |
60 | URL: u, | |
61 | Codec: cdc, | |
62 | method: "GET", | |
63 | Client: http.DefaultClient, | |
64 | makeResponse: makeResponse, | |
65 | } | |
66 | for _, option := range options { | |
67 | option(&c) | |
68 | } | |
69 | return c.endpoint | |
70 | } | |
41 | 71 | |
42 | req, err := http.NewRequest("GET", u.String(), &buf) | |
43 | if err != nil { | |
44 | log.DefaultLogger.Log("err", err) | |
45 | return 0 | |
46 | } | |
72 | func (c httpClient) endpoint(ctx context.Context, request interface{}) (interface{}, error) { | |
73 | var buf bytes.Buffer | |
74 | if err := c.Codec.Encode(&buf, request); err != nil { | |
75 | return nil, err | |
76 | } | |
47 | 77 | |
48 | span := zipkin.NewChildSpan(ctx, newSpan) | |
49 | defer c.Collect(span) | |
50 | span.Annotate(zipkin.ClientSend) | |
51 | zipkin.SetRequestHeaders(req.Header, zipkin.NewChildSpan(ctx, newSpan)) | |
52 | resp, err := http.DefaultClient.Do(req) | |
53 | span.Annotate(zipkin.ClientReceive) | |
54 | if err != nil { | |
55 | log.DefaultLogger.Log("err", err) | |
56 | return 0 | |
57 | } | |
58 | defer resp.Body.Close() | |
78 | req, err := http.NewRequest(c.method, c.URL.String(), &buf) | |
79 | if err != nil { | |
80 | return nil, err | |
81 | } | |
59 | 82 | |
60 | var response struct { | |
61 | V int64 `json:"v"` | |
62 | } | |
63 | if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { | |
64 | log.DefaultLogger.Log("err", err) | |
65 | return 0 | |
66 | } | |
83 | for _, f := range c.before { | |
84 | f(ctx, req) | |
85 | } | |
67 | 86 | |
68 | return response.V | |
87 | resp, err := c.Client.Do(req) | |
88 | if err != nil { | |
89 | return nil, err | |
69 | 90 | } |
91 | defer resp.Body.Close() | |
92 | ||
93 | response := c.makeResponse() | |
94 | ctx, err = c.Codec.Decode(ctx, resp.Body, response) | |
95 | if err != nil { | |
96 | return nil, err | |
97 | } | |
98 | ||
99 | return response, nil | |
70 | 100 | } |
101 | ||
102 | type httpClientOption func(*httpClient) | |
103 | ||
104 | func before(f httptransport.BeforeFunc) httpClientOption { | |
105 | return func(c *httpClient) { c.before = append(c.before, f) } | |
106 | } |
11 | 11 | // |
12 | 12 | // This function is just boiler-plate; in theory, it could be generated. |
13 | 13 | func makeEndpoint(a Add) server.Endpoint { |
14 | return func(ctx context.Context, req server.Request) (server.Response, error) { | |
14 | return func(ctx context.Context, request interface{}) (interface{}, error) { | |
15 | 15 | select { |
16 | 16 | case <-ctx.Done(): |
17 | 17 | return nil, server.ErrContextCanceled |
18 | 18 | default: |
19 | 19 | } |
20 | 20 | |
21 | addReq, ok := req.(*request) | |
21 | addReq, ok := request.(*addRequest) | |
22 | 22 | if !ok { |
23 | 23 | return nil, server.ErrBadCast |
24 | 24 | } |
25 | 25 | |
26 | 26 | v := a(ctx, addReq.A, addReq.B) |
27 | 27 | |
28 | return response{ | |
29 | V: v, | |
30 | }, nil | |
28 | return addResponse{V: v}, nil | |
31 | 29 | } |
32 | 30 | } |
19 | 19 | // way to manipulate the RPC context, like headers for HTTP. So we don't have |
20 | 20 | // a way to transport e.g. Zipkin IDs with the request. TODO. |
21 | 21 | func (b grpcBinding) Add(ctx context.Context, req *pb.AddRequest) (*pb.AddReply, error) { |
22 | addReq := request{req.A, req.B} | |
22 | addReq := addRequest{req.A, req.B} | |
23 | 23 | r, err := b.Endpoint(ctx, addReq) |
24 | 24 | if err != nil { |
25 | 25 | return nil, err |
26 | 26 | } |
27 | 27 | |
28 | resp, ok := r.(*response) | |
28 | resp, ok := r.(*addResponse) | |
29 | 29 | if !ok { |
30 | 30 | return nil, server.ErrBadCast |
31 | 31 | } |
10 | 10 | _ "net/http/pprof" |
11 | 11 | "os" |
12 | 12 | "os/signal" |
13 | "reflect" | |
14 | 13 | "strconv" |
14 | "strings" | |
15 | 15 | "syscall" |
16 | 16 | "time" |
17 | 17 | |
18 | 18 | "github.com/apache/thrift/lib/go/thrift" |
19 | "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/zipkincore" | |
19 | 20 | stdprometheus "github.com/prometheus/client_golang/prometheus" |
20 | 21 | "github.com/streadway/handy/cors" |
21 | 22 | "github.com/streadway/handy/encoding" |
24 | 25 | |
25 | 26 | thriftadd "github.com/go-kit/kit/addsvc/_thrift/gen-go/add" |
26 | 27 | "github.com/go-kit/kit/addsvc/pb" |
28 | "github.com/go-kit/kit/client" | |
27 | 29 | kitlog "github.com/go-kit/kit/log" |
28 | 30 | "github.com/go-kit/kit/metrics" |
29 | 31 | "github.com/go-kit/kit/metrics/expvar" |
36 | 38 | ) |
37 | 39 | |
38 | 40 | func main() { |
39 | // gRPC transitively registers flags via its import of glog. Here we | |
40 | // define a new flag set, to keep those domains distinct. | |
41 | // Flag domain. Note that gRPC transitively registers flags via its import | |
42 | // of glog. So, we define a new flag set, to keep those domains distinct. | |
41 | 43 | fs := flag.NewFlagSet("", flag.ExitOnError) |
42 | 44 | var ( |
43 | fwd = fs.String("fwd.http", "", "if set, forward requests to this addsvc (HTTP)") | |
45 | proxyHTTPAddr = fs.String("proxy.http.addr", "", "if set, proxy requests over HTTP to this addsvc") | |
44 | 46 | debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server") |
45 | 47 | httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server") |
46 | 48 | grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server") |
52 | 54 | ) |
53 | 55 | flag.Usage = fs.Usage // only show our flags |
54 | 56 | fs.Parse(os.Args[1:]) |
55 | rand.Seed(time.Now().UnixNano()) | |
56 | 57 | |
57 | 58 | // `package log` domain |
58 | 59 | var logger kitlog.Logger |
90 | 91 | zipkinMethodName := "add" |
91 | 92 | zipkinSpanFunc := zipkin.MakeNewSpanFunc(zipkinHostPort, *zipkinServiceName, zipkinMethodName) |
92 | 93 | |
94 | // Mechanical stuff | |
95 | rand.Seed(time.Now().UnixNano()) | |
96 | root := context.Background() | |
97 | errc := make(chan error) | |
98 | ||
93 | 99 | // Our business and operational domain |
94 | var a Add | |
95 | if *fwd == "" { | |
96 | a = pureAdd | |
97 | } else { | |
98 | a = addViaHTTP(*fwd, zipkinSpanFunc, zipkinCollector) | |
100 | var a Add = pureAdd | |
101 | if *proxyHTTPAddr != "" { | |
102 | codec := jsoncodec.New() | |
103 | makeResponse := func() interface{} { return &addResponse{} } | |
104 | ||
105 | var e client.Endpoint | |
106 | e = newHTTPClient(*proxyHTTPAddr, codec, makeResponse, before(zipkin.ToRequest(zipkinSpanFunc))) | |
107 | e = zipkin.AnnotateClient(zipkinSpanFunc, zipkinCollector)(e) | |
108 | ||
109 | a = proxyAdd(e) | |
99 | 110 | } |
100 | 111 | a = logging(logger)(a) |
101 | 112 | |
102 | 113 | // `package server` domain |
103 | 114 | var e server.Endpoint |
104 | 115 | e = makeEndpoint(a) |
105 | e = zipkin.AnnotateEndpoint(zipkinSpanFunc, zipkinCollector)(e) | |
106 | ||
107 | // Mechanical stuff | |
108 | root := context.Background() | |
109 | errc := make(chan error) | |
116 | e = zipkin.AnnotateServer(zipkinSpanFunc, zipkinCollector)(e) | |
110 | 117 | |
111 | 118 | go func() { |
112 | 119 | errc <- interrupt() |
124 | 131 | defer cancel() |
125 | 132 | |
126 | 133 | field := metrics.Field{Key: "transport", Value: "http"} |
127 | before := httptransport.Before(zipkin.ToContext(zipkin.FromHTTP(zipkinSpanFunc))) | |
134 | before := httptransport.Before(zipkin.ToContext(zipkinSpanFunc)) | |
128 | 135 | after := httptransport.After(httptransport.SetContentType("application/json")) |
136 | makeRequest := func() interface{} { return &addRequest{} } | |
129 | 137 | |
130 | 138 | var handler http.Handler |
131 | handler = httptransport.NewBinding(ctx, reflect.TypeOf(request{}), jsoncodec.New(), e, before, after) | |
139 | handler = httptransport.NewBinding(ctx, makeRequest, jsoncodec.New(), e, before, after) | |
132 | 140 | handler = encoding.Gzip(handler) |
133 | 141 | handler = cors.Middleware(cors.Config{})(handler) |
134 | 142 | handler = httpInstrument(requests.With(field), duration.With(field))(handler) |
226 | 234 | "trace_id", strconv.FormatInt(s.TraceID(), 16), |
227 | 235 | "span_id", strconv.FormatInt(s.SpanID(), 16), |
228 | 236 | "parent_span_id", strconv.FormatInt(s.ParentSpanID(), 16), |
237 | "annotations", pretty(s.Encode().GetAnnotations()), | |
229 | 238 | ) |
230 | 239 | return nil |
231 | 240 | } |
241 | ||
242 | func pretty(annotations []*zipkincore.Annotation) string { | |
243 | values := make([]string, len(annotations)) | |
244 | for i, annotation := range annotations { | |
245 | values[i] = annotation.Value | |
246 | } | |
247 | return strings.Join(values, " ") | |
248 | } |
2 | 2 | // The request and response types should be annotated sufficiently for all |
3 | 3 | // transports we intend to use. |
4 | 4 | |
5 | type request struct { | |
5 | type addRequest struct { | |
6 | 6 | A int64 `json:"a"` |
7 | 7 | B int64 `json:"b"` |
8 | 8 | } |
9 | 9 | |
10 | type response struct { | |
10 | type addResponse struct { | |
11 | 11 | V int64 `json:"v"` |
12 | 12 | } |
18 | 18 | |
19 | 19 | // Add implements Thrift's AddService interface. |
20 | 20 | func (tb thriftBinding) Add(a, b int64) (*thriftadd.AddReply, error) { |
21 | r, err := tb.Endpoint(tb.Context, request{a, b}) | |
21 | r, err := tb.Endpoint(tb.Context, addRequest{a, b}) | |
22 | 22 | if err != nil { |
23 | 23 | return nil, err |
24 | 24 | } |
25 | 25 | |
26 | resp, ok := r.(*response) | |
26 | resp, ok := r.(*addResponse) | |
27 | 27 | if !ok { |
28 | 28 | return nil, server.ErrBadCast |
29 | 29 | } |
0 | package client | |
1 | ||
2 | import ( | |
3 | "errors" | |
4 | ||
5 | "golang.org/x/net/context" | |
6 | ) | |
7 | ||
8 | // Endpoint is the fundamental building block of package client. | |
9 | // It represents a single RPC method on a remote service. | |
10 | type Endpoint func(ctx context.Context, request interface{}) (response interface{}, err error) | |
11 | ||
12 | // Middleware is a chainable behavior modifier. | |
13 | type Middleware func(Endpoint) Endpoint | |
14 | ||
15 | // ErrBadCast indicates a type error during decoding or encoding. | |
16 | var ErrBadCast = errors.New("bad cast") |
5 | 5 | "golang.org/x/net/context" |
6 | 6 | ) |
7 | 7 | |
8 | // Request is an RPC request. | |
9 | type Request interface{} | |
10 | ||
11 | // Response is an RPC response. | |
12 | type Response interface{} | |
13 | ||
14 | 8 | // Endpoint is the fundamental building block of package server. |
15 | 9 | // It represents a single RPC method. |
16 | type Endpoint func(context.Context, Request) (Response, error) | |
10 | type Endpoint func(ctx context.Context, request interface{}) (response interface{}, err error) | |
11 | ||
12 | // Middleware is a chainable behavior modifier. | |
13 | type Middleware func(Endpoint) Endpoint | |
17 | 14 | |
18 | 15 | // ErrBadCast indicates a type error during decoding or encoding. |
19 | 16 | var ErrBadCast = errors.New("bad cast") |
4 | 4 | "net/http" |
5 | 5 | "strconv" |
6 | 6 | |
7 | "github.com/go-kit/kit/client" | |
7 | 8 | "github.com/go-kit/kit/log" |
8 | 9 | "github.com/go-kit/kit/server" |
9 | 10 | |
44 | 45 | ClientReceive = "cr" |
45 | 46 | ) |
46 | 47 | |
47 | // AnnotateEndpoint extracts a span from the context, adds server-receive and | |
48 | // server-send annotations at the boundaries, and submits the span to the | |
49 | // collector. If no span is present, a new span is generated and put in the | |
50 | // context. | |
51 | func AnnotateEndpoint(newSpan NewSpanFunc, c Collector) func(server.Endpoint) server.Endpoint { | |
48 | // AnnotateServer returns a server.Middleware that extracts a span from the | |
49 | // context, adds server-receive and server-send annotations at the boundaries, | |
50 | // and submits the span to the collector. If no span is found in the context, | |
51 | // a new span is generated and inserted. | |
52 | func AnnotateServer(newSpan NewSpanFunc, c Collector) server.Middleware { | |
52 | 53 | return func(e server.Endpoint) server.Endpoint { |
53 | return func(ctx context.Context, req server.Request) (server.Response, error) { | |
54 | span, ctx := mustGetServerSpan(ctx, newSpan) | |
54 | return func(ctx context.Context, request interface{}) (interface{}, error) { | |
55 | span, ok := maybeFromContext(ctx) | |
56 | if !ok { | |
57 | span = newSpan(newID(), newID(), 0) | |
58 | ctx = context.WithValue(ctx, SpanContextKey, span) | |
59 | } | |
55 | 60 | span.Annotate(ServerReceive) |
56 | 61 | defer func() { span.Annotate(ServerSend); c.Collect(span) }() |
57 | return e(ctx, req) | |
62 | return e(ctx, request) | |
58 | 63 | } |
59 | 64 | } |
60 | 65 | } |
61 | 66 | |
62 | // FromHTTP is a helper method that allows NewSpanFunc's factory function to | |
63 | // be easily invoked by passing an HTTP request. The span name is the HTTP | |
64 | // method. The trace, span, and parent span IDs are taken from the request | |
65 | // headers. | |
66 | func FromHTTP(newSpan NewSpanFunc) func(*http.Request) *Span { | |
67 | return func(r *http.Request) *Span { | |
68 | traceIDStr := r.Header.Get(traceIDHTTPHeader) | |
69 | if traceIDStr == "" { | |
70 | // If there's no trace ID, that's normal: just make a new one. | |
71 | log.DefaultLogger.Log("debug", "make new span") | |
72 | return newSpan(newID(), newID(), 0) | |
67 | // AnnotateClient returns a client.Middleware that extracts a parent span from | |
68 | // the context, produces a client (child) span from it, adds client-send and | |
69 | // client-receive annotations at the boundaries, and submits the span to the | |
70 | // collector. If no span is found in the context, a new span is generated and | |
71 | // inserted. | |
72 | func AnnotateClient(newSpan NewSpanFunc, c Collector) client.Middleware { | |
73 | return func(e client.Endpoint) client.Endpoint { | |
74 | return func(ctx context.Context, request interface{}) (interface{}, error) { | |
75 | var clientSpan *Span | |
76 | parentSpan, ok := maybeFromContext(ctx) | |
77 | if ok { | |
78 | clientSpan = newSpan(parentSpan.TraceID(), newID(), parentSpan.SpanID()) | |
79 | } else { | |
80 | clientSpan = newSpan(newID(), newID(), 0) | |
81 | } | |
82 | ctx = context.WithValue(ctx, SpanContextKey, clientSpan) // set | |
83 | defer func() { ctx = context.WithValue(ctx, SpanContextKey, parentSpan) }() // reset | |
84 | clientSpan.Annotate(ClientSend) | |
85 | defer func() { clientSpan.Annotate(ClientReceive); c.Collect(clientSpan) }() | |
86 | return e(ctx, request) | |
73 | 87 | } |
74 | traceID, err := strconv.ParseInt(traceIDStr, 16, 64) | |
75 | if err != nil { | |
76 | log.DefaultLogger.Log(traceIDHTTPHeader, traceIDStr, "err", err) | |
77 | return newSpan(newID(), newID(), 0) | |
88 | } | |
89 | } | |
90 | ||
91 | // ToContext returns a function that satisfies transport/http.BeforeFunc. It | |
92 | // takes a Zipkin span from the incoming HTTP request, and saves it in the | |
93 | // request context. It's designed to be wired into a server's HTTP transport | |
94 | // Before stack. | |
95 | func ToContext(newSpan NewSpanFunc) func(ctx context.Context, r *http.Request) context.Context { | |
96 | return func(ctx context.Context, r *http.Request) context.Context { | |
97 | return context.WithValue(ctx, SpanContextKey, fromHTTP(newSpan, r)) | |
98 | } | |
99 | } | |
100 | ||
101 | // ToRequest returns a function that satisfies transport/http.BeforeFunc. It | |
102 | // takes a Zipkin span from the context, and injects it into the HTTP request. | |
103 | // It's designed to be wired into a client's HTTP transport Before stack. It's | |
104 | // expected that AnnotateClient has already ensured the span in the context is | |
105 | // a child/client span. | |
106 | func ToRequest(newSpan NewSpanFunc) func(ctx context.Context, r *http.Request) context.Context { | |
107 | return func(ctx context.Context, r *http.Request) context.Context { | |
108 | span, ok := maybeFromContext(ctx) | |
109 | if !ok { | |
110 | span = newSpan(newID(), newID(), 0) | |
78 | 111 | } |
79 | spanIDStr := r.Header.Get(spanIDHTTPHeader) | |
80 | if spanIDStr == "" { | |
81 | log.DefaultLogger.Log("msg", "trace ID without span ID") // abnormal | |
82 | spanIDStr = strconv.FormatInt(newID(), 64) // deal with it | |
83 | } | |
84 | spanID, err := strconv.ParseInt(spanIDStr, 16, 64) | |
85 | if err != nil { | |
86 | log.DefaultLogger.Log(spanIDHTTPHeader, spanIDStr, "err", err) // abnormal | |
87 | spanID = newID() // deal with it | |
88 | } | |
89 | parentSpanIDStr := r.Header.Get(parentSpanIDHTTPHeader) | |
90 | if parentSpanIDStr == "" { | |
91 | parentSpanIDStr = "0" // normal | |
92 | } | |
93 | parentSpanID, err := strconv.ParseInt(parentSpanIDStr, 16, 64) | |
94 | if err != nil { | |
95 | log.DefaultLogger.Log(parentSpanIDHTTPHeader, parentSpanIDStr, "err", err) // abnormal | |
96 | parentSpanID = 0 // the only way to deal with it | |
97 | } | |
98 | return newSpan(traceID, spanID, parentSpanID) | |
99 | } | |
100 | } | |
101 | ||
102 | // ToContext returns a function that satisfies transport/http.BeforeFunc. When | |
103 | // invoked, it generates a Zipkin span from the incoming HTTP request, and | |
104 | // saves it in the request context under the SpanContextKey. | |
105 | func ToContext(f func(*http.Request) *Span) func(context.Context, *http.Request) context.Context { | |
106 | return func(ctx context.Context, r *http.Request) context.Context { | |
107 | return context.WithValue(ctx, SpanContextKey, f(r)) | |
108 | } | |
109 | } | |
110 | ||
111 | // NewChildSpan creates a new child (client) span. If a span is present in the | |
112 | // context, it will be interpreted as the parent. | |
113 | func NewChildSpan(ctx context.Context, newSpan NewSpanFunc) *Span { | |
112 | setRequestHeaders(r.Header, span) | |
113 | return ctx | |
114 | } | |
115 | } | |
116 | ||
117 | func fromHTTP(newSpan NewSpanFunc, r *http.Request) *Span { | |
118 | traceIDStr := r.Header.Get(traceIDHTTPHeader) | |
119 | if traceIDStr == "" { | |
120 | log.DefaultLogger.Log("debug", "make new span") | |
121 | return newSpan(newID(), newID(), 0) // normal; just make a new one | |
122 | } | |
123 | traceID, err := strconv.ParseInt(traceIDStr, 16, 64) | |
124 | if err != nil { | |
125 | log.DefaultLogger.Log(traceIDHTTPHeader, traceIDStr, "err", err) | |
126 | return newSpan(newID(), newID(), 0) | |
127 | } | |
128 | spanIDStr := r.Header.Get(spanIDHTTPHeader) | |
129 | if spanIDStr == "" { | |
130 | log.DefaultLogger.Log("msg", "trace ID without span ID") // abnormal | |
131 | spanIDStr = strconv.FormatInt(newID(), 64) // deal with it | |
132 | } | |
133 | spanID, err := strconv.ParseInt(spanIDStr, 16, 64) | |
134 | if err != nil { | |
135 | log.DefaultLogger.Log(spanIDHTTPHeader, spanIDStr, "err", err) // abnormal | |
136 | spanID = newID() // deal with it | |
137 | } | |
138 | parentSpanIDStr := r.Header.Get(parentSpanIDHTTPHeader) | |
139 | if parentSpanIDStr == "" { | |
140 | parentSpanIDStr = "0" // normal | |
141 | } | |
142 | parentSpanID, err := strconv.ParseInt(parentSpanIDStr, 16, 64) | |
143 | if err != nil { | |
144 | log.DefaultLogger.Log(parentSpanIDHTTPHeader, parentSpanIDStr, "err", err) // abnormal | |
145 | parentSpanID = 0 // the only way to deal with it | |
146 | } | |
147 | return newSpan(traceID, spanID, parentSpanID) | |
148 | } | |
149 | ||
150 | // func fromContext(newSpan NewSpanFunc) func(context.Context) *Span { | |
151 | // return func(ctx context.Context) *Span { | |
152 | // if span, ok := maybeFromContext(ctx); ok { | |
153 | // return span | |
154 | // } | |
155 | // return newSpan(newID(), newID(), 0) | |
156 | // } | |
157 | // } | |
158 | ||
159 | //// NewChildSpan creates a new child (client) span. If a span is already | |
160 | //// present in the context, it will be interpreted as the parent. | |
161 | //func NewChildSpan(ctx context.Context, newSpan NewSpanFunc) *Span { | |
162 | // parentSpan, ok := maybeFromContext(ctx) | |
163 | // if !ok { | |
164 | // return newSpan(newID(), newID(), 0) | |
165 | // } | |
166 | // var ( | |
167 | // traceID = parentSpan.TraceID() | |
168 | // spanID = newID() | |
169 | // parentSpanID = parentSpan.SpanID() | |
170 | // ) | |
171 | // return newSpan(traceID, spanID, parentSpanID) | |
172 | //} | |
173 | ||
174 | func setRequestHeaders(h http.Header, s *Span) { | |
175 | if id := s.TraceID(); id > 0 { | |
176 | h.Set(traceIDHTTPHeader, strconv.FormatInt(id, 16)) | |
177 | } | |
178 | if id := s.SpanID(); id > 0 { | |
179 | h.Set(spanIDHTTPHeader, strconv.FormatInt(id, 16)) | |
180 | } | |
181 | if id := s.ParentSpanID(); id > 0 { | |
182 | h.Set(parentSpanIDHTTPHeader, strconv.FormatInt(id, 16)) | |
183 | } | |
184 | } | |
185 | ||
186 | func maybeFromContext(ctx context.Context) (*Span, bool) { | |
114 | 187 | val := ctx.Value(SpanContextKey) |
115 | 188 | if val == nil { |
116 | return newSpan(newID(), newID(), 0) | |
117 | } | |
118 | parentSpan, ok := val.(*Span) | |
119 | if !ok { | |
120 | panic(SpanContextKey + " value isn't a span object") | |
121 | } | |
122 | var ( | |
123 | traceID = parentSpan.TraceID() | |
124 | spanID = newID() | |
125 | parentSpanID = parentSpan.SpanID() | |
126 | ) | |
127 | return newSpan(traceID, spanID, parentSpanID) | |
128 | } | |
129 | ||
130 | // SetRequestHeaders sets up HTTP headers for a new outbound request based on | |
131 | // the (client) span. All IDs are encoded as hex strings. | |
132 | func SetRequestHeaders(h http.Header, s *Span) { | |
133 | if id := s.TraceID(); id > 0 { | |
134 | h.Set(traceIDHTTPHeader, strconv.FormatInt(id, 16)) | |
135 | } | |
136 | if id := s.SpanID(); id > 0 { | |
137 | h.Set(spanIDHTTPHeader, strconv.FormatInt(id, 16)) | |
138 | } | |
139 | if id := s.ParentSpanID(); id > 0 { | |
140 | h.Set(parentSpanIDHTTPHeader, strconv.FormatInt(id, 16)) | |
141 | } | |
142 | } | |
143 | ||
144 | func mustGetServerSpan(ctx context.Context, newSpan NewSpanFunc) (*Span, context.Context) { | |
145 | val := ctx.Value(SpanContextKey) | |
146 | if val == nil { | |
147 | span := newSpan(newID(), newID(), 0) | |
148 | return span, context.WithValue(ctx, SpanContextKey, span) | |
189 | return nil, false | |
149 | 190 | } |
150 | 191 | span, ok := val.(*Span) |
151 | 192 | if !ok { |
152 | 193 | panic(SpanContextKey + " value isn't a span object") |
153 | 194 | } |
154 | return span, ctx | |
155 | } | |
156 | ||
157 | //func getID(h http.Header, key string) int64 { | |
158 | // val := h.Get(key) | |
159 | // if val == "" { | |
160 | // return 0 | |
195 | return span, true | |
196 | } | |
197 | ||
198 | //func mustGetServerSpan(ctx context.Context, newSpan NewSpanFunc) (*Span, context.Context) { | |
199 | // val := ctx.Value(SpanContextKey) | |
200 | // if val == nil { | |
201 | // span := newSpan(newID(), newID(), 0) | |
202 | // return span, context.WithValue(ctx, SpanContextKey, span) | |
161 | 203 | // } |
162 | // i, err := strconv.ParseInt(val, 16, 64) | |
163 | // if err != nil { | |
164 | // panic("invalid Zipkin ID in HTTP header: " + val) | |
204 | // span, ok := val.(*Span) | |
205 | // if !ok { | |
206 | // panic(SpanContextKey + " value isn't a span object") | |
165 | 207 | // } |
166 | // return i | |
208 | // return span, ctx | |
167 | 209 | //} |
168 | 210 | |
169 | 211 | func newID() int64 { |
0 | package zipkin | |
1 | ||
2 | import ( | |
3 | "net/http" | |
4 | "strconv" | |
5 | "testing" | |
6 | ||
7 | "golang.org/x/net/context" | |
8 | ) | |
9 | ||
10 | func TestFromHTTPToContext(t *testing.T) { | |
11 | const ( | |
12 | hostport = "5.5.5.5:5555" | |
13 | serviceName = "foo-service" | |
14 | methodName = "foo-method" | |
15 | traceID int64 = 12 | |
16 | spanID int64 = 34 | |
17 | parentSpanID int64 = 56 | |
18 | ) | |
19 | ||
20 | r, _ := http.NewRequest("GET", "https://best.horse", nil) | |
21 | r.Header.Set("X-B3-TraceId", strconv.FormatInt(traceID, 16)) | |
22 | r.Header.Set("X-B3-SpanId", strconv.FormatInt(spanID, 16)) | |
23 | r.Header.Set("X-B3-ParentSpanId", strconv.FormatInt(parentSpanID, 16)) | |
24 | ||
25 | sf := MakeNewSpanFunc(hostport, serviceName, methodName) | |
26 | cf := ToContext(sf) | |
27 | ||
28 | ctx := cf(context.Background(), r) | |
29 | val := ctx.Value(SpanContextKey) | |
30 | if val == nil { | |
31 | t.Fatalf("%s returned no value", SpanContextKey) | |
32 | } | |
33 | span, ok := val.(*Span) | |
34 | if !ok { | |
35 | t.Fatalf("%s was not a Span object", SpanContextKey) | |
36 | } | |
37 | ||
38 | if want, have := traceID, span.TraceID(); want != have { | |
39 | t.Errorf("want %d, have %d", want, have) | |
40 | } | |
41 | ||
42 | if want, have := spanID, span.SpanID(); want != have { | |
43 | t.Errorf("want %d, have %d", want, have) | |
44 | } | |
45 | ||
46 | if want, have := parentSpanID, span.ParentSpanID(); want != have { | |
47 | t.Errorf("want %d, have %d", want, have) | |
48 | } | |
49 | } | |
50 | ||
51 | func TestSetRequestHeaders(t *testing.T) { | |
52 | const ( | |
53 | hostport = "4.2.4.2:4242" | |
54 | serviceName = "bar-service" | |
55 | methodName = "bar-method" | |
56 | traceID int64 = 123 | |
57 | spanID int64 = 456 | |
58 | parentSpanID int64 = 789 | |
59 | ) | |
60 | ||
61 | r, _ := http.NewRequest("POST", "http://destroy.horse", nil) | |
62 | setRequestHeaders(r.Header, NewSpan(hostport, serviceName, methodName, traceID, spanID, parentSpanID)) | |
63 | ||
64 | for h, want := range map[string]string{ | |
65 | "X-B3-TraceId": strconv.FormatInt(traceID, 16), | |
66 | "X-B3-SpanId": strconv.FormatInt(spanID, 16), | |
67 | "X-B3-ParentSpanId": strconv.FormatInt(parentSpanID, 16), | |
68 | } { | |
69 | if have := r.Header.Get(h); want != have { | |
70 | t.Errorf("%s: want %s, have %s", h, want, have) | |
71 | } | |
72 | } | |
73 | } |
0 | 0 | package zipkin_test |
1 | 1 | |
2 | 2 | import ( |
3 | "math/rand" | |
4 | "net/http" | |
5 | "strconv" | |
6 | 3 | "sync/atomic" |
7 | 4 | "testing" |
8 | 5 | |
12 | 9 | "github.com/go-kit/kit/tracing/zipkin" |
13 | 10 | ) |
14 | 11 | |
15 | func TestAnnotateEndpoint(t *testing.T) { | |
12 | func TestAnnotateServer(t *testing.T) { | |
16 | 13 | const ( |
17 | 14 | hostport = "1.2.3.4:1234" |
18 | 15 | serviceName = "some-service" |
23 | 20 | c := &countingCollector{} |
24 | 21 | |
25 | 22 | var e server.Endpoint |
26 | e = func(context.Context, server.Request) (server.Response, error) { return struct{}{}, nil } | |
27 | e = zipkin.AnnotateEndpoint(f, c)(e) | |
23 | e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil } | |
24 | e = zipkin.AnnotateServer(f, c)(e) | |
28 | 25 | |
29 | 26 | if want, have := int32(0), int32(c.int32); want != have { |
30 | 27 | t.Errorf("want %d, have %d", want, have) |
37 | 34 | } |
38 | 35 | } |
39 | 36 | |
40 | func TestFromHTTPToContext(t *testing.T) { | |
41 | const ( | |
42 | hostport = "5.5.5.5:5555" | |
43 | serviceName = "foo-service" | |
44 | methodName = "foo-method" | |
45 | traceID int64 = 12 | |
46 | spanID int64 = 34 | |
47 | parentSpanID int64 = 56 | |
48 | ) | |
49 | ||
50 | r, _ := http.NewRequest("GET", "https://best.horse", nil) | |
51 | r.Header.Set("X-B3-TraceId", strconv.FormatInt(traceID, 16)) | |
52 | r.Header.Set("X-B3-SpanId", strconv.FormatInt(spanID, 16)) | |
53 | r.Header.Set("X-B3-ParentSpanId", strconv.FormatInt(parentSpanID, 16)) | |
54 | ||
55 | sf := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName) | |
56 | hf := zipkin.FromHTTP(sf) | |
57 | cf := zipkin.ToContext(hf) | |
58 | ||
59 | ctx := cf(context.Background(), r) | |
60 | val := ctx.Value(zipkin.SpanContextKey) | |
61 | if val == nil { | |
62 | t.Fatalf("%s returned no value", zipkin.SpanContextKey) | |
63 | } | |
64 | span, ok := val.(*zipkin.Span) | |
65 | if !ok { | |
66 | t.Fatalf("%s was not a Span object", zipkin.SpanContextKey) | |
67 | } | |
68 | ||
69 | if want, have := traceID, span.TraceID(); want != have { | |
70 | t.Errorf("want %d, have %d", want, have) | |
71 | } | |
72 | ||
73 | if want, have := spanID, span.SpanID(); want != have { | |
74 | t.Errorf("want %d, have %d", want, have) | |
75 | } | |
76 | ||
77 | if want, have := parentSpanID, span.ParentSpanID(); want != have { | |
78 | t.Errorf("want %d, have %d", want, have) | |
79 | } | |
80 | } | |
81 | ||
82 | func TestNewChildSpan(t *testing.T) { | |
83 | rand.Seed(123) | |
84 | ||
85 | const ( | |
86 | hostport = "1.2.1.2:1212" | |
87 | serviceName = "my-service" | |
88 | methodName = "my-method" | |
89 | traceID int64 = 123 | |
90 | spanID int64 = 456 | |
91 | parentSpanID int64 = 789 | |
92 | ) | |
93 | ||
94 | f := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName) | |
95 | ctx := context.WithValue(context.Background(), zipkin.SpanContextKey, f(traceID, spanID, parentSpanID)) | |
96 | childSpan := zipkin.NewChildSpan(ctx, f) | |
97 | ||
98 | if want, have := traceID, childSpan.TraceID(); want != have { | |
99 | t.Errorf("want %d, have %d", want, have) | |
100 | } | |
101 | if have := childSpan.SpanID(); have == spanID { | |
102 | t.Errorf("span ID should be random, but we have %d", have) | |
103 | } | |
104 | if want, have := spanID, childSpan.ParentSpanID(); want != have { | |
105 | t.Errorf("want %d, have %d", want, have) | |
106 | } | |
107 | } | |
108 | ||
109 | func TestSetRequestHeaders(t *testing.T) { | |
110 | const ( | |
111 | hostport = "4.2.4.2:4242" | |
112 | serviceName = "bar-service" | |
113 | methodName = "bar-method" | |
114 | traceID int64 = 123 | |
115 | spanID int64 = 456 | |
116 | parentSpanID int64 = 789 | |
117 | ) | |
118 | ||
119 | r, _ := http.NewRequest("POST", "http://destroy.horse", nil) | |
120 | zipkin.SetRequestHeaders(r.Header, zipkin.NewSpan(hostport, serviceName, methodName, traceID, spanID, parentSpanID)) | |
121 | ||
122 | for h, want := range map[string]string{ | |
123 | "X-B3-TraceId": strconv.FormatInt(traceID, 16), | |
124 | "X-B3-SpanId": strconv.FormatInt(spanID, 16), | |
125 | "X-B3-ParentSpanId": strconv.FormatInt(parentSpanID, 16), | |
126 | } { | |
127 | if have := r.Header.Get(h); want != have { | |
128 | t.Errorf("%s: want %s, have %s", h, want, have) | |
129 | } | |
130 | } | |
37 | func TestAnnotateClient(t *testing.T) { | |
38 | t.Skip("not yet implemented") | |
131 | 39 | } |
132 | 40 | |
133 | 41 | type countingCollector struct{ int32 } |
3 | 3 | "io" |
4 | 4 | |
5 | 5 | "golang.org/x/net/context" |
6 | ||
7 | "github.com/go-kit/kit/server" | |
8 | 6 | ) |
9 | 7 | |
10 | // Codec defines how to decode and encode requests and responses. Decode takes | |
11 | // and returns a context because the request may be accompanied by information | |
8 | // Codec decodes and encodes requests and responses. Decode takes and returns | |
9 | // a context because the request or response may be accompanied by information | |
12 | 10 | // that needs to be applied there. |
13 | 11 | type Codec interface { |
14 | Decode(context.Context, io.Reader, server.Request) (context.Context, error) | |
15 | Encode(io.Writer, server.Response) error | |
12 | Decode(context.Context, io.Reader, interface{}) (context.Context, error) | |
13 | Encode(io.Writer, interface{}) error | |
16 | 14 | } |
5 | 5 | |
6 | 6 | "golang.org/x/net/context" |
7 | 7 | |
8 | "github.com/go-kit/kit/server" | |
9 | 8 | "github.com/go-kit/kit/transport/codec" |
10 | 9 | ) |
11 | 10 | |
15 | 14 | // properly-tagged fields. |
16 | 15 | func New() codec.Codec { return jsonCodec{} } |
17 | 16 | |
18 | func (jsonCodec) Decode(ctx context.Context, r io.Reader, req server.Request) (context.Context, error) { | |
19 | return ctx, json.NewDecoder(r).Decode(req) | |
17 | func (jsonCodec) Decode(ctx context.Context, r io.Reader, v interface{}) (context.Context, error) { | |
18 | return ctx, json.NewDecoder(r).Decode(v) | |
20 | 19 | } |
21 | 20 | |
22 | func (jsonCodec) Encode(w io.Writer, resp server.Response) error { | |
23 | return json.NewEncoder(w).Encode(resp) | |
21 | func (jsonCodec) Encode(w io.Writer, v interface{}) error { | |
22 | return json.NewEncoder(w).Encode(v) | |
24 | 23 | } |
1 | 1 | |
2 | 2 | import ( |
3 | 3 | "net/http" |
4 | "reflect" | |
5 | 4 | |
6 | 5 | "golang.org/x/net/context" |
7 | 6 | |
24 | 23 | |
25 | 24 | type binding struct { |
26 | 25 | context.Context |
27 | requestType reflect.Type | |
26 | makeRequest func() interface{} | |
28 | 27 | codec.Codec |
29 | 28 | server.Endpoint |
30 | 29 | before []BeforeFunc |
32 | 31 | } |
33 | 32 | |
34 | 33 | // NewBinding returns an HTTP handler that wraps the given endpoint. |
35 | func NewBinding(ctx context.Context, requestType reflect.Type, cdc codec.Codec, endpoint server.Endpoint, options ...BindingOption) http.Handler { | |
34 | func NewBinding(ctx context.Context, makeRequest func() interface{}, cdc codec.Codec, endpoint server.Endpoint, options ...BindingOption) http.Handler { | |
36 | 35 | b := &binding{ |
37 | 36 | Context: ctx, |
38 | requestType: requestType, | |
37 | makeRequest: makeRequest, | |
39 | 38 | Codec: cdc, |
40 | 39 | Endpoint: endpoint, |
41 | 40 | } |
56 | 55 | } |
57 | 56 | |
58 | 57 | // Decode request. |
59 | req := reflect.New(b.requestType).Interface() | |
58 | req := b.makeRequest() | |
60 | 59 | ctx, err := b.Codec.Decode(ctx, r.Body, req) |
61 | 60 | if err != nil { |
62 | 61 | http.Error(w, err.Error(), http.StatusBadRequest) |
10 | 10 | |
11 | 11 | "golang.org/x/net/context" |
12 | 12 | |
13 | "github.com/go-kit/kit/server" | |
14 | 13 | jsoncodec "github.com/go-kit/kit/transport/codec/json" |
15 | 14 | httptransport "github.com/go-kit/kit/transport/http" |
16 | 15 | ) |
28 | 27 | return 3 * i // doesn't matter, just do something |
29 | 28 | } |
30 | 29 | |
31 | endpoint := func(_ context.Context, req server.Request) (server.Response, error) { | |
32 | r, ok := req.(*myRequest) | |
30 | endpoint := func(_ context.Context, request interface{}) (interface{}, error) { | |
31 | r, ok := request.(*myRequest) | |
33 | 32 | if !ok { |
34 | return nil, fmt.Errorf("not myRequest (%s)", reflect.TypeOf(req)) | |
33 | return nil, fmt.Errorf("not myRequest (%s)", reflect.TypeOf(request)) | |
35 | 34 | } |
36 | 35 | return myResponse{transform(r.In)}, nil |
37 | 36 | } |
38 | 37 | |
39 | 38 | ctx := context.Background() |
40 | requestType := reflect.TypeOf(myRequest{}) | |
39 | makeRequest := func() interface{} { return &myRequest{} } | |
41 | 40 | codec := jsoncodec.New() |
42 | binding := httptransport.NewBinding(ctx, requestType, codec, endpoint) | |
41 | binding := httptransport.NewBinding(ctx, makeRequest, codec, endpoint) | |
43 | 42 | server := httptest.NewServer(binding) |
44 | 43 | defer server.Close() |
45 | 44 |