Codebase list golang-github-go-kit-kit / 4b54361
Merge branch 'master' into logfmt-logger Conflicts: addsvc/main.go Chris Hines 8 years ago
31 changed file(s) with 883 addition(s) and 351 deletion(s). Raw diff Collapse all Expand all
0 addsvc/addsvc
1 cover.out
2
3 # Compiled Object files, Static and Dynamic libs (Shared Objects)
4 *.o
5 *.a
6 *.so
7
8 # Folders
9 _obj
10 _test
11 _old*
12
13 # Architecture specific extensions/prefixes
14 *.[568vq]
15 [568vq].out
16
17 *.cgo1.go
18 *.cgo2.c
19 _cgo_defun.c
20 _cgo_gotypes.go
21 _cgo_export.*
22
23 _testmain.go
24
25 *.exe
1111 ## Goals
1212
1313 - Operate in a heterogeneous SOA — expect to interact with mostly non-gokit services
14 - RPC as the messaging pattern
14 - RPC as the primary messaging pattern
1515 - Pluggable serialization and transport — not just JSON over HTTP
1616 - Zipkin-compatible request tracing
1717
1818 ## Non-goals
1919
20 - Supporting messaging patterns other than RPC — pub/sub, CQRS, etc.
20 - Supporting messaging patterns other than RPC (in the initial release) — pub/sub, CQRS, etc.
2121 - Having opinions on deployment, orchestration, process supervision, etc.
22 - Having opinions on configuration passing, i.e. flags, env vars, files, etc.
22 - Having opinions on configuration passing — flags, env vars, files, etc.
2323
2424 ## Component status
2525
26 - API stability — adopted
27 - `package metrics` — [implemented](https://github.com/go-kit/kit/tree/master/metrics)
28 - `package server` — [implemented](https://github.com/go-kit/kit/tree/master/server)
29 - `package transport` — [implemented](https://github.com/go-kit/kit/tree/master/transport)
30 - `package log` — [implemented](https://github.com/go-kit/kit/tree/master/log)
31 - `package tracing` — [prototyping](Https://github.com/go-kit/kit/tree/master/tracing)
26 - [API stability](https://github.com/go-kit/kit/blob/master/rfc/rfc007-api-stability.md) — **adopted**
27 - [`package metrics`](https://github.com/go-kit/kit/tree/master/metrics) — **implemented**
28 - [`package server`](https://github.com/go-kit/kit/tree/master/server) — **implemented**
29 - [`package transport`](https://github.com/go-kit/kit/tree/master/transport) — **implemented**
30 - [`package log`](https://github.com/go-kit/kit/tree/master/log) — **implemented**
31 - [`package tracing`](https://github.com/go-kit/kit/tree/master/tracing) — prototyping
3232 - `package client` — pending
3333 - Service discovery — pending
3434
101101 - [Gorilla](http://www.gorillatoolkit.org)
102102 - [Martini](https://github.com/go-martini/martini)
103103 - [Negroni](https://github.com/codegangsta/negroni)
104 - [Revel](https://revel.github.io/)
104 - [Revel](https://revel.github.io/) (considered harmful)
105105
106106 ## Additional reading
107107
+0
-1
addsvc/.gitignore less more
0 addsvc
0 # addsvc
1
2 addsvc is an example service, used to illustrate the mechanics of gokit.
3 It exposes simple functionality on a variety of transports and endpoints.
4
5 ## Server
6
7 To build and run addsvc,
8
9 ```
10 $ go install
11 $ addsvc
12 ```
13
14 ## Client
15
16 TODO
00 package main
11
2 import (
3 "golang.org/x/net/context"
4
5 "github.com/go-kit/kit/endpoint"
6 "github.com/go-kit/kit/log"
7 )
8
29 // Add is the abstract definition of what this service does. It could easily
3 // be an interface type with multiple methods. Each method would be an
4 // endpoint.
5 type Add func(int64, int64) int64
10 // be an interface type with multiple methods, in which case each method would
11 // be an endpoint.
12 type Add func(context.Context, int64, int64) int64
613
7 func pureAdd(a, b int64) int64 { return a + b }
14 // pureAdd implements Add with no dependencies.
15 func pureAdd(_ context.Context, a, b int64) int64 { return a + b }
816
9 func addVia(r Resource) Add {
10 return func(a, b int64) int64 {
11 return r.Value(a) + r.Value(b)
17 // proxyAdd returns an implementation of Add that invokes a remote Add
18 // service.
19 func proxyAdd(e endpoint.Endpoint, logger log.Logger) Add {
20 return func(ctx context.Context, a, b int64) int64 {
21 resp, err := e(ctx, &addRequest{a, b})
22 if err != nil {
23 logger.Log("err", err)
24 return 0
25 }
26 addResp, ok := resp.(*addResponse)
27 if !ok {
28 logger.Log("err", endpoint.ErrBadCast)
29 return 0
30 }
31 return addResp.V
1232 }
1333 }
14
15 // Resource represents some dependency, outside of our control.
16 type Resource interface {
17 Value(int64) int64
18 }
19
20 type mockResource struct{}
21
22 func (mockResource) Value(i int64) int64 { return i }
22 import (
33 "golang.org/x/net/context"
44
5 "github.com/go-kit/kit/server"
5 "github.com/go-kit/kit/endpoint"
66 )
77
8 // makeEndpoint returns a server.Endpoint wrapping the passed Add. If Add were
9 // an interface with multiple methods, we'd need individual endpoints for
10 // each.
8 // makeEndpoint returns an endpoint wrapping the passed Add. If Add were an
9 // interface with multiple methods, we'd need individual endpoints for each.
1110 //
1211 // This function is just boiler-plate; in theory, it could be generated.
13 func makeEndpoint(a Add) server.Endpoint {
14 return func(ctx context.Context, req server.Request) (server.Response, error) {
12 func makeEndpoint(a Add) endpoint.Endpoint {
13 return func(ctx context.Context, request interface{}) (interface{}, error) {
1514 select {
15 default:
1616 case <-ctx.Done():
17 return nil, server.ErrContextCanceled
18 default:
17 return nil, endpoint.ErrContextCanceled
1918 }
2019
21 addReq, ok := req.(*request)
20 addReq, ok := request.(*addRequest)
2221 if !ok {
23 return nil, server.ErrBadCast
22 return nil, endpoint.ErrBadCast
2423 }
2524
26 v := a(addReq.A, addReq.B)
25 v := a(ctx, addReq.A, addReq.B)
2726
28 return response{
29 V: v,
30 }, nil
27 return addResponse{V: v}, nil
3128 }
3229 }
22 import (
33 "time"
44
5 "golang.org/x/net/context"
6
57 "github.com/go-kit/kit/log"
68 )
79
8 func logging(logger log.Logger, add Add) Add {
9 return func(a, b int64) (v int64) {
10 defer func(begin time.Time) {
11 logger.Log("a", a, "b", b, "result", v, "took", time.Since(begin))
12 }(time.Now())
13 v = add(a, b)
14 return
10 func logging(logger log.Logger) func(Add) Add {
11 return func(next Add) Add {
12 return func(ctx context.Context, a, b int64) (v int64) {
13 defer func(begin time.Time) {
14 logger.Log("a", a, "b", b, "result", v, "took", time.Since(begin))
15 }(time.Now())
16 v = next(ctx, a, b)
17 return
18 }
1519 }
1620 }
55 "golang.org/x/net/context"
66
77 "github.com/go-kit/kit/addsvc/pb"
8 "github.com/go-kit/kit/endpoint"
89 "github.com/go-kit/kit/metrics"
9 "github.com/go-kit/kit/server"
1010 )
1111
1212 // A binding wraps an Endpoint so that it's usable by a transport. grpcBinding
1313 // makes an Endpoint usable over gRPC.
14 type grpcBinding struct{ server.Endpoint }
14 type grpcBinding struct{ endpoint.Endpoint }
1515
1616 // Add implements the proto3 AddServer by forwarding to the wrapped Endpoint.
1717 //
1919 // way to manipulate the RPC context, like headers for HTTP. So we don't have
2020 // a way to transport e.g. Zipkin IDs with the request. TODO.
2121 func (b grpcBinding) Add(ctx context.Context, req *pb.AddRequest) (*pb.AddReply, error) {
22 addReq := request{req.A, req.B}
22 addReq := addRequest{req.A, req.B}
2323 r, err := b.Endpoint(ctx, addReq)
2424 if err != nil {
2525 return nil, err
2626 }
2727
28 resp, ok := r.(*response)
28 resp, ok := r.(*addResponse)
2929 if !ok {
30 return nil, server.ErrBadCast
30 return nil, endpoint.ErrBadCast
3131 }
3232
3333 return &pb.AddReply{
44 "fmt"
55 "io/ioutil"
66 stdlog "log"
7 "math/rand"
78 "net"
89 "net/http"
910 _ "net/http/pprof"
1011 "os"
1112 "os/signal"
12 "reflect"
13 "strings"
1314 "syscall"
1415 "time"
1516
2223
2324 thriftadd "github.com/go-kit/kit/addsvc/_thrift/gen-go/add"
2425 "github.com/go-kit/kit/addsvc/pb"
26 "github.com/go-kit/kit/endpoint"
2527 kitlog "github.com/go-kit/kit/log"
2628 "github.com/go-kit/kit/metrics"
2729 "github.com/go-kit/kit/metrics/expvar"
2830 "github.com/go-kit/kit/metrics/prometheus"
2931 "github.com/go-kit/kit/metrics/statsd"
30 "github.com/go-kit/kit/server"
3132 "github.com/go-kit/kit/tracing/zipkin"
3233 jsoncodec "github.com/go-kit/kit/transport/codec/json"
3334 httptransport "github.com/go-kit/kit/transport/http"
3435 )
3536
3637 func main() {
38 // Flag domain. Note that gRPC transitively registers flags via its import
39 // of glog. So, we define a new flag set, to keep those domains distinct.
40 fs := flag.NewFlagSet("", flag.ExitOnError)
3741 var (
38 debugAddr = flag.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server")
39 httpAddr = flag.String("http.addr", ":8001", "Address for HTTP (JSON) server")
40 grpcAddr = flag.String("grpc.addr", ":8002", "Address for gRPC server")
41 thriftAddr = flag.String("thrift.addr", ":8003", "Address for Thrift server")
42 thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
43 thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered")
44 thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing")
45 )
46 flag.Parse()
42 debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server")
43 httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server")
44 grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server")
45 thriftAddr = fs.String("thrift.addr", ":8003", "Address for Thrift server")
46 thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
47 thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered")
48 thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing")
49
50 proxyHTTPAddr = fs.String("proxy.http.url", "", "if set, proxy requests over HTTP to this addsvc")
51
52 zipkinServiceName = fs.String("zipkin.service.name", "addsvc", "Zipkin service name")
53 zipkinCollectorAddr = fs.String("zipkin.collector.addr", "", "Zipkin Scribe collector address (empty will log spans)")
54 zipkinCollectorTimeout = fs.Duration("zipkin.collector.timeout", time.Second, "Zipkin collector timeout")
55 zipkinCollectorBatchSize = fs.Int("zipkin.collector.batch.size", 100, "Zipkin collector batch size")
56 zipkinCollectorBatchInterval = fs.Duration("zipkin.collector.batch.interval", time.Second, "Zipkin collector batch interval")
57 )
58 flag.Usage = fs.Usage // only show our flags
59 fs.Parse(os.Args[1:])
4760
4861 // `package log` domain
4962 var logger kitlog.Logger
5063 logger = kitlog.NewLogfmtLogger(os.Stderr)
51 logger = kitlog.With(logger, "ts", kitlog.DefaultTimestampUTC)
64 logger = kitlog.With(logger, "ts", kitlog.DefaultTimestampUTC, "caller", kitlog.DefaultCaller)
5265 stdlog.SetOutput(kitlog.NewStdlibAdapter(logger)) // redirect stdlib logging to us
5366 stdlog.SetFlags(0) // flags are handled in our logger
5467
7588 )
7689
7790 // `package tracing` domain
78 zipkinHost := "my-host"
79 zipkinCollector := loggingCollector{logger}
80 zipkinAddName := "ADD" // is that right?
81 zipkinAddSpanFunc := zipkin.NewSpanFunc(zipkinHost, zipkinAddName)
91 zipkinHostPort := "localhost:1234" // TODO Zipkin makes overly simple assumptions about services
92 var zipkinCollector zipkin.Collector = loggingCollector{logger}
93 if *zipkinCollectorAddr != "" {
94 var err error
95 if zipkinCollector, err = zipkin.NewScribeCollector(
96 *zipkinCollectorAddr,
97 *zipkinCollectorTimeout,
98 *zipkinCollectorBatchSize,
99 *zipkinCollectorBatchInterval,
100 ); err != nil {
101 logger.Log("err", err)
102 os.Exit(1)
103 }
104 }
105 zipkinMethodName := "add"
106 zipkinSpanFunc := zipkin.MakeNewSpanFunc(zipkinHostPort, *zipkinServiceName, zipkinMethodName)
107 zipkin.Log.Swap(logger) // log diagnostic/error details
82108
83109 // Our business and operational domain
84 var a Add
85 a = pureAdd
86 a = logging(logger, a)
87
88 // `package server` domain
89 var e server.Endpoint
110 var a Add = pureAdd
111 if *proxyHTTPAddr != "" {
112 codec := jsoncodec.New()
113 makeResponse := func() interface{} { return &addResponse{} }
114
115 var e endpoint.Endpoint
116 e = httptransport.NewClient(*proxyHTTPAddr, codec, makeResponse, httptransport.ClientBefore(zipkin.ToRequest(zipkinSpanFunc)))
117 e = zipkin.AnnotateClient(zipkinSpanFunc, zipkinCollector)(e)
118
119 a = proxyAdd(e, logger)
120 }
121 a = logging(logger)(a)
122
123 // Server domain
124 var e endpoint.Endpoint
90125 e = makeEndpoint(a)
91 e = zipkin.AnnotateEndpoint(zipkinAddSpanFunc, zipkinCollector)(e)
92 // e = someother.Middleware(arg1, arg2)(e)
126 e = zipkin.AnnotateServer(zipkinSpanFunc, zipkinCollector)(e)
93127
94128 // Mechanical stuff
129 rand.Seed(time.Now().UnixNano())
95130 root := context.Background()
96131 errc := make(chan error)
97132
111146 defer cancel()
112147
113148 field := metrics.Field{Key: "transport", Value: "http"}
114 before := httptransport.Before(zipkin.ToContext(zipkin.FromHTTP(zipkinAddSpanFunc)))
115 after := httptransport.After(httptransport.SetContentType("application/json"))
149 before := httptransport.BindingBefore(zipkin.ToContext(zipkinSpanFunc))
150 after := httptransport.BindingAfter(httptransport.SetContentType("application/json"))
151 makeRequest := func() interface{} { return &addRequest{} }
116152
117153 var handler http.Handler
118 handler = httptransport.NewBinding(ctx, reflect.TypeOf(request{}), jsoncodec.New(), e, before, after)
154 handler = httptransport.NewBinding(ctx, makeRequest, jsoncodec.New(), e, before, after)
119155 handler = encoding.Gzip(handler)
120156 handler = cors.Middleware(cors.Config{})(handler)
121157 handler = httpInstrument(requests.With(field), duration.With(field))(handler)
209245 type loggingCollector struct{ kitlog.Logger }
210246
211247 func (c loggingCollector) Collect(s *zipkin.Span) error {
212 kitlog.With(c.Logger, "caller", kitlog.DefaultCaller).Log(
248 annotations := s.Encode().GetAnnotations()
249 values := make([]string, len(annotations))
250 for i, a := range annotations {
251 values[i] = a.Value
252 }
253 c.Logger.Log(
213254 "trace_id", s.TraceID(),
214255 "span_id", s.SpanID(),
215256 "parent_span_id", s.ParentSpanID(),
257 "annotations", strings.Join(values, " "),
216258 )
217259 return nil
218260 }
22 // The request and response types should be annotated sufficiently for all
33 // transports we intend to use.
44
5 type request struct {
5 type addRequest struct {
66 A int64 `json:"a"`
77 B int64 `json:"b"`
88 }
99
10 type response struct {
10 type addResponse struct {
1111 V int64 `json:"v"`
1212 }
55 "golang.org/x/net/context"
66
77 thriftadd "github.com/go-kit/kit/addsvc/_thrift/gen-go/add"
8 "github.com/go-kit/kit/endpoint"
89 "github.com/go-kit/kit/metrics"
9 "github.com/go-kit/kit/server"
1010 )
1111
1212 // A binding wraps an Endpoint so that it's usable by a transport.
1313 // thriftBinding makes an Endpoint usable over Thrift.
1414 type thriftBinding struct {
1515 context.Context
16 server.Endpoint
16 endpoint.Endpoint
1717 }
1818
1919 // Add implements Thrift's AddService interface.
2020 func (tb thriftBinding) Add(a, b int64) (*thriftadd.AddReply, error) {
21 r, err := tb.Endpoint(tb.Context, request{a, b})
21 r, err := tb.Endpoint(tb.Context, addRequest{a, b})
2222 if err != nil {
2323 return nil, err
2424 }
2525
26 resp, ok := r.(*response)
26 resp, ok := r.(*addResponse)
2727 if !ok {
28 return nil, server.ErrBadCast
28 return nil, endpoint.ErrBadCast
2929 }
3030
3131 return &thriftadd.AddReply{Value: resp.V}, nil
1616
1717 function unique_directories { directories | sort | uniq ; }
1818
19 PATHS=${1:-$(unique_directories)}
20
1921 function package_names {
20 unique_directories | while read d
22 for d in $PATHS
2123 do
2224 echo github.com/go-kit/kit/$d
2325 done
0 package endpoint
1
2 import (
3 "errors"
4
5 "golang.org/x/net/context"
6 )
7
8 // Endpoint is the fundamental building block of servers and clients.
9 // It represents a single RPC method.
10 type Endpoint func(ctx context.Context, request interface{}) (response interface{}, err error)
11
12 // Middleware is a chainable behavior modifier for endpoints.
13 type Middleware func(Endpoint) Endpoint
14
15 // ErrBadCast indicates an unexpected concrete request or response struct was
16 // received from an endpoint.
17 var ErrBadCast = errors.New("bad cast")
18
19 // ContextCanceled indicates the request context was canceled.
20 var ErrContextCanceled = errors.New("context canceled")
0 # package log
1
2 `package log` provides a minimal interface for structured logging in services.
3 It may be wrapped to encode conventions, enforce type-safety, etc.
4 It can be used for both typical application log events, and log-structured data streams.
5
6 ## Rationale
7
8 TODO
9
10 ## Usage
11
12 Typical application logging.
13
14 ```go
15 import "github.com/go-kit/kit/log"
16
17 func main() {
18 logger := log.NewPrefixLogger(os.Stderr)
19 logger.Log("question", "what is the meaning of life?", "answer", 42)
20 }
21 ```
22
23 Contextual logging.
24
25 ```go
26 func handle(logger log.Logger, req *Request) {
27 logger = log.With(logger, "txid", req.TransactionID, "query", req.Query)
28 logger.Log()
29
30 answer, err := process(logger, req.Query)
31 if err != nil {
32 logger.Log("err", err)
33 return
34 }
35
36 logger.Log("answer", answer)
37 }
38 ```
39
40 Redirect stdlib log to gokit logger.
41
42 ```go
43 import (
44 "os"
45 stdlog "log"
46 kitlog "github.com/go-kit/kit/log"
47 )
48
49 func main() {
50 logger := kitlog.NewJSONLogger(os.Stdout)
51 stdlog.SetOutput(kitlog.NewStdlibAdapter(logger))
52 }
53 ```
88
99 // StdlibWriter implements io.Writer by invoking the stdlib log.Print. It's
1010 // designed to be passed to a gokit logger as the writer, for cases where it's
11 // desirable to pipe all log output to the same, canonical destination.
11 // necessary to redirect all gokit log output to the stdlib logger.
12 //
13 // If you have any choice in the matter, you shouldn't use this. Prefer to
14 // redirect the stdlib log to the gokit logger via NewStdlibAdapter.
1215 type StdlibWriter struct{}
1316
1417 // Write implements io.Writer.
0 # package metrics
1
2 `package metrics` provides a set of uniform interfaces for service instrumentation.
3 It has **[counters][]**, **[gauges][]**, and **[histograms][]**,
4 and provides adapters to popular metrics packages, like **[expvar][]**, **[statsd][]**, and **[Prometheus][]**.
5
6 [counters]: http://prometheus.io/docs/concepts/metric_types/#counter
7 [gauges]: http://prometheus.io/docs/concepts/metric_types/#gauge
8 [histograms]: http://prometheus.io/docs/concepts/metric_types/#histogram
9 [expvar]: https://golang.org/pkg/expvar
10 [statsd]: https://github.com/etsy/statsd
11 [Prometheus]: http://prometheus.io
12
13 ## Rationale
14
15 TODO
16
17 ## Usage
18
19 A simple counter, exported via expvar.
20
21 ```go
22 import "github.com/go-kit/kit/metrics/expvar"
23
24 func main() {
25 myCount := expvar.NewCounter("my_count")
26 myCount.Add(1)
27 }
28 ```
29
30 A histogram for request duration, exported via a Prometheus summary with
31 dynamically-computed quantiles.
32
33 ```go
34 import (
35 stdprometheus "github.com/prometheus/client_golang/prometheus"
36
37 "github.com/go-kit/kit/metrics"
38 "github.com/go-kit/kit/metrics/prometheus"
39 "github.com/go-kit/kit/metrics/statsd"
40 )
41
42 var requestDuration = prometheus.NewSummary(stdprometheus.SummaryOpts{
43 Namespace: "myservice",
44 Subsystem: "api",
45 Name: "request_duration_nanoseconds_count",
46 Help: "Total time spent serving requests.",
47 }, []string{})
48
49 func handleRequest() {
50 defer func(begin time.Time) { requestDuration.Observe(time.Since(begin)) }(time.Now())
51 // handle request
52 }
53 ```
0 # package server
1
2 `package server` is a very small package that collects interfaces used by services.
3 Most server-side functionality is actually implemented by surrounding packages.
4
5 # Rationale
6
7 TODO
8
9 # Usage
10
11 As currently defined, you shouldn't need to use `package server` directly.
12 Other gokit components integrate on `package server` interfaces.
+0
-24
server/server.go less more
0 package server
1
2 import (
3 "errors"
4
5 "golang.org/x/net/context"
6 )
7
8 // Request is an RPC request.
9 type Request interface{}
10
11 // Response is an RPC response.
12 type Response interface{}
13
14 // Endpoint is the fundamental building block of package server.
15 // It represents a single RPC method.
16 type Endpoint func(context.Context, Request) (Response, error)
17
18 // ErrBadCast indicates a type error during decoding or encoding.
19 var ErrBadCast = errors.New("bad cast")
20
21 // ErrContextCanceled indicates a controlling context was canceled before the
22 // request could be served.
23 var ErrContextCanceled = errors.New("context was canceled")
0 # package tracing
1
2 `package tracing` provides [Dapper-style][dapper] request tracing to services.
3 An implementation exists for [Zipkin][]; [Appdash][] support is planned.
4
5 [dapper]: http://research.google.com/pubs/pub36356.html
6 [Zipkin]: https://blog.twitter.com/2012/distributed-systems-tracing-with-zipkin
7 [Appdash]: https://sourcegraph.com/blog/117580140734
8
9 ## Rationale
10
11 TODO
12
13 ## Usage
14
15 Wrap a [server.Endpoint][] so that it emits traces to a Zipkin collector.
16
17 [server.Endpoint]: http://godoc.org/github.com/go-kit/kit/server#Endpoint
18
19 ```go
20 func main() {
21 var (
22 myHost = "instance01.addsvc.internal.net"
23 myMethod = "ADD"
24 scribeHost = "scribe.internal.net"
25 timeout = 50 * time.Millisecond
26 batchSize = 100
27 batchInterval = 3 * time.Second
28 )
29
30 spanFunc := zipkin.NewSpanFunc(myHost, myMethod)
31 collector, _ := zipkin.NewScribeCollector(scribeHost, timeout, batchSize, batchInterval)
32
33 var e server.Endpoint
34 e = makeEndpoint() // for your service
35 e = zipkin.AnnotateEndpoint(spanFunc, collector)
36
37 serve(e)
38 }
39 ```
2626 }
2727
2828 var (
29 name = "span-name"
29 serviceName = "service"
30 methodName = "method"
3031 traceID = int64(123)
3132 spanID = int64(456)
3233 parentSpanID = int64(0)
3435 duration = 42 * time.Millisecond
3536 )
3637
37 span := zipkin.NewSpan("some-host", name, traceID, spanID, parentSpanID)
38 span := zipkin.NewSpan("1.2.3.4:1234", serviceName, methodName, traceID, spanID, parentSpanID)
3839 span.AnnotateDuration("foo", 42*time.Millisecond)
3940 if err := c.Collect(span); err != nil {
4041 t.Errorf("error during collection: %v", err)
5758 }
5859
5960 gotSpan := server.spans()[0]
60 if want, have := name, gotSpan.GetName(); want != have {
61 if want, have := methodName, gotSpan.GetName(); want != have {
6162 t.Errorf("want %q, have %q", want, have)
6263 }
6364 if want, have := traceID, gotSpan.GetTraceId(); want != have {
00 package zipkin
11
22 import (
3 "errors"
3 "encoding/binary"
4 "net"
5 "strconv"
46 "time"
57
68 "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/zipkincore"
911 var (
1012 // SpanContextKey represents the Span in the request context.
1113 SpanContextKey = "Zipkin-Span"
12
13 // ErrSpanNotFound is returned when a Span isn't found in a context.
14 ErrSpanNotFound = errors.New("span not found")
1514 )
1615
1716 // A Span is a named collection of annotations. It represents meaningful
1918 // service. Clients should annotate the span, and submit it when the request
2019 // that generated it is complete.
2120 type Span struct {
22 host string
23 name string
21 host *zipkincore.Endpoint
22 methodName string
23
2424 traceID int64
2525 spanID int64
2626 parentSpanID int64
2727
2828 annotations []annotation
29 //binaryAnnotations []BinaryAnnotation
29 //binaryAnnotations []BinaryAnnotation // TODO
3030 }
3131
3232 // NewSpan returns a new Span object ready for use.
33 func NewSpan(host string, name string, traceID, spanID, parentSpanID int64) *Span {
33 func NewSpan(hostport, serviceName, methodName string, traceID, spanID, parentSpanID int64) *Span {
3434 return &Span{
35 host: host,
36 name: name,
35 host: makeEndpoint(hostport, serviceName),
36 methodName: methodName,
3737 traceID: traceID,
3838 spanID: spanID,
3939 parentSpanID: parentSpanID,
4040 }
4141 }
4242
43 // NewSpanFunc returns a function that generates a new Zipkin span.
44 func NewSpanFunc(host, name string) func(int64, int64, int64) *Span {
43 // makeEndpoint will return a nil Endpoint if the input parameters are
44 // malformed.
45 func makeEndpoint(hostport, serviceName string) *zipkincore.Endpoint {
46 host, port, err := net.SplitHostPort(hostport)
47 if err != nil {
48 Log.Log("hostport", hostport, "err", err)
49 return nil
50 }
51 addrs, err := net.LookupIP(host)
52 if err != nil {
53 Log.Log("host", host, "err", err)
54 return nil
55 }
56 if len(addrs) <= 0 {
57 Log.Log("host", host, "err", "no IPs")
58 return nil
59 }
60 portInt, err := strconv.ParseInt(port, 10, 16)
61 if err != nil {
62 Log.Log("port", port, "err", err)
63 return nil
64 }
65 endpoint := zipkincore.NewEndpoint()
66 binary.LittleEndian.PutUint32(addrs[0], (uint32)(endpoint.Ipv4))
67 endpoint.Port = int16(portInt)
68 endpoint.ServiceName = serviceName
69 return endpoint
70 }
71
72 // MakeNewSpanFunc returns a function that generates a new Zipkin span.
73 func MakeNewSpanFunc(hostport, serviceName, methodName string) NewSpanFunc {
4574 return func(traceID, spanID, parentSpanID int64) *Span {
46 return NewSpan(host, name, traceID, spanID, parentSpanID)
75 return NewSpan(hostport, serviceName, methodName, traceID, spanID, parentSpanID)
4776 }
4877 }
78
79 // NewSpanFunc takes trace, span, & parent span IDs to produce a Span object.
80 type NewSpanFunc func(traceID, spanID, parentSpanID int64) *Span
4981
5082 // TraceID returns the ID of the trace that this span is a member of.
5183 func (s *Span) TraceID() int64 { return s.traceID }
78110 // Thrift stuff into an encoder struct, owned by the ScribeCollector.
79111 zs := zipkincore.Span{
80112 TraceId: s.traceID,
81 Name: s.name,
113 Name: s.methodName,
82114 Id: s.spanID,
83115 BinaryAnnotations: []*zipkincore.BinaryAnnotation{}, // TODO
84 Debug: false, // TODO
116 Debug: true, // TODO
85117 }
86118 if s.parentSpanID != 0 {
119 zs.ParentId = new(int64)
87120 (*zs.ParentId) = s.parentSpanID
88121 }
89122 zs.Annotations = make([]*zipkincore.Annotation, len(s.annotations))
91124 zs.Annotations[i] = &zipkincore.Annotation{
92125 Timestamp: a.timestamp.UnixNano() / 1e3,
93126 Value: a.value,
94 }
95 if a.host != "" {
96 // zs.Annotations[i].Host = TODO
127 Host: a.host,
97128 }
98129 if a.duration > 0 {
99130 zs.Annotations[i].Duration = new(int32)
100 (*zs.Annotations[i].Duration) = int32(a.duration / time.Microsecond)
131 *(zs.Annotations[i].Duration) = int32(a.duration / time.Microsecond)
101132 }
102133 }
103134 return &zs
107138 timestamp time.Time
108139 value string
109140 duration time.Duration // optional
110 host string
141 host *zipkincore.Endpoint
111142 }
66
77 "golang.org/x/net/context"
88
9 "github.com/go-kit/kit/endpoint"
910 "github.com/go-kit/kit/log"
10 "github.com/go-kit/kit/server"
1111 )
12
13 // In Zipkin, "spans are considered to start and stop with the client." The
14 // client is responsible for creating a new span ID for each outgoing request,
15 // copying its span ID to the parent span ID, and maintaining the same trace
16 // ID. The server-receive and server-send annotations can be considered value
17 // added information and aren't strictly necessary.
18 //
19 // Further reading:
20 // • http://www.slideshare.net/johanoskarsson/zipkin-runtime-open-house
21 // • https://groups.google.com/forum/#!topic/zipkin-user/KilwtSA0g1k
22 // • https://gist.github.com/yoavaa/3478d3a0df666f21a98c
1223
1324 // Log is used to report diagnostic information. To enable it, swap in your
1425 // application's logger.
1526 var Log log.SwapLogger
16
17 // http://www.slideshare.net/johanoskarsson/zipkin-runtime-open-house
18 // https://groups.google.com/forum/#!topic/zipkin-user/KilwtSA0g1k
19 // https://gist.github.com/yoavaa/3478d3a0df666f21a98c
2027
2128 const (
2229 // https://github.com/racker/tryfer#headers
2431 spanIDHTTPHeader = "X-B3-SpanId"
2532 parentSpanIDHTTPHeader = "X-B3-ParentSpanId"
2633
27 clientSend = "cs"
28 serverReceive = "sr"
29 serverSend = "ss"
30 clientReceive = "cr"
34 // ClientSend is the annotation value used to mark a client sending a
35 // request to a server.
36 ClientSend = "cs"
37
38 // ServerReceive is the annotation value used to mark a server's receipt
39 // of a request from a client.
40 ServerReceive = "sr"
41
42 // ServerSend is the annotation value used to mark a server's completion
43 // of a request and response to a client.
44 ServerSend = "ss"
45
46 // ClientReceive is the annotation value used to mark a client's receipt
47 // of a completed request from a server.
48 ClientReceive = "cr"
3149 )
3250
33 // AnnotateEndpoint extracts a span from the context, adds server-receive and
34 // server-send annotations at the boundaries, and submits the span to the
35 // collector. If no span is present, a new span is generated and put in the
36 // context.
37 func AnnotateEndpoint(f func(int64, int64, int64) *Span, c Collector) func(server.Endpoint) server.Endpoint {
38 return func(e server.Endpoint) server.Endpoint {
39 return func(ctx context.Context, req server.Request) (server.Response, error) {
40 span, ctx := mustGetServerSpan(ctx, f)
41 span.Annotate(serverReceive)
42 defer func() { span.Annotate(serverSend); c.Collect(span) }()
43 return e(ctx, req)
51 // AnnotateServer returns a server.Middleware that extracts a span from the
52 // context, adds server-receive and server-send annotations at the boundaries,
53 // and submits the span to the collector. If no span is found in the context,
54 // a new span is generated and inserted.
55 func AnnotateServer(newSpan NewSpanFunc, c Collector) endpoint.Middleware {
56 return func(e endpoint.Endpoint) endpoint.Endpoint {
57 return func(ctx context.Context, request interface{}) (interface{}, error) {
58 span, ok := fromContext(ctx)
59 if !ok {
60 span = newSpan(newID(), newID(), 0)
61 ctx = context.WithValue(ctx, SpanContextKey, span)
62 }
63 span.Annotate(ServerReceive)
64 defer func() { span.Annotate(ServerSend); c.Collect(span) }()
65 return e(ctx, request)
4466 }
4567 }
4668 }
4769
48 // FromHTTP is a helper method that allows NewSpanFunc's factory function to
49 // be easily invoked by passing an HTTP request. The span name is the HTTP
50 // method. The trace, span, and parent span IDs are taken from the request
51 // headers.
52 func FromHTTP(f func(int64, int64, int64) *Span) func(*http.Request) *Span {
53 return func(r *http.Request) *Span {
54 return f(
55 getID(r.Header, traceIDHTTPHeader),
56 getID(r.Header, spanIDHTTPHeader),
57 getID(r.Header, parentSpanIDHTTPHeader),
58 )
70 // AnnotateClient returns a middleware that extracts a parent span from the
71 // context, produces a client (child) span from it, adds client-send and
72 // client-receive annotations at the boundaries, and submits the span to the
73 // collector. If no span is found in the context, a new span is generated and
74 // inserted.
75 func AnnotateClient(newSpan NewSpanFunc, c Collector) endpoint.Middleware {
76 return func(e endpoint.Endpoint) endpoint.Endpoint {
77 return func(ctx context.Context, request interface{}) (interface{}, error) {
78 var clientSpan *Span
79 parentSpan, ok := fromContext(ctx)
80 if ok {
81 clientSpan = newSpan(parentSpan.TraceID(), newID(), parentSpan.SpanID())
82 } else {
83 clientSpan = newSpan(newID(), newID(), 0)
84 }
85 ctx = context.WithValue(ctx, SpanContextKey, clientSpan) // set
86 defer func() { ctx = context.WithValue(ctx, SpanContextKey, parentSpan) }() // reset
87 clientSpan.Annotate(ClientSend)
88 defer func() { clientSpan.Annotate(ClientReceive); c.Collect(clientSpan) }()
89 return e(ctx, request)
90 }
5991 }
6092 }
6193
62 // ToContext returns a function that satisfies transport/http.BeforeFunc. When
63 // invoked, it generates a Zipkin span from the incoming HTTP request, and
64 // saves it in the request context under the SpanContextKey.
65 func ToContext(f func(*http.Request) *Span) func(context.Context, *http.Request) context.Context {
94 // ToContext returns a function that satisfies transport/http.BeforeFunc. It
95 // takes a Zipkin span from the incoming HTTP request, and saves it in the
96 // request context. It's designed to be wired into a server's HTTP transport
97 // Before stack.
98 func ToContext(newSpan NewSpanFunc) func(ctx context.Context, r *http.Request) context.Context {
6699 return func(ctx context.Context, r *http.Request) context.Context {
67 return context.WithValue(ctx, SpanContextKey, f(r))
100 return context.WithValue(ctx, SpanContextKey, fromHTTP(newSpan, r))
68101 }
69102 }
70103
71 // NewChildSpan creates a new child (client) span. If a span is present in the
72 // context, it will be interpreted as the parent.
73 func NewChildSpan(ctx context.Context, f func(int64, int64, int64) *Span) *Span {
74 val := ctx.Value(SpanContextKey)
75 if val == nil {
76 return f(newID(), newID(), 0)
77 }
78 parentSpan, ok := val.(*Span)
79 if !ok {
80 panic(SpanContextKey + " value isn't a span object")
81 }
82 var (
83 traceID = parentSpan.TraceID()
84 spanID = newID()
85 parentSpanID = parentSpan.SpanID()
86 )
87 return f(traceID, spanID, parentSpanID)
88 }
89
90 // SetRequestHeaders sets up HTTP headers for a new outbound request based on
91 // the (client) span. All IDs are encoded as hex strings.
92 func SetRequestHeaders(h http.Header, s *Span) {
93 if id := s.TraceID(); id > 0 {
94 h.Set(traceIDHTTPHeader, strconv.FormatInt(id, 16))
95 }
96 if id := s.SpanID(); id > 0 {
97 h.Set(spanIDHTTPHeader, strconv.FormatInt(id, 16))
98 }
99 if id := s.ParentSpanID(); id > 0 {
100 h.Set(parentSpanIDHTTPHeader, strconv.FormatInt(id, 16))
104 // ToRequest returns a function that satisfies transport/http.BeforeFunc. It
105 // takes a Zipkin span from the context, and injects it into the HTTP request.
106 // It's designed to be wired into a client's HTTP transport Before stack. It's
107 // expected that AnnotateClient has already ensured the span in the context is
108 // a child/client span.
109 func ToRequest(newSpan NewSpanFunc) func(ctx context.Context, r *http.Request) context.Context {
110 return func(ctx context.Context, r *http.Request) context.Context {
111 span, ok := fromContext(ctx)
112 if !ok {
113 span = newSpan(newID(), newID(), 0)
114 }
115 if id := span.TraceID(); id > 0 {
116 r.Header.Set(traceIDHTTPHeader, strconv.FormatInt(id, 16))
117 }
118 if id := span.SpanID(); id > 0 {
119 r.Header.Set(spanIDHTTPHeader, strconv.FormatInt(id, 16))
120 }
121 if id := span.ParentSpanID(); id > 0 {
122 r.Header.Set(parentSpanIDHTTPHeader, strconv.FormatInt(id, 16))
123 }
124 return ctx
101125 }
102126 }
103127
104 func mustGetServerSpan(ctx context.Context, f func(int64, int64, int64) *Span) (*Span, context.Context) {
128 func fromHTTP(newSpan NewSpanFunc, r *http.Request) *Span {
129 traceIDStr := r.Header.Get(traceIDHTTPHeader)
130 if traceIDStr == "" {
131 Log.Log("debug", "make new span")
132 return newSpan(newID(), newID(), 0) // normal; just make a new one
133 }
134 traceID, err := strconv.ParseInt(traceIDStr, 16, 64)
135 if err != nil {
136 Log.Log(traceIDHTTPHeader, traceIDStr, "err", err)
137 return newSpan(newID(), newID(), 0)
138 }
139 spanIDStr := r.Header.Get(spanIDHTTPHeader)
140 if spanIDStr == "" {
141 Log.Log("msg", "trace ID without span ID") // abnormal
142 spanIDStr = strconv.FormatInt(newID(), 64) // deal with it
143 }
144 spanID, err := strconv.ParseInt(spanIDStr, 16, 64)
145 if err != nil {
146 Log.Log(spanIDHTTPHeader, spanIDStr, "err", err) // abnormal
147 spanID = newID() // deal with it
148 }
149 parentSpanIDStr := r.Header.Get(parentSpanIDHTTPHeader)
150 if parentSpanIDStr == "" {
151 parentSpanIDStr = "0" // normal
152 }
153 parentSpanID, err := strconv.ParseInt(parentSpanIDStr, 16, 64)
154 if err != nil {
155 Log.Log(parentSpanIDHTTPHeader, parentSpanIDStr, "err", err) // abnormal
156 parentSpanID = 0 // the only way to deal with it
157 }
158 return newSpan(traceID, spanID, parentSpanID)
159 }
160
161 func fromContext(ctx context.Context) (*Span, bool) {
105162 val := ctx.Value(SpanContextKey)
106163 if val == nil {
107 span := f(newID(), newID(), 0)
108 return span, context.WithValue(ctx, SpanContextKey, span)
164 return nil, false
109165 }
110166 span, ok := val.(*Span)
111167 if !ok {
112168 panic(SpanContextKey + " value isn't a span object")
113169 }
114 return span, ctx
115 }
116
117 func getID(h http.Header, key string) int64 {
118 val := h.Get(key)
119 if val == "" {
120 return 0
121 }
122 i, err := strconv.ParseInt(val, 16, 64)
123 if err != nil {
124 panic("invalid Zipkin ID in HTTP header: " + val)
125 }
126 return i
170 return span, true
127171 }
128172
129173 func newID() int64 {
00 package zipkin_test
11
22 import (
3 "math/rand"
3 "fmt"
44 "net/http"
5 "reflect"
6 "runtime"
57 "strconv"
6 "sync/atomic"
8 "strings"
79 "testing"
810
911 "golang.org/x/net/context"
1012
11 "github.com/go-kit/kit/server"
13 "github.com/go-kit/kit/endpoint"
1214 "github.com/go-kit/kit/tracing/zipkin"
1315 )
1416
15 func TestAnnotateEndpoint(t *testing.T) {
17 func TestToContext(t *testing.T) {
1618 const (
17 host = "some-host"
18 name = "some-name"
19 )
20
21 f := zipkin.NewSpanFunc(host, name)
22 c := &countingCollector{}
23
24 var e server.Endpoint
25 e = func(context.Context, server.Request) (server.Response, error) { return struct{}{}, nil }
26 e = zipkin.AnnotateEndpoint(f, c)(e)
27
28 if want, have := int32(0), int32(c.int32); want != have {
29 t.Errorf("want %d, have %d", want, have)
30 }
31 if _, err := e(context.Background(), struct{}{}); err != nil {
32 t.Fatal(err)
33 }
34 if want, have := int32(1), int32(c.int32); want != have {
35 t.Errorf("want %d, have %d", want, have)
36 }
37 }
38
39 func TestFromHTTPToContext(t *testing.T) {
40 const (
41 host = "foo-host"
42 name = "foo-name"
19 hostport = "5.5.5.5:5555"
20 serviceName = "foo-service"
21 methodName = "foo-method"
4322 traceID int64 = 12
4423 spanID int64 = 34
4524 parentSpanID int64 = 56
5029 r.Header.Set("X-B3-SpanId", strconv.FormatInt(spanID, 16))
5130 r.Header.Set("X-B3-ParentSpanId", strconv.FormatInt(parentSpanID, 16))
5231
53 sf := zipkin.NewSpanFunc(host, name)
54 hf := zipkin.FromHTTP(sf)
55 cf := zipkin.ToContext(hf)
32 newSpan := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName)
33 toContext := zipkin.ToContext(newSpan)
5634
57 ctx := cf(context.Background(), r)
35 ctx := toContext(context.Background(), r)
5836 val := ctx.Value(zipkin.SpanContextKey)
5937 if val == nil {
6038 t.Fatalf("%s returned no value", zipkin.SpanContextKey)
6442 t.Fatalf("%s was not a Span object", zipkin.SpanContextKey)
6543 }
6644
67 if want, have := traceID, span.TraceID(); want != have {
68 t.Errorf("want %d, have %d", want, have)
69 }
70
71 if want, have := spanID, span.SpanID(); want != have {
72 t.Errorf("want %d, have %d", want, have)
73 }
74
75 if want, have := parentSpanID, span.ParentSpanID(); want != have {
76 t.Errorf("want %d, have %d", want, have)
77 }
78 }
79
80 func TestNewChildSpan(t *testing.T) {
81 rand.Seed(123)
82
83 const (
84 host = "my-host"
85 name = "my-name"
86 traceID int64 = 123
87 spanID int64 = 456
88 parentSpanID int64 = 789
89 )
90
91 f := zipkin.NewSpanFunc(host, name)
92 ctx := context.WithValue(context.Background(), zipkin.SpanContextKey, f(traceID, spanID, parentSpanID))
93 childSpan := zipkin.NewChildSpan(ctx, f)
94
95 if want, have := traceID, childSpan.TraceID(); want != have {
96 t.Errorf("want %d, have %d", want, have)
97 }
98 if have := childSpan.SpanID(); have == spanID {
99 t.Errorf("span ID should be random, but we have %d", have)
100 }
101 if want, have := spanID, childSpan.ParentSpanID(); want != have {
102 t.Errorf("want %d, have %d", want, have)
103 }
104 }
105
106 func TestSetRequestHeaders(t *testing.T) {
107 const (
108 host = "bar-host"
109 name = "bar-name"
110 traceID int64 = 123
111 spanID int64 = 456
112 parentSpanID int64 = 789
113 )
114
115 r, _ := http.NewRequest("POST", "http://destroy.horse", nil)
116 zipkin.SetRequestHeaders(r.Header, zipkin.NewSpan(host, name, traceID, spanID, parentSpanID))
117
118 for h, want := range map[string]string{
119 "X-B3-TraceId": strconv.FormatInt(traceID, 16),
120 "X-B3-SpanId": strconv.FormatInt(spanID, 16),
121 "X-B3-ParentSpanId": strconv.FormatInt(parentSpanID, 16),
45 for want, haveFunc := range map[int64]func() int64{
46 traceID: span.TraceID,
47 spanID: span.SpanID,
48 parentSpanID: span.ParentSpanID,
12249 } {
123 if have := r.Header.Get(h); want != have {
124 t.Errorf("%s: want %s, have %s", h, want, have)
50 if have := haveFunc(); want != have {
51 name := runtime.FuncForPC(reflect.ValueOf(haveFunc).Pointer()).Name()
52 name = strings.Split(name, "·")[0]
53 toks := strings.Split(name, ".")
54 name = toks[len(toks)-1]
55 t.Errorf("%s: want %d, have %d", name, want, have)
12556 }
12657 }
12758 }
12859
129 type countingCollector struct{ int32 }
60 func TestToRequest(t *testing.T) {
61 const (
62 hostport = "5.5.5.5:5555"
63 serviceName = "foo-service"
64 methodName = "foo-method"
65 traceID int64 = 20
66 spanID int64 = 40
67 parentSpanID int64 = 90
68 )
13069
131 func (c *countingCollector) Collect(*zipkin.Span) error { atomic.AddInt32(&(c.int32), 1); return nil }
70 newSpan := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName)
71 span := newSpan(traceID, spanID, parentSpanID)
72 ctx := context.WithValue(context.Background(), zipkin.SpanContextKey, span)
73 r, _ := http.NewRequest("GET", "https://best.horse", nil)
74 ctx = zipkin.ToRequest(newSpan)(ctx, r)
75
76 for header, wantInt := range map[string]int64{
77 "X-B3-TraceId": traceID,
78 "X-B3-SpanId": spanID,
79 "X-B3-ParentSpanId": parentSpanID,
80 } {
81 if want, have := strconv.FormatInt(wantInt, 16), r.Header.Get(header); want != have {
82 t.Errorf("%s: want %q, have %q", header, want, have)
83 }
84 }
85 }
86
87 func TestAnnotateServer(t *testing.T) {
88 if err := testAnnotate(zipkin.AnnotateServer, zipkin.ServerReceive, zipkin.ServerSend); err != nil {
89 t.Fatal(err)
90 }
91 }
92
93 func TestAnnotateClient(t *testing.T) {
94 if err := testAnnotate(zipkin.AnnotateClient, zipkin.ClientSend, zipkin.ClientReceive); err != nil {
95 t.Fatal(err)
96 }
97 }
98
99 func testAnnotate(
100 annotate func(newSpan zipkin.NewSpanFunc, c zipkin.Collector) endpoint.Middleware,
101 wantAnnotations ...string,
102 ) error {
103 const (
104 hostport = "1.2.3.4:1234"
105 serviceName = "some-service"
106 methodName = "some-method"
107 )
108
109 newSpan := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName)
110 collector := &countingCollector{}
111
112 var e endpoint.Endpoint
113 e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
114 e = annotate(newSpan, collector)(e)
115
116 if want, have := 0, len(collector.annotations); want != have {
117 return fmt.Errorf("pre-invocation: want %d, have %d", want, have)
118 }
119 if _, err := e(context.Background(), struct{}{}); err != nil {
120 return fmt.Errorf("during invocation: %v", err)
121 }
122 if want, have := wantAnnotations, collector.annotations; !reflect.DeepEqual(want, have) {
123 return fmt.Errorf("after invocation: want %v, have %v", want, have)
124 }
125
126 return nil
127 }
128
129 type countingCollector struct{ annotations []string }
130
131 func (c *countingCollector) Collect(s *zipkin.Span) error {
132 for _, annotation := range s.Encode().GetAnnotations() {
133 c.annotations = append(c.annotations, annotation.GetValue())
134 }
135 return nil
136 }
0 # package transport
1
2 `package transport` defines interfaces for service transports and codecs.
3 It also provides implementations for transports and codecs that aren't already well-defined by other packages.
4 The most common use case for `package transport` is probably to bind a gokit [server.Endpoint][] with a stdlib [http.Handler][], via gokit's [http.Binding][].
5 Refer to the [addsvc][] example service to see how to make e.g. [Thrift][] or [gRPC][] transport bindings.
6
7 [server.Endpoint]: https://godoc.org/github.com/go-kit/kit/server/#Endpoint
8 [http.Handler]: https://golang.org/pkg/net/http/#Handler
9 [http.Binding]: https://godoc.org/github.com/go-kit/kit/transport/http/#Binding
10 [addsvc]: https://github.com/go-kit/kit/blob/319c1c7129a146b541bbbaf18e2502bf32c603c5/addsvc/main.go
11 [Thrift]: https://github.com/go-kit/kit/blob/319c1c7129a146b541bbbaf18e2502bf32c603c5/addsvc/main.go#L142-192
12 [gRPC]: https://github.com/go-kit/kit/blob/319c1c7129a146b541bbbaf18e2502bf32c603c5/addsvc/main.go#L102-119
13
14 ## Rationale
15
16 TODO
17
18 ## Usage
19
20 Bind a gokit [server.Endpoint][] with a stdlib [http.Handler][].
21
22 ```go
23 import (
24 "net/http"
25 "reflect"
26
27 "golang.org/x/net/context"
28
29 jsoncodec "github.com/go-kit/kit/transport/codec/json"
30 httptransport "github.com/go-kit/kit/transport/http"
31 )
32
33 type request struct{}
34
35 func main() {
36 var (
37 ctx = context.Background()
38 requestType = reflect.TypeOf(request{})
39 codec = jsoncodec.New()
40 e = makeEndpoint()
41 before = []httptransport.BeforeFunc{}
42 after = []httptransport.AfterFunc{}
43 )
44 handler := httptransport.NewBinding(ctx, requestType, codec, e, before, after)
45 mux := http.NewServeMux()
46 mux.Handle("/path", handler)
47 http.ListenAndServe(":8080", mux)
48 }
49 ```
33 "io"
44
55 "golang.org/x/net/context"
6
7 "github.com/go-kit/kit/server"
86 )
97
10 // Codec defines how to decode and encode requests and responses. Decode takes
11 // and returns a context because the request may be accompanied by information
8 // Codec decodes and encodes requests and responses. Decode takes and returns
9 // a context because the request or response may be accompanied by information
1210 // that needs to be applied there.
1311 type Codec interface {
14 Decode(context.Context, io.Reader, server.Request) (context.Context, error)
15 Encode(io.Writer, server.Response) error
12 Decode(context.Context, io.Reader, interface{}) (context.Context, error)
13 Encode(io.Writer, interface{}) error
1614 }
55
66 "golang.org/x/net/context"
77
8 "github.com/go-kit/kit/server"
98 "github.com/go-kit/kit/transport/codec"
109 )
1110
1514 // properly-tagged fields.
1615 func New() codec.Codec { return jsonCodec{} }
1716
18 func (jsonCodec) Decode(ctx context.Context, r io.Reader, req server.Request) (context.Context, error) {
19 return ctx, json.NewDecoder(r).Decode(req)
17 func (jsonCodec) Decode(ctx context.Context, r io.Reader, v interface{}) (context.Context, error) {
18 return ctx, json.NewDecoder(r).Decode(v)
2019 }
2120
22 func (jsonCodec) Encode(w io.Writer, resp server.Response) error {
23 return json.NewEncoder(w).Encode(resp)
21 func (jsonCodec) Encode(w io.Writer, v interface{}) error {
22 return json.NewEncoder(w).Encode(v)
2423 }
0 package http_test
1
2 import (
3 "net/http/httptest"
4 "testing"
5
6 "golang.org/x/net/context"
7
8 httptransport "github.com/go-kit/kit/transport/http"
9 )
10
11 func TestSetContentType(t *testing.T) {
12 contentType := "application/whatever"
13 rec := httptest.NewRecorder()
14 httptransport.SetContentType(contentType)(context.Background(), rec)
15 if want, have := contentType, rec.Header().Get("Content-Type"); want != have {
16 t.Errorf("want %q, have %q", want, have)
17 }
18 }
11
22 import (
33 "net/http"
4 "reflect"
54
65 "golang.org/x/net/context"
76
8 "github.com/go-kit/kit/server"
7 "github.com/go-kit/kit/endpoint"
98 "github.com/go-kit/kit/transport/codec"
109 )
1110
12 // BindingOption sets a parameter for the binding.
13 type BindingOption func(*binding)
14
15 // Before adds pre-RPC BeforeFuncs to the binding.
16 func Before(funcs ...BeforeFunc) BindingOption {
17 return func(b *binding) { b.before = append(b.before, funcs...) }
18 }
19
20 // After adds post-RPC AfterFuncs to the binding.
21 func After(funcs ...AfterFunc) BindingOption {
22 return func(b *binding) { b.after = append(b.after, funcs...) }
23 }
24
2511 type binding struct {
2612 context.Context
27 requestType reflect.Type
13 makeRequest func() interface{}
2814 codec.Codec
29 server.Endpoint
15 endpoint.Endpoint
3016 before []BeforeFunc
3117 after []AfterFunc
3218 }
3319
3420 // NewBinding returns an HTTP handler that wraps the given endpoint.
35 func NewBinding(ctx context.Context, requestType reflect.Type, cdc codec.Codec, endpoint server.Endpoint, options ...BindingOption) http.Handler {
21 func NewBinding(ctx context.Context, makeRequest func() interface{}, cdc codec.Codec, e endpoint.Endpoint, options ...BindingOption) http.Handler {
3622 b := &binding{
3723 Context: ctx,
38 requestType: requestType,
24 makeRequest: makeRequest,
3925 Codec: cdc,
40 Endpoint: endpoint,
26 Endpoint: e,
4127 }
4228 for _, option := range options {
4329 option(b)
5642 }
5743
5844 // Decode request.
59 req := reflect.New(b.requestType).Interface()
45 req := b.makeRequest()
6046 ctx, err := b.Codec.Decode(ctx, r.Body, req)
6147 if err != nil {
6248 http.Error(w, err.Error(), http.StatusBadRequest)
8167 return
8268 }
8369 }
70
71 // BindingOption sets a parameter for the HTTP binding.
72 type BindingOption func(*binding)
73
74 // BindingBefore adds pre-RPC BeforeFuncs to the HTTP binding.
75 func BindingBefore(funcs ...BeforeFunc) BindingOption {
76 return func(b *binding) { b.before = append(b.before, funcs...) }
77 }
78
79 // BindingAfter adds post-RPC AfterFuncs to the HTTP binding.
80 func BindingAfter(funcs ...AfterFunc) BindingOption {
81 return func(b *binding) { b.after = append(b.after, funcs...) }
82 }
1010
1111 "golang.org/x/net/context"
1212
13 "github.com/go-kit/kit/server"
1413 jsoncodec "github.com/go-kit/kit/transport/codec/json"
1514 httptransport "github.com/go-kit/kit/transport/http"
1615 )
2827 return 3 * i // doesn't matter, just do something
2928 }
3029
31 endpoint := func(_ context.Context, req server.Request) (server.Response, error) {
32 r, ok := req.(*myRequest)
30 endpoint := func(_ context.Context, request interface{}) (interface{}, error) {
31 r, ok := request.(*myRequest)
3332 if !ok {
34 return nil, fmt.Errorf("not myRequest (%s)", reflect.TypeOf(req))
33 return nil, fmt.Errorf("not myRequest (%s)", reflect.TypeOf(request))
3534 }
3635 return myResponse{transform(r.In)}, nil
3736 }
3837
3938 ctx := context.Background()
40 requestType := reflect.TypeOf(myRequest{})
39 makeRequest := func() interface{} { return &myRequest{} }
4140 codec := jsoncodec.New()
42 binding := httptransport.NewBinding(ctx, requestType, codec, endpoint)
41 binding := httptransport.NewBinding(ctx, makeRequest, codec, endpoint)
4342 server := httptest.NewServer(binding)
4443 defer server.Close()
4544
5655 defer resp.Body.Close()
5756
5857 var r myResponse
59 if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
58 if _, err := codec.Decode(ctx, resp.Body, &r); err != nil {
6059 t.Fatal(err)
6160 }
6261
0 package http
1
2 import (
3 "bytes"
4 "net/http"
5 "net/url"
6
7 "golang.org/x/net/context"
8
9 "github.com/go-kit/kit/endpoint"
10 "github.com/go-kit/kit/transport/codec"
11 )
12
13 type httpClient struct {
14 *url.URL
15 codec.Codec
16 method string
17 *http.Client
18 before []BeforeFunc
19 makeResponse func() interface{}
20 }
21
22 // NewClient returns a client endpoint for a remote service. addr must be a
23 // valid, parseable URL, including the scheme and path.
24 func NewClient(addr string, cdc codec.Codec, makeResponse func() interface{}, options ...ClientOption) endpoint.Endpoint {
25 u, err := url.Parse(addr)
26 if err != nil {
27 panic(err)
28 }
29 c := httpClient{
30 URL: u,
31 Codec: cdc,
32 method: "GET",
33 Client: http.DefaultClient,
34 makeResponse: makeResponse,
35 }
36 for _, option := range options {
37 option(&c)
38 }
39 return c.endpoint
40 }
41
42 func (c httpClient) endpoint(ctx context.Context, request interface{}) (interface{}, error) {
43 var buf bytes.Buffer
44 if err := c.Codec.Encode(&buf, request); err != nil {
45 return nil, err
46 }
47
48 req, err := http.NewRequest(c.method, c.URL.String(), &buf)
49 if err != nil {
50 return nil, err
51 }
52
53 for _, f := range c.before {
54 f(ctx, req)
55 }
56
57 resp, err := c.Client.Do(req)
58 if err != nil {
59 return nil, err
60 }
61 defer resp.Body.Close()
62
63 response := c.makeResponse()
64 ctx, err = c.Codec.Decode(ctx, resp.Body, response)
65 if err != nil {
66 return nil, err
67 }
68
69 return response, nil
70 }
71
72 // ClientOption sets a parameter for the HTTP client.
73 type ClientOption func(*httpClient)
74
75 // ClientBefore adds pre-invocation BeforeFuncs to the HTTP client.
76 func ClientBefore(funcs ...BeforeFunc) ClientOption {
77 return func(c *httpClient) { c.before = append(c.before, funcs...) }
78 }
79
80 // ClientMethod sets the method used to invoke the RPC. By default, it's GET.
81 func ClientMethod(method string) ClientOption {
82 return func(c *httpClient) { c.method = method }
83 }
84
85 // SetClient sets the HTTP client struct used to invoke the RPC. By default,
86 // it's http.DefaultClient.
87 func SetClient(client *http.Client) ClientOption {
88 return func(c *httpClient) { c.Client = client }
89 }
0 package http_test
1
2 import (
3 "net/http"
4 "net/http/httptest"
5 "reflect"
6 "testing"
7
8 "golang.org/x/net/context"
9
10 jsoncodec "github.com/go-kit/kit/transport/codec/json"
11 httptransport "github.com/go-kit/kit/transport/http"
12 )
13
14 func TestClient(t *testing.T) {
15 type myResponse struct {
16 V int `json:"v"`
17 }
18 const v = 123
19 codec := jsoncodec.New()
20 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
21 codec.Encode(w, myResponse{v})
22 }))
23 defer server.Close()
24 makeResponse := func() interface{} { return &myResponse{} }
25 client := httptransport.NewClient(server.URL, codec, makeResponse)
26 resp, err := client(context.Background(), struct{}{})
27 if err != nil {
28 t.Fatal(err)
29 }
30 response, ok := resp.(*myResponse)
31 if !ok {
32 t.Fatalf("not myResponse (%s)", reflect.TypeOf(response))
33 }
34 if want, have := v, response.V; want != have {
35 t.Errorf("want %d, have %d", want, have)
36 }
37 }