Codebase list golang-github-go-kit-kit / a8cd9c8
Merge pull request #45 from go-kit/fix-zipkin-tracing Fix Zipkin tracing, and introduce package client Peter Bourgon 8 years ago
23 changed file(s) with 641 addition(s) and 340 deletion(s). Raw diff Collapse all Expand all
0 addsvc/addsvc
1 cover.out
3 # Compiled Object files, Static and Dynamic libs (Shared Objects)
4 *.o
5 *.a
6 *.so
8 # Folders
9 _obj
10 _test
11 _old*
13 # Architecture specific extensions/prefixes
14 *.[568vq]
15 [568vq].out
17 *.cgo1.go
18 *.cgo2.c
19 _cgo_defun.c
20 _cgo_gotypes.go
21 _cgo_export.*
23 _testmain.go
25 *.exe
addsvc/.gitignore less more
0 addsvc
00 package main
2 import (
3 ""
5 ""
6 ""
7 )
29 // Add is the abstract definition of what this service does. It could easily
3 // be an interface type with multiple methods. Each method would be an
4 // endpoint.
5 type Add func(int64, int64) int64
10 // be an interface type with multiple methods, in which case each method would
11 // be an endpoint.
12 type Add func(context.Context, int64, int64) int64
7 func pureAdd(a, b int64) int64 { return a + b }
14 // pureAdd implements Add with no dependencies.
15 func pureAdd(_ context.Context, a, b int64) int64 { return a + b }
9 func addVia(r Resource) Add {
10 return func(a, b int64) int64 {
11 return r.Value(a) + r.Value(b)
17 // proxyAdd returns an implementation of Add that invokes a remote Add
18 // service.
19 func proxyAdd(e endpoint.Endpoint, logger log.Logger) Add {
20 return func(ctx context.Context, a, b int64) int64 {
21 resp, err := e(ctx, &addRequest{a, b})
22 if err != nil {
23 logger.Log("err", err)
24 return 0
25 }
26 addResp, ok := resp.(*addResponse)
27 if !ok {
28 logger.Log("err", endpoint.ErrBadCast)
29 return 0
30 }
31 return addResp.V
1232 }
1333 }
15 // Resource represents some dependency, outside of our control.
16 type Resource interface {
17 Value(int64) int64
18 }
20 type mockResource struct{}
22 func (mockResource) Value(i int64) int64 { return i }
22 import (
33 ""
5 ""
5 ""
66 )
8 // makeEndpoint returns a server.Endpoint wrapping the passed Add. If Add were
9 // an interface with multiple methods, we'd need individual endpoints for
10 // each.
8 // makeEndpoint returns an endpoint wrapping the passed Add. If Add were an
9 // interface with multiple methods, we'd need individual endpoints for each.
1110 //
1211 // This function is just boiler-plate; in theory, it could be generated.
13 func makeEndpoint(a Add) server.Endpoint {
14 return func(ctx context.Context, req server.Request) (server.Response, error) {
12 func makeEndpoint(a Add) endpoint.Endpoint {
13 return func(ctx context.Context, request interface{}) (interface{}, error) {
1514 select {
15 default:
1616 case <-ctx.Done():
17 return nil, server.ErrContextCanceled
18 default:
17 return nil, endpoint.ErrContextCanceled
1918 }
21 addReq, ok := req.(*request)
20 addReq, ok := request.(*addRequest)
2221 if !ok {
23 return nil, server.ErrBadCast
22 return nil, endpoint.ErrBadCast
2423 }
26 v := a(addReq.A, addReq.B)
25 v := a(ctx, addReq.A, addReq.B)
28 return response{
29 V: v,
30 }, nil
27 return addResponse{V: v}, nil
3128 }
3229 }
22 import (
33 "time"
5 ""
57 ""
68 )
8 func logging(logger log.Logger, add Add) Add {
9 return func(a, b int64) (v int64) {
10 defer func(begin time.Time) {
11 logger.Log("a", a, "b", b, "result", v, "took", time.Since(begin))
12 }(time.Now())
13 v = add(a, b)
14 return
10 func logging(logger log.Logger) func(Add) Add {
11 return func(next Add) Add {
12 return func(ctx context.Context, a, b int64) (v int64) {
13 defer func(begin time.Time) {
14 logger.Log("a", a, "b", b, "result", v, "took", time.Since(begin))
15 }(time.Now())
16 v = next(ctx, a, b)
17 return
18 }
1519 }
1620 }
55 ""
77 ""
8 ""
89 ""
9 ""
1010 )
1212 // A binding wraps an Endpoint so that it's usable by a transport. grpcBinding
1313 // makes an Endpoint usable over gRPC.
14 type grpcBinding struct{ server.Endpoint }
14 type grpcBinding struct{ endpoint.Endpoint }
1616 // Add implements the proto3 AddServer by forwarding to the wrapped Endpoint.
1717 //
1919 // way to manipulate the RPC context, like headers for HTTP. So we don't have
2020 // a way to transport e.g. Zipkin IDs with the request. TODO.
2121 func (b grpcBinding) Add(ctx context.Context, req *pb.AddRequest) (*pb.AddReply, error) {
22 addReq := request{req.A, req.B}
22 addReq := addRequest{req.A, req.B}
2323 r, err := b.Endpoint(ctx, addReq)
2424 if err != nil {
2525 return nil, err
2626 }
28 resp, ok := r.(*response)
28 resp, ok := r.(*addResponse)
2929 if !ok {
30 return nil, server.ErrBadCast
30 return nil, endpoint.ErrBadCast
3131 }
3333 return &pb.AddReply{
44 "fmt"
55 "io/ioutil"
66 stdlog "log"
7 "math/rand"
78 "net"
89 "net/http"
910 _ "net/http/pprof"
1011 "os"
1112 "os/signal"
12 "reflect"
13 "strings"
1314 "syscall"
1415 "time"
2324 thriftadd ""
2425 ""
26 ""
2527 kitlog ""
2628 ""
2729 ""
2830 ""
2931 ""
30 ""
3132 ""
3233 jsoncodec ""
3334 httptransport ""
3435 )
3637 func main() {
38 // Flag domain. Note that gRPC transitively registers flags via its import
39 // of glog. So, we define a new flag set, to keep those domains distinct.
40 fs := flag.NewFlagSet("", flag.ExitOnError)
3741 var (
38 debugAddr = flag.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server")
39 httpAddr = flag.String("http.addr", ":8001", "Address for HTTP (JSON) server")
40 grpcAddr = flag.String("grpc.addr", ":8002", "Address for gRPC server")
41 thriftAddr = flag.String("thrift.addr", ":8003", "Address for Thrift server")
42 thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
43 thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered")
44 thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing")
45 )
46 flag.Parse()
42 debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server")
43 httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server")
44 grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server")
45 thriftAddr = fs.String("thrift.addr", ":8003", "Address for Thrift server")
46 thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
47 thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered")
48 thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing")
50 proxyHTTPAddr = fs.String("proxy.http.url", "", "if set, proxy requests over HTTP to this addsvc")
52 zipkinServiceName = fs.String("", "addsvc", "Zipkin service name")
53 zipkinCollectorAddr = fs.String("zipkin.collector.addr", "", "Zipkin Scribe collector address (empty will log spans)")
54 zipkinCollectorTimeout = fs.Duration("zipkin.collector.timeout", time.Second, "Zipkin collector timeout")
55 zipkinCollectorBatchSize = fs.Int("zipkin.collector.batch.size", 100, "Zipkin collector batch size")
56 zipkinCollectorBatchInterval = fs.Duration("zipkin.collector.batch.interval", time.Second, "Zipkin collector batch interval")
57 )
58 flag.Usage = fs.Usage // only show our flags
59 fs.Parse(os.Args[1:])
4861 // `package log` domain
4962 var logger kitlog.Logger
5063 logger = kitlog.NewPrefixLogger(os.Stderr)
51 logger = kitlog.With(logger, "ts", kitlog.DefaultTimestampUTC)
64 logger = kitlog.With(logger, "ts", kitlog.DefaultTimestampUTC, "caller", kitlog.DefaultCaller)
5265 stdlog.SetOutput(kitlog.NewStdlibAdapter(logger)) // redirect stdlib logging to us
5366 stdlog.SetFlags(0) // flags are handled in our logger
7588 )
7790 // `package tracing` domain
78 zipkinHost := "my-host"
79 zipkinCollector := loggingCollector{logger}
80 zipkinAddName := "ADD" // is that right?
81 zipkinAddSpanFunc := zipkin.NewSpanFunc(zipkinHost, zipkinAddName)
91 zipkinHostPort := "localhost:1234" // TODO Zipkin makes overly simple assumptions about services
92 var zipkinCollector zipkin.Collector = loggingCollector{logger}
93 if *zipkinCollectorAddr != "" {
94 var err error
95 if zipkinCollector, err = zipkin.NewScribeCollector(
96 *zipkinCollectorAddr,
97 *zipkinCollectorTimeout,
98 *zipkinCollectorBatchSize,
99 *zipkinCollectorBatchInterval,
100 ); err != nil {
101 logger.Log("err", err)
102 os.Exit(1)
103 }
104 }
105 zipkinMethodName := "add"
106 zipkinSpanFunc := zipkin.MakeNewSpanFunc(zipkinHostPort, *zipkinServiceName, zipkinMethodName)
107 zipkin.Log.Swap(logger) // log diagnostic/error details
83109 // Our business and operational domain
84 var a Add
85 a = pureAdd
86 a = logging(logger, a)
88 // `package server` domain
89 var e server.Endpoint
110 var a Add = pureAdd
111 if *proxyHTTPAddr != "" {
112 codec := jsoncodec.New()
113 makeResponse := func() interface{} { return &addResponse{} }
115 var e endpoint.Endpoint
116 e = httptransport.NewClient(*proxyHTTPAddr, codec, makeResponse, httptransport.ClientBefore(zipkin.ToRequest(zipkinSpanFunc)))
117 e = zipkin.AnnotateClient(zipkinSpanFunc, zipkinCollector)(e)
119 a = proxyAdd(e, logger)
120 }
121 a = logging(logger)(a)
123 // Server domain
124 var e endpoint.Endpoint
90125 e = makeEndpoint(a)
91 e = zipkin.AnnotateEndpoint(zipkinAddSpanFunc, zipkinCollector)(e)
92 // e = someother.Middleware(arg1, arg2)(e)
126 e = zipkin.AnnotateServer(zipkinSpanFunc, zipkinCollector)(e)
94128 // Mechanical stuff
129 rand.Seed(time.Now().UnixNano())
95130 root := context.Background()
96131 errc := make(chan error)
111146 defer cancel()
113148 field := metrics.Field{Key: "transport", Value: "http"}
114 before := httptransport.Before(zipkin.ToContext(zipkin.FromHTTP(zipkinAddSpanFunc)))
115 after := httptransport.After(httptransport.SetContentType("application/json"))
149 before := httptransport.BindingBefore(zipkin.ToContext(zipkinSpanFunc))
150 after := httptransport.BindingAfter(httptransport.SetContentType("application/json"))
151 makeRequest := func() interface{} { return &addRequest{} }
117153 var handler http.Handler
118 handler = httptransport.NewBinding(ctx, reflect.TypeOf(request{}), jsoncodec.New(), e, before, after)
154 handler = httptransport.NewBinding(ctx, makeRequest, jsoncodec.New(), e, before, after)
119155 handler = encoding.Gzip(handler)
120156 handler = cors.Middleware(cors.Config{})(handler)
121157 handler = httpInstrument(requests.With(field), duration.With(field))(handler)
209245 type loggingCollector struct{ kitlog.Logger }
211247 func (c loggingCollector) Collect(s *zipkin.Span) error {
212 kitlog.With(c.Logger, "caller", kitlog.DefaultCaller).Log(
248 annotations := s.Encode().GetAnnotations()
249 values := make([]string, len(annotations))
250 for i, a := range annotations {
251 values[i] = a.Value
252 }
253 c.Logger.Log(
213254 "trace_id", s.TraceID(),
214255 "span_id", s.SpanID(),
215256 "parent_span_id", s.ParentSpanID(),
257 "annotations", strings.Join(values, " "),
216258 )
217259 return nil
218260 }
22 // The request and response types should be annotated sufficiently for all
33 // transports we intend to use.
5 type request struct {
5 type addRequest struct {
66 A int64 `json:"a"`
77 B int64 `json:"b"`
88 }
10 type response struct {
10 type addResponse struct {
1111 V int64 `json:"v"`
1212 }
55 ""
77 thriftadd ""
8 ""
89 ""
9 ""
1010 )
1212 // A binding wraps an Endpoint so that it's usable by a transport.
1313 // thriftBinding makes an Endpoint usable over Thrift.
1414 type thriftBinding struct {
1515 context.Context
16 server.Endpoint
16 endpoint.Endpoint
1717 }
1919 // Add implements Thrift's AddService interface.
2020 func (tb thriftBinding) Add(a, b int64) (*thriftadd.AddReply, error) {
21 r, err := tb.Endpoint(tb.Context, request{a, b})
21 r, err := tb.Endpoint(tb.Context, addRequest{a, b})
2222 if err != nil {
2323 return nil, err
2424 }
26 resp, ok := r.(*response)
26 resp, ok := r.(*addResponse)
2727 if !ok {
28 return nil, server.ErrBadCast
28 return nil, endpoint.ErrBadCast
2929 }
3131 return &thriftadd.AddReply{Value: resp.V}, nil
1717 function unique_directories { directories | sort | uniq ; }
19 PATHS=${1:-$(unique_directories)}
1921 function package_names {
20 unique_directories | while read d
22 for d in $PATHS
2123 do
2224 echo$d
2325 done
0 package endpoint
2 import (
3 "errors"
5 ""
6 )
8 // Endpoint is the fundamental building block of servers and clients.
9 // It represents a single RPC method.
10 type Endpoint func(ctx context.Context, request interface{}) (response interface{}, err error)
12 // Middleware is a chainable behavior modifier for endpoints.
13 type Middleware func(Endpoint) Endpoint
15 // ErrBadCast indicates an unexpected concrete request or response struct was
16 // received from an endpoint.
17 var ErrBadCast = errors.New("bad cast")
19 // ContextCanceled indicates the request context was canceled.
20 var ErrContextCanceled = errors.New("context canceled")
server/server.go less more
0 package server
2 import (
3 "errors"
5 ""
6 )
8 // Request is an RPC request.
9 type Request interface{}
11 // Response is an RPC response.
12 type Response interface{}
14 // Endpoint is the fundamental building block of package server.
15 // It represents a single RPC method.
16 type Endpoint func(context.Context, Request) (Response, error)
18 // ErrBadCast indicates a type error during decoding or encoding.
19 var ErrBadCast = errors.New("bad cast")
21 // ErrContextCanceled indicates a controlling context was canceled before the
22 // request could be served.
23 var ErrContextCanceled = errors.New("context was canceled")
2626 }
2828 var (
29 name = "span-name"
29 serviceName = "service"
30 methodName = "method"
3031 traceID = int64(123)
3132 spanID = int64(456)
3233 parentSpanID = int64(0)
3435 duration = 42 * time.Millisecond
3536 )
37 span := zipkin.NewSpan("some-host", name, traceID, spanID, parentSpanID)
38 span := zipkin.NewSpan("", serviceName, methodName, traceID, spanID, parentSpanID)
3839 span.AnnotateDuration("foo", 42*time.Millisecond)
3940 if err := c.Collect(span); err != nil {
4041 t.Errorf("error during collection: %v", err)
5758 }
5960 gotSpan := server.spans()[0]
60 if want, have := name, gotSpan.GetName(); want != have {
61 if want, have := methodName, gotSpan.GetName(); want != have {
6162 t.Errorf("want %q, have %q", want, have)
6263 }
6364 if want, have := traceID, gotSpan.GetTraceId(); want != have {
00 package zipkin
22 import (
3 "errors"
3 "encoding/binary"
4 "net"
5 "strconv"
46 "time"
68 ""
911 var (
1012 // SpanContextKey represents the Span in the request context.
1113 SpanContextKey = "Zipkin-Span"
13 // ErrSpanNotFound is returned when a Span isn't found in a context.
14 ErrSpanNotFound = errors.New("span not found")
1514 )
1716 // A Span is a named collection of annotations. It represents meaningful
1918 // service. Clients should annotate the span, and submit it when the request
2019 // that generated it is complete.
2120 type Span struct {
22 host string
23 name string
21 host *zipkincore.Endpoint
22 methodName string
2424 traceID int64
2525 spanID int64
2626 parentSpanID int64
2828 annotations []annotation
29 //binaryAnnotations []BinaryAnnotation
29 //binaryAnnotations []BinaryAnnotation // TODO
3030 }
3232 // NewSpan returns a new Span object ready for use.
33 func NewSpan(host string, name string, traceID, spanID, parentSpanID int64) *Span {
33 func NewSpan(hostport, serviceName, methodName string, traceID, spanID, parentSpanID int64) *Span {
3434 return &Span{
35 host: host,
36 name: name,
35 host: makeEndpoint(hostport, serviceName),
36 methodName: methodName,
3737 traceID: traceID,
3838 spanID: spanID,
3939 parentSpanID: parentSpanID,
4040 }
4141 }
43 // NewSpanFunc returns a function that generates a new Zipkin span.
44 func NewSpanFunc(host, name string) func(int64, int64, int64) *Span {
43 // makeEndpoint will return a nil Endpoint if the input parameters are
44 // malformed.
45 func makeEndpoint(hostport, serviceName string) *zipkincore.Endpoint {
46 host, port, err := net.SplitHostPort(hostport)
47 if err != nil {
48 Log.Log("hostport", hostport, "err", err)
49 return nil
50 }
51 addrs, err := net.LookupIP(host)
52 if err != nil {
53 Log.Log("host", host, "err", err)
54 return nil
55 }
56 if len(addrs) <= 0 {
57 Log.Log("host", host, "err", "no IPs")
58 return nil
59 }
60 portInt, err := strconv.ParseInt(port, 10, 16)
61 if err != nil {
62 Log.Log("port", port, "err", err)
63 return nil
64 }
65 endpoint := zipkincore.NewEndpoint()
66 binary.LittleEndian.PutUint32(addrs[0], (uint32)(endpoint.Ipv4))
67 endpoint.Port = int16(portInt)
68 endpoint.ServiceName = serviceName
69 return endpoint
70 }
72 // MakeNewSpanFunc returns a function that generates a new Zipkin span.
73 func MakeNewSpanFunc(hostport, serviceName, methodName string) NewSpanFunc {
4574 return func(traceID, spanID, parentSpanID int64) *Span {
46 return NewSpan(host, name, traceID, spanID, parentSpanID)
75 return NewSpan(hostport, serviceName, methodName, traceID, spanID, parentSpanID)
4776 }
4877 }
79 // NewSpanFunc takes trace, span, & parent span IDs to produce a Span object.
80 type NewSpanFunc func(traceID, spanID, parentSpanID int64) *Span
5082 // TraceID returns the ID of the trace that this span is a member of.
5183 func (s *Span) TraceID() int64 { return s.traceID }
78110 // Thrift stuff into an encoder struct, owned by the ScribeCollector.
79111 zs := zipkincore.Span{
80112 TraceId: s.traceID,
81 Name:,
113 Name: s.methodName,
82114 Id: s.spanID,
83115 BinaryAnnotations: []*zipkincore.BinaryAnnotation{}, // TODO
84 Debug: false, // TODO
116 Debug: true, // TODO
85117 }
86118 if s.parentSpanID != 0 {
119 zs.ParentId = new(int64)
87120 (*zs.ParentId) = s.parentSpanID
88121 }
89122 zs.Annotations = make([]*zipkincore.Annotation, len(s.annotations))
91124 zs.Annotations[i] = &zipkincore.Annotation{
92125 Timestamp: a.timestamp.UnixNano() / 1e3,
93126 Value: a.value,
94 }
95 if != "" {
96 // zs.Annotations[i].Host = TODO
127 Host:,
97128 }
98129 if a.duration > 0 {
99130 zs.Annotations[i].Duration = new(int32)
100 (*zs.Annotations[i].Duration) = int32(a.duration / time.Microsecond)
131 *(zs.Annotations[i].Duration) = int32(a.duration / time.Microsecond)
101132 }
102133 }
103134 return &zs
107138 timestamp time.Time
108139 value string
109140 duration time.Duration // optional
110 host string
141 host *zipkincore.Endpoint
111142 }
77 ""
9 ""
910 ""
10 ""
1111 )
13 // In Zipkin, "spans are considered to start and stop with the client." The
14 // client is responsible for creating a new span ID for each outgoing request,
15 // copying its span ID to the parent span ID, and maintaining the same trace
16 // ID. The server-receive and server-send annotations can be considered value
17 // added information and aren't strictly necessary.
18 //
19 // Further reading:
20 // •
21 // •!topic/zipkin-user/KilwtSA0g1k
22 // •
1324 // Log is used to report diagnostic information. To enable it, swap in your
1425 // application's logger.
1526 var Log log.SwapLogger
17 //
18 //!topic/zipkin-user/KilwtSA0g1k
19 //
2128 const (
2229 //
2431 spanIDHTTPHeader = "X-B3-SpanId"
2532 parentSpanIDHTTPHeader = "X-B3-ParentSpanId"
27 clientSend = "cs"
28 serverReceive = "sr"
29 serverSend = "ss"
30 clientReceive = "cr"
34 // ClientSend is the annotation value used to mark a client sending a
35 // request to a server.
36 ClientSend = "cs"
38 // ServerReceive is the annotation value used to mark a server's receipt
39 // of a request from a client.
40 ServerReceive = "sr"
42 // ServerSend is the annotation value used to mark a server's completion
43 // of a request and response to a client.
44 ServerSend = "ss"
46 // ClientReceive is the annotation value used to mark a client's receipt
47 // of a completed request from a server.
48 ClientReceive = "cr"
3149 )
33 // AnnotateEndpoint extracts a span from the context, adds server-receive and
34 // server-send annotations at the boundaries, and submits the span to the
35 // collector. If no span is present, a new span is generated and put in the
36 // context.
37 func AnnotateEndpoint(f func(int64, int64, int64) *Span, c Collector) func(server.Endpoint) server.Endpoint {
38 return func(e server.Endpoint) server.Endpoint {
39 return func(ctx context.Context, req server.Request) (server.Response, error) {
40 span, ctx := mustGetServerSpan(ctx, f)
41 span.Annotate(serverReceive)
42 defer func() { span.Annotate(serverSend); c.Collect(span) }()
43 return e(ctx, req)
51 // AnnotateServer returns a server.Middleware that extracts a span from the
52 // context, adds server-receive and server-send annotations at the boundaries,
53 // and submits the span to the collector. If no span is found in the context,
54 // a new span is generated and inserted.
55 func AnnotateServer(newSpan NewSpanFunc, c Collector) endpoint.Middleware {
56 return func(e endpoint.Endpoint) endpoint.Endpoint {
57 return func(ctx context.Context, request interface{}) (interface{}, error) {
58 span, ok := fromContext(ctx)
59 if !ok {
60 span = newSpan(newID(), newID(), 0)
61 ctx = context.WithValue(ctx, SpanContextKey, span)
62 }
63 span.Annotate(ServerReceive)
64 defer func() { span.Annotate(ServerSend); c.Collect(span) }()
65 return e(ctx, request)
4466 }
4567 }
4668 }
48 // FromHTTP is a helper method that allows NewSpanFunc's factory function to
49 // be easily invoked by passing an HTTP request. The span name is the HTTP
50 // method. The trace, span, and parent span IDs are taken from the request
51 // headers.
52 func FromHTTP(f func(int64, int64, int64) *Span) func(*http.Request) *Span {
53 return func(r *http.Request) *Span {
54 return f(
55 getID(r.Header, traceIDHTTPHeader),
56 getID(r.Header, spanIDHTTPHeader),
57 getID(r.Header, parentSpanIDHTTPHeader),
58 )
70 // AnnotateClient returns a middleware that extracts a parent span from the
71 // context, produces a client (child) span from it, adds client-send and
72 // client-receive annotations at the boundaries, and submits the span to the
73 // collector. If no span is found in the context, a new span is generated and
74 // inserted.
75 func AnnotateClient(newSpan NewSpanFunc, c Collector) endpoint.Middleware {
76 return func(e endpoint.Endpoint) endpoint.Endpoint {
77 return func(ctx context.Context, request interface{}) (interface{}, error) {
78 var clientSpan *Span
79 parentSpan, ok := fromContext(ctx)
80 if ok {
81 clientSpan = newSpan(parentSpan.TraceID(), newID(), parentSpan.SpanID())
82 } else {
83 clientSpan = newSpan(newID(), newID(), 0)
84 }
85 ctx = context.WithValue(ctx, SpanContextKey, clientSpan) // set
86 defer func() { ctx = context.WithValue(ctx, SpanContextKey, parentSpan) }() // reset
87 clientSpan.Annotate(ClientSend)
88 defer func() { clientSpan.Annotate(ClientReceive); c.Collect(clientSpan) }()
89 return e(ctx, request)
90 }
5991 }
6092 }
62 // ToContext returns a function that satisfies transport/http.BeforeFunc. When
63 // invoked, it generates a Zipkin span from the incoming HTTP request, and
64 // saves it in the request context under the SpanContextKey.
65 func ToContext(f func(*http.Request) *Span) func(context.Context, *http.Request) context.Context {
94 // ToContext returns a function that satisfies transport/http.BeforeFunc. It
95 // takes a Zipkin span from the incoming HTTP request, and saves it in the
96 // request context. It's designed to be wired into a server's HTTP transport
97 // Before stack.
98 func ToContext(newSpan NewSpanFunc) func(ctx context.Context, r *http.Request) context.Context {
6699 return func(ctx context.Context, r *http.Request) context.Context {
67 return context.WithValue(ctx, SpanContextKey, f(r))
100 return context.WithValue(ctx, SpanContextKey, fromHTTP(newSpan, r))
68101 }
69102 }
71 // NewChildSpan creates a new child (client) span. If a span is present in the
72 // context, it will be interpreted as the parent.
73 func NewChildSpan(ctx context.Context, f func(int64, int64, int64) *Span) *Span {
74 val := ctx.Value(SpanContextKey)
75 if val == nil {
76 return f(newID(), newID(), 0)
77 }
78 parentSpan, ok := val.(*Span)
79 if !ok {
80 panic(SpanContextKey + " value isn't a span object")
81 }
82 var (
83 traceID = parentSpan.TraceID()
84 spanID = newID()
85 parentSpanID = parentSpan.SpanID()
86 )
87 return f(traceID, spanID, parentSpanID)
88 }
90 // SetRequestHeaders sets up HTTP headers for a new outbound request based on
91 // the (client) span. All IDs are encoded as hex strings.
92 func SetRequestHeaders(h http.Header, s *Span) {
93 if id := s.TraceID(); id > 0 {
94 h.Set(traceIDHTTPHeader, strconv.FormatInt(id, 16))
95 }
96 if id := s.SpanID(); id > 0 {
97 h.Set(spanIDHTTPHeader, strconv.FormatInt(id, 16))
98 }
99 if id := s.ParentSpanID(); id > 0 {
100 h.Set(parentSpanIDHTTPHeader, strconv.FormatInt(id, 16))
104 // ToRequest returns a function that satisfies transport/http.BeforeFunc. It
105 // takes a Zipkin span from the context, and injects it into the HTTP request.
106 // It's designed to be wired into a client's HTTP transport Before stack. It's
107 // expected that AnnotateClient has already ensured the span in the context is
108 // a child/client span.
109 func ToRequest(newSpan NewSpanFunc) func(ctx context.Context, r *http.Request) context.Context {
110 return func(ctx context.Context, r *http.Request) context.Context {
111 span, ok := fromContext(ctx)
112 if !ok {
113 span = newSpan(newID(), newID(), 0)
114 }
115 if id := span.TraceID(); id > 0 {
116 r.Header.Set(traceIDHTTPHeader, strconv.FormatInt(id, 16))
117 }
118 if id := span.SpanID(); id > 0 {
119 r.Header.Set(spanIDHTTPHeader, strconv.FormatInt(id, 16))
120 }
121 if id := span.ParentSpanID(); id > 0 {
122 r.Header.Set(parentSpanIDHTTPHeader, strconv.FormatInt(id, 16))
123 }
124 return ctx
101125 }
102126 }
104 func mustGetServerSpan(ctx context.Context, f func(int64, int64, int64) *Span) (*Span, context.Context) {
128 func fromHTTP(newSpan NewSpanFunc, r *http.Request) *Span {
129 traceIDStr := r.Header.Get(traceIDHTTPHeader)
130 if traceIDStr == "" {
131 Log.Log("debug", "make new span")
132 return newSpan(newID(), newID(), 0) // normal; just make a new one
133 }
134 traceID, err := strconv.ParseInt(traceIDStr, 16, 64)
135 if err != nil {
136 Log.Log(traceIDHTTPHeader, traceIDStr, "err", err)
137 return newSpan(newID(), newID(), 0)
138 }
139 spanIDStr := r.Header.Get(spanIDHTTPHeader)
140 if spanIDStr == "" {
141 Log.Log("msg", "trace ID without span ID") // abnormal
142 spanIDStr = strconv.FormatInt(newID(), 64) // deal with it
143 }
144 spanID, err := strconv.ParseInt(spanIDStr, 16, 64)
145 if err != nil {
146 Log.Log(spanIDHTTPHeader, spanIDStr, "err", err) // abnormal
147 spanID = newID() // deal with it
148 }
149 parentSpanIDStr := r.Header.Get(parentSpanIDHTTPHeader)
150 if parentSpanIDStr == "" {
151 parentSpanIDStr = "0" // normal
152 }
153 parentSpanID, err := strconv.ParseInt(parentSpanIDStr, 16, 64)
154 if err != nil {
155 Log.Log(parentSpanIDHTTPHeader, parentSpanIDStr, "err", err) // abnormal
156 parentSpanID = 0 // the only way to deal with it
157 }
158 return newSpan(traceID, spanID, parentSpanID)
159 }
161 func fromContext(ctx context.Context) (*Span, bool) {
105162 val := ctx.Value(SpanContextKey)
106163 if val == nil {
107 span := f(newID(), newID(), 0)
108 return span, context.WithValue(ctx, SpanContextKey, span)
164 return nil, false
109165 }
110166 span, ok := val.(*Span)
111167 if !ok {
112168 panic(SpanContextKey + " value isn't a span object")
113169 }
114 return span, ctx
115 }
117 func getID(h http.Header, key string) int64 {
118 val := h.Get(key)
119 if val == "" {
120 return 0
121 }
122 i, err := strconv.ParseInt(val, 16, 64)
123 if err != nil {
124 panic("invalid Zipkin ID in HTTP header: " + val)
125 }
126 return i
170 return span, true
127171 }
129173 func newID() int64 {
00 package zipkin_test
22 import (
3 "math/rand"
3 "fmt"
44 "net/http"
5 "reflect"
6 "runtime"
57 "strconv"
6 "sync/atomic"
8 "strings"
79 "testing"
911 ""
11 ""
13 ""
1214 ""
1315 )
15 func TestAnnotateEndpoint(t *testing.T) {
17 func TestToContext(t *testing.T) {
1618 const (
17 host = "some-host"
18 name = "some-name"
19 )
21 f := zipkin.NewSpanFunc(host, name)
22 c := &countingCollector{}
24 var e server.Endpoint
25 e = func(context.Context, server.Request) (server.Response, error) { return struct{}{}, nil }
26 e = zipkin.AnnotateEndpoint(f, c)(e)
28 if want, have := int32(0), int32(c.int32); want != have {
29 t.Errorf("want %d, have %d", want, have)
30 }
31 if _, err := e(context.Background(), struct{}{}); err != nil {
32 t.Fatal(err)
33 }
34 if want, have := int32(1), int32(c.int32); want != have {
35 t.Errorf("want %d, have %d", want, have)
36 }
37 }
39 func TestFromHTTPToContext(t *testing.T) {
40 const (
41 host = "foo-host"
42 name = "foo-name"
19 hostport = ""
20 serviceName = "foo-service"
21 methodName = "foo-method"
4322 traceID int64 = 12
4423 spanID int64 = 34
4524 parentSpanID int64 = 56
5029 r.Header.Set("X-B3-SpanId", strconv.FormatInt(spanID, 16))
5130 r.Header.Set("X-B3-ParentSpanId", strconv.FormatInt(parentSpanID, 16))
53 sf := zipkin.NewSpanFunc(host, name)
54 hf := zipkin.FromHTTP(sf)
55 cf := zipkin.ToContext(hf)
32 newSpan := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName)
33 toContext := zipkin.ToContext(newSpan)
57 ctx := cf(context.Background(), r)
35 ctx := toContext(context.Background(), r)
5836 val := ctx.Value(zipkin.SpanContextKey)
5937 if val == nil {
6038 t.Fatalf("%s returned no value", zipkin.SpanContextKey)
6442 t.Fatalf("%s was not a Span object", zipkin.SpanContextKey)
6543 }
67 if want, have := traceID, span.TraceID(); want != have {
68 t.Errorf("want %d, have %d", want, have)
69 }
71 if want, have := spanID, span.SpanID(); want != have {
72 t.Errorf("want %d, have %d", want, have)
73 }
75 if want, have := parentSpanID, span.ParentSpanID(); want != have {
76 t.Errorf("want %d, have %d", want, have)
77 }
78 }
80 func TestNewChildSpan(t *testing.T) {
81 rand.Seed(123)
83 const (
84 host = "my-host"
85 name = "my-name"
86 traceID int64 = 123
87 spanID int64 = 456
88 parentSpanID int64 = 789
89 )
91 f := zipkin.NewSpanFunc(host, name)
92 ctx := context.WithValue(context.Background(), zipkin.SpanContextKey, f(traceID, spanID, parentSpanID))
93 childSpan := zipkin.NewChildSpan(ctx, f)
95 if want, have := traceID, childSpan.TraceID(); want != have {
96 t.Errorf("want %d, have %d", want, have)
97 }
98 if have := childSpan.SpanID(); have == spanID {
99 t.Errorf("span ID should be random, but we have %d", have)
100 }
101 if want, have := spanID, childSpan.ParentSpanID(); want != have {
102 t.Errorf("want %d, have %d", want, have)
103 }
104 }
106 func TestSetRequestHeaders(t *testing.T) {
107 const (
108 host = "bar-host"
109 name = "bar-name"
110 traceID int64 = 123
111 spanID int64 = 456
112 parentSpanID int64 = 789
113 )
115 r, _ := http.NewRequest("POST", "", nil)
116 zipkin.SetRequestHeaders(r.Header, zipkin.NewSpan(host, name, traceID, spanID, parentSpanID))
118 for h, want := range map[string]string{
119 "X-B3-TraceId": strconv.FormatInt(traceID, 16),
120 "X-B3-SpanId": strconv.FormatInt(spanID, 16),
121 "X-B3-ParentSpanId": strconv.FormatInt(parentSpanID, 16),
45 for want, haveFunc := range map[int64]func() int64{
46 traceID: span.TraceID,
47 spanID: span.SpanID,
48 parentSpanID: span.ParentSpanID,
12249 } {
123 if have := r.Header.Get(h); want != have {
124 t.Errorf("%s: want %s, have %s", h, want, have)
50 if have := haveFunc(); want != have {
51 name := runtime.FuncForPC(reflect.ValueOf(haveFunc).Pointer()).Name()
52 name = strings.Split(name, "·")[0]
53 toks := strings.Split(name, ".")
54 name = toks[len(toks)-1]
55 t.Errorf("%s: want %d, have %d", name, want, have)
12556 }
12657 }
12758 }
129 type countingCollector struct{ int32 }
60 func TestToRequest(t *testing.T) {
61 const (
62 hostport = ""
63 serviceName = "foo-service"
64 methodName = "foo-method"
65 traceID int64 = 20
66 spanID int64 = 40
67 parentSpanID int64 = 90
68 )
131 func (c *countingCollector) Collect(*zipkin.Span) error { atomic.AddInt32(&(c.int32), 1); return nil }
70 newSpan := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName)
71 span := newSpan(traceID, spanID, parentSpanID)
72 ctx := context.WithValue(context.Background(), zipkin.SpanContextKey, span)
73 r, _ := http.NewRequest("GET", "", nil)
74 ctx = zipkin.ToRequest(newSpan)(ctx, r)
76 for header, wantInt := range map[string]int64{
77 "X-B3-TraceId": traceID,
78 "X-B3-SpanId": spanID,
79 "X-B3-ParentSpanId": parentSpanID,
80 } {
81 if want, have := strconv.FormatInt(wantInt, 16), r.Header.Get(header); want != have {
82 t.Errorf("%s: want %q, have %q", header, want, have)
83 }
84 }
85 }
87 func TestAnnotateServer(t *testing.T) {
88 if err := testAnnotate(zipkin.AnnotateServer, zipkin.ServerReceive, zipkin.ServerSend); err != nil {
89 t.Fatal(err)
90 }
91 }
93 func TestAnnotateClient(t *testing.T) {
94 if err := testAnnotate(zipkin.AnnotateClient, zipkin.ClientSend, zipkin.ClientReceive); err != nil {
95 t.Fatal(err)
96 }
97 }
99 func testAnnotate(
100 annotate func(newSpan zipkin.NewSpanFunc, c zipkin.Collector) endpoint.Middleware,
101 wantAnnotations ...string,
102 ) error {
103 const (
104 hostport = ""
105 serviceName = "some-service"
106 methodName = "some-method"
107 )
109 newSpan := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName)
110 collector := &countingCollector{}
112 var e endpoint.Endpoint
113 e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
114 e = annotate(newSpan, collector)(e)
116 if want, have := 0, len(collector.annotations); want != have {
117 return fmt.Errorf("pre-invocation: want %d, have %d", want, have)
118 }
119 if _, err := e(context.Background(), struct{}{}); err != nil {
120 return fmt.Errorf("during invocation: %v", err)
121 }
122 if want, have := wantAnnotations, collector.annotations; !reflect.DeepEqual(want, have) {
123 return fmt.Errorf("after invocation: want %v, have %v", want, have)
124 }
126 return nil
127 }
129 type countingCollector struct{ annotations []string }
131 func (c *countingCollector) Collect(s *zipkin.Span) error {
132 for _, annotation := range s.Encode().GetAnnotations() {
133 c.annotations = append(c.annotations, annotation.GetValue())
134 }
135 return nil
136 }
33 "io"
55 ""
7 ""
86 )
10 // Codec defines how to decode and encode requests and responses. Decode takes
11 // and returns a context because the request may be accompanied by information
8 // Codec decodes and encodes requests and responses. Decode takes and returns
9 // a context because the request or response may be accompanied by information
1210 // that needs to be applied there.
1311 type Codec interface {
14 Decode(context.Context, io.Reader, server.Request) (context.Context, error)
15 Encode(io.Writer, server.Response) error
12 Decode(context.Context, io.Reader, interface{}) (context.Context, error)
13 Encode(io.Writer, interface{}) error
1614 }
66 ""
8 ""
98 ""
109 )
1514 // properly-tagged fields.
1615 func New() codec.Codec { return jsonCodec{} }
18 func (jsonCodec) Decode(ctx context.Context, r io.Reader, req server.Request) (context.Context, error) {
19 return ctx, json.NewDecoder(r).Decode(req)
17 func (jsonCodec) Decode(ctx context.Context, r io.Reader, v interface{}) (context.Context, error) {
18 return ctx, json.NewDecoder(r).Decode(v)
2019 }
22 func (jsonCodec) Encode(w io.Writer, resp server.Response) error {
23 return json.NewEncoder(w).Encode(resp)
21 func (jsonCodec) Encode(w io.Writer, v interface{}) error {
22 return json.NewEncoder(w).Encode(v)
2423 }
0 package http_test
2 import (
3 "net/http/httptest"
4 "testing"
6 ""
8 httptransport ""
9 )
11 func TestSetContentType(t *testing.T) {
12 contentType := "application/whatever"
13 rec := httptest.NewRecorder()
14 httptransport.SetContentType(contentType)(context.Background(), rec)
15 if want, have := contentType, rec.Header().Get("Content-Type"); want != have {
16 t.Errorf("want %q, have %q", want, have)
17 }
18 }
22 import (
33 "net/http"
4 "reflect"
65 ""
8 ""
7 ""
98 ""
109 )
12 // BindingOption sets a parameter for the binding.
13 type BindingOption func(*binding)
15 // Before adds pre-RPC BeforeFuncs to the binding.
16 func Before(funcs ...BeforeFunc) BindingOption {
17 return func(b *binding) { b.before = append(b.before, funcs...) }
18 }
20 // After adds post-RPC AfterFuncs to the binding.
21 func After(funcs ...AfterFunc) BindingOption {
22 return func(b *binding) { b.after = append(b.after, funcs...) }
23 }
2511 type binding struct {
2612 context.Context
27 requestType reflect.Type
13 makeRequest func() interface{}
2814 codec.Codec
29 server.Endpoint
15 endpoint.Endpoint
3016 before []BeforeFunc
3117 after []AfterFunc
3218 }
3420 // NewBinding returns an HTTP handler that wraps the given endpoint.
35 func NewBinding(ctx context.Context, requestType reflect.Type, cdc codec.Codec, endpoint server.Endpoint, options ...BindingOption) http.Handler {
21 func NewBinding(ctx context.Context, makeRequest func() interface{}, cdc codec.Codec, e endpoint.Endpoint, options ...BindingOption) http.Handler {
3622 b := &binding{
3723 Context: ctx,
38 requestType: requestType,
24 makeRequest: makeRequest,
3925 Codec: cdc,
40 Endpoint: endpoint,
26 Endpoint: e,
4127 }
4228 for _, option := range options {
4329 option(b)
5642 }
5844 // Decode request.
59 req := reflect.New(b.requestType).Interface()
45 req := b.makeRequest()
6046 ctx, err := b.Codec.Decode(ctx, r.Body, req)
6147 if err != nil {
6248 http.Error(w, err.Error(), http.StatusBadRequest)
8167 return
8268 }
8369 }
71 // BindingOption sets a parameter for the HTTP binding.
72 type BindingOption func(*binding)
74 // BindingBefore adds pre-RPC BeforeFuncs to the HTTP binding.
75 func BindingBefore(funcs ...BeforeFunc) BindingOption {
76 return func(b *binding) { b.before = append(b.before, funcs...) }
77 }
79 // BindingAfter adds post-RPC AfterFuncs to the HTTP binding.
80 func BindingAfter(funcs ...AfterFunc) BindingOption {
81 return func(b *binding) { b.after = append(b.after, funcs...) }
82 }
1111 ""
13 ""
1413 jsoncodec ""
1514 httptransport ""
1615 )
2827 return 3 * i // doesn't matter, just do something
2928 }
31 endpoint := func(_ context.Context, req server.Request) (server.Response, error) {
32 r, ok := req.(*myRequest)
30 endpoint := func(_ context.Context, request interface{}) (interface{}, error) {
31 r, ok := request.(*myRequest)
3332 if !ok {
34 return nil, fmt.Errorf("not myRequest (%s)", reflect.TypeOf(req))
33 return nil, fmt.Errorf("not myRequest (%s)", reflect.TypeOf(request))
3534 }
3635 return myResponse{transform(r.In)}, nil
3736 }
3938 ctx := context.Background()
40 requestType := reflect.TypeOf(myRequest{})
39 makeRequest := func() interface{} { return &myRequest{} }
4140 codec := jsoncodec.New()
42 binding := httptransport.NewBinding(ctx, requestType, codec, endpoint)
41 binding := httptransport.NewBinding(ctx, makeRequest, codec, endpoint)
4342 server := httptest.NewServer(binding)
4443 defer server.Close()
5655 defer resp.Body.Close()
5857 var r myResponse
59 if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
58 if _, err := codec.Decode(ctx, resp.Body, &r); err != nil {
6059 t.Fatal(err)
6160 }
0 package http
2 import (
3 "bytes"
4 "net/http"
5 "net/url"
7 ""
9 ""
10 ""
11 )
13 type httpClient struct {
14 *url.URL
15 codec.Codec
16 method string
17 *http.Client
18 before []BeforeFunc
19 makeResponse func() interface{}
20 }
22 // NewClient returns a client endpoint for a remote service. addr must be a
23 // valid, parseable URL, including the scheme and path.
24 func NewClient(addr string, cdc codec.Codec, makeResponse func() interface{}, options ...ClientOption) endpoint.Endpoint {
25 u, err := url.Parse(addr)
26 if err != nil {
27 panic(err)
28 }
29 c := httpClient{
30 URL: u,
31 Codec: cdc,
32 method: "GET",
33 Client: http.DefaultClient,
34 makeResponse: makeResponse,
35 }
36 for _, option := range options {
37 option(&c)
38 }
39 return c.endpoint
40 }
42 func (c httpClient) endpoint(ctx context.Context, request interface{}) (interface{}, error) {
43 var buf bytes.Buffer
44 if err := c.Codec.Encode(&buf, request); err != nil {
45 return nil, err
46 }
48 req, err := http.NewRequest(c.method, c.URL.String(), &buf)
49 if err != nil {
50 return nil, err
51 }
53 for _, f := range c.before {
54 f(ctx, req)
55 }
57 resp, err := c.Client.Do(req)
58 if err != nil {
59 return nil, err
60 }
61 defer resp.Body.Close()
63 response := c.makeResponse()
64 ctx, err = c.Codec.Decode(ctx, resp.Body, response)
65 if err != nil {
66 return nil, err
67 }
69 return response, nil
70 }
72 // ClientOption sets a parameter for the HTTP client.
73 type ClientOption func(*httpClient)
75 // ClientBefore adds pre-invocation BeforeFuncs to the HTTP client.
76 func ClientBefore(funcs ...BeforeFunc) ClientOption {
77 return func(c *httpClient) { c.before = append(c.before, funcs...) }
78 }
80 // ClientMethod sets the method used to invoke the RPC. By default, it's GET.
81 func ClientMethod(method string) ClientOption {
82 return func(c *httpClient) { c.method = method }
83 }
85 // SetClient sets the HTTP client struct used to invoke the RPC. By default,
86 // it's http.DefaultClient.
87 func SetClient(client *http.Client) ClientOption {
88 return func(c *httpClient) { c.Client = client }
89 }
0 package http_test
2 import (
3 "net/http"
4 "net/http/httptest"
5 "reflect"
6 "testing"
8 ""
10 jsoncodec ""
11 httptransport ""
12 )
14 func TestClient(t *testing.T) {
15 type myResponse struct {
16 V int `json:"v"`
17 }
18 const v = 123
19 codec := jsoncodec.New()
20 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
21 codec.Encode(w, myResponse{v})
22 }))
23 defer server.Close()
24 makeResponse := func() interface{} { return &myResponse{} }
25 client := httptransport.NewClient(server.URL, codec, makeResponse)
26 resp, err := client(context.Background(), struct{}{})
27 if err != nil {
28 t.Fatal(err)
29 }
30 response, ok := resp.(*myResponse)
31 if !ok {
32 t.Fatalf("not myResponse (%s)", reflect.TypeOf(response))
33 }
34 if want, have := v, response.V; want != have {
35 t.Errorf("want %d, have %d", want, have)
36 }
37 }