Codebase list golang-github-go-kit-kit / 8b1f69c
Improvements surrounding HTTP and Zipkin - HTTP binding moved to package transport - Remove concept of Gate; not useful at the moment - Zipkin will create Span and Trace IDs, if they don't exist - Method to set Zipkin fields in HTTP header - Break out HTTP Before and After funcs Peter Bourgon 9 years ago
10 changed file(s) with 271 addition(s) and 143 deletion(s). Raw diff Collapse all Expand all
0 package main
1
2 import (
3 "encoding/json"
4 "io"
5 "net/http"
6 "time"
7
8 "golang.org/x/net/context"
9
10 "github.com/peterbourgon/gokit/metrics"
11 "github.com/peterbourgon/gokit/server"
12 )
13
14 // jsonCodec implements transport/codec, decoding and encoding requests and
15 // responses respectively as JSON. It requires that the concrete request and
16 // response types support JSON de/serialization.
17 //
18 // This type is mostly boiler-plate; in theory, it could be generated.
19 type jsonCodec struct{}
20
21 func (jsonCodec) Decode(ctx context.Context, r io.Reader) (server.Request, context.Context, error) {
22 var req request
23 err := json.NewDecoder(r).Decode(&req)
24 return &req, ctx, err
25 }
26
27 func (jsonCodec) Encode(w io.Writer, resp server.Response) error {
28 return json.NewEncoder(w).Encode(resp)
29 }
30
31 // The HTTP binding exists in the HTTP transport package, because it uses the
32 // codec to deserialize and serialize requests and responses, and therefore
33 // doesn't need to have access to the concrete request and response types.
34
35 func httpInstrument(requests metrics.Counter, duration metrics.Histogram) func(http.Handler) http.Handler {
36 return func(next http.Handler) http.Handler {
37 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
38 requests.Add(1)
39 defer func(begin time.Time) { duration.Observe(time.Since(begin).Nanoseconds()) }(time.Now())
40 next.ServeHTTP(w, r)
41 })
42 }
43 }
+0
-80
addsvc/httpjson_binding.go less more
0 package main
1
2 import (
3 "encoding/json"
4 "io"
5 "net/http"
6 "time"
7
8 "golang.org/x/net/context"
9
10 "github.com/peterbourgon/gokit/metrics"
11 "github.com/peterbourgon/gokit/server"
12 "github.com/peterbourgon/gokit/server/zipkin"
13 "github.com/peterbourgon/gokit/transport/codec"
14 )
15
16 // jsonCodec decodes and encodes requests and responses respectively as JSON.
17 // It requires that the (package main) request and response structs support
18 // JSON de/serialization.
19 //
20 // This type is mostly boiler-plate; in theory, it could be generated.
21 type jsonCodec struct{}
22
23 func (jsonCodec) Decode(ctx context.Context, r io.Reader) (server.Request, context.Context, error) {
24 var req request
25 err := json.NewDecoder(r).Decode(&req)
26 return &req, ctx, err
27 }
28
29 func (jsonCodec) Encode(w io.Writer, resp server.Response) error {
30 return json.NewEncoder(w).Encode(resp)
31 }
32
33 // A binding wraps an Endpoint so that it's usable by a transport. httpBinding
34 // makes an Endpoint usable over HTTP. It combines a parent context, a codec,
35 // and an endpoint to expose. It implements http.Handler by decoding a request
36 // from the HTTP request body, and encoding a response to the response writer.
37 type httpBinding struct {
38 context.Context // parent context
39 codec.Codec // how to decode requests and encode responses
40 contentType string // what we report as the response ContentType
41 server.Endpoint // the endpoint being bound
42 }
43
44 func (b httpBinding) ServeHTTP(w http.ResponseWriter, r *http.Request) {
45 // Perform HTTP-specific context amendments.
46 b.Context = zipkin.GetHeaders(b.Context, r.Header)
47
48 // Decode request.
49 req, ctx, err := b.Codec.Decode(b.Context, r.Body)
50 if err != nil {
51 http.Error(w, err.Error(), http.StatusBadRequest)
52 return
53 }
54 b.Context = ctx
55
56 // Execute RPC.
57 resp, err := b.Endpoint(b.Context, req)
58 if err != nil {
59 http.Error(w, err.Error(), http.StatusInternalServerError)
60 return
61 }
62
63 // Encode response.
64 w.Header().Set("Content-Type", b.contentType)
65 if err := b.Codec.Encode(w, resp); err != nil {
66 http.Error(w, err.Error(), http.StatusInternalServerError)
67 return
68 }
69 }
70
71 func httpInstrument(requests metrics.Counter, duration metrics.Histogram) func(http.Handler) http.Handler {
72 return func(next http.Handler) http.Handler {
73 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
74 requests.Add(1)
75 defer func(begin time.Time) { duration.Observe(time.Since(begin).Nanoseconds()) }(time.Now())
76 next.ServeHTTP(w, r)
77 })
78 }
79 }
1111 "syscall"
1212 "time"
1313
14 "github.com/justinas/alice"
1514 "github.com/streadway/handy/cors"
1615 "github.com/streadway/handy/encoding"
1716 "golang.org/x/net/context"
2322 "github.com/peterbourgon/gokit/metrics/statsd"
2423 "github.com/peterbourgon/gokit/server"
2524 "github.com/peterbourgon/gokit/server/zipkin"
25 kithttp "github.com/peterbourgon/gokit/transport/http"
2626 )
2727
2828 func main() {
2929 var (
30 httpJSONAddr = flag.String("http.json.addr", ":8001", "Address for HTTP/JSON server")
31 grpcTCPAddr = flag.String("grpc.tcp.addr", ":8002", "Address for gRPC (TCP) server")
30 httpAddr = flag.String("http.addr", ":8001", "Address for HTTP (JSON) server")
31 grpcAddr = flag.String("grpc.addr", ":8002", "Address for gRPC server")
3232 )
3333 flag.Parse()
3434
4040 // `package server` domain
4141 var e server.Endpoint
4242 e = makeEndpoint(a)
43 e = server.Gate(zipkin.RequireInContext)(e) // must have Zipkin headers
4443 // e = server.ChainableEnhancement(arg1, arg2, e)
4544
4645 // `package metrics` domain
6362
6463 // Transport: gRPC
6564 go func() {
66 ln, err := net.Listen("tcp", *grpcTCPAddr)
65 ln, err := net.Listen("tcp", *grpcAddr)
6766 if err != nil {
6867 errc <- err
6968 return
7473 var addServer pb.AddServer
7574 addServer = grpcBinding{e}
7675 addServer = grpcInstrument(requests.With(field), duration.With(field))(addServer)
77 // Note that this will always fail, because the Endpoint is gated on
78 // Zipkin headers, and we don't extract them from the gRPC request.
7976
8077 pb.RegisterAddServer(s, addServer)
81 log.Printf("gRPC server on TCP %s", *grpcTCPAddr)
78 log.Printf("gRPC server on %s", *grpcAddr)
8279 errc <- s.Serve(ln)
8380 }()
8481
85 // Transport: HTTP/JSON
82 // Transport: HTTP (JSON)
8683 go func() {
8784 ctx, cancel := context.WithCancel(root)
8885 defer cancel()
86
87 field := metrics.Field{Key: "transport", Value: "http"}
88 before := kithttp.Before(zipkin.GetFromHTTP)
89 after := kithttp.After(kithttp.SetContentType("application/json"))
90
91 var handler http.Handler
92 handler = kithttp.NewBinding(ctx, jsonCodec{}, e, before, after)
93 handler = encoding.Gzip(handler)
94 handler = httpInstrument(requests.With(field), duration.With(field))(handler)
95 handler = cors.Middleware(cors.Config{})(handler)
96
8997 mux := http.NewServeMux()
90 field := metrics.Field{Key: "transport", Value: "http"}
91
92 handler := alice.New(
93 httpInstrument(requests.With(field), duration.With(field)),
94 encoding.Gzip,
95 cors.Middleware(cors.Config{}),
96 ).Then(httpBinding{ctx, jsonCodec{}, "application/json", e})
97
9898 mux.Handle("/add", handler)
99 log.Printf("HTTP/JSON server on %s", *httpJSONAddr)
100 errc <- http.ListenAndServe(*httpJSONAddr, mux)
99 log.Printf("HTTP server on %s", *httpAddr)
100 errc <- http.ListenAndServe(*httpAddr, mux)
101101 }()
102102
103103 log.Fatal(<-errc)
88 }
99
1010 type timeHistogram struct {
11 unit time.Duration
1112 Histogram
12 unit time.Duration
1313 }
1414
1515 // NewTimeHistogram returns a TimeHistogram wrapper around the passed
1616 // Histogram, in units of unit.
17 func NewTimeHistogram(h Histogram, unit time.Duration) TimeHistogram {
17 func NewTimeHistogram(unit time.Duration, h Histogram) TimeHistogram {
1818 return &timeHistogram{
19 unit: unit,
1920 Histogram: h,
20 unit: unit,
2121 }
2222 }
2323
1212 const metricName string = "test_time_histogram"
1313 quantiles := []int{50, 90, 99}
1414 h0 := expvar.NewHistogram(metricName, 0, 200, 3, quantiles...)
15 h := metrics.NewTimeHistogram(h0, time.Millisecond)
15 h := metrics.NewTimeHistogram(time.Millisecond, h0)
1616 const seed, mean, stdev int64 = 321, 100, 20
1717
1818 for i := 0; i < 4321; i++ {
+0
-18
server/gate.go less more
0 package server
1
2 import (
3 "golang.org/x/net/context"
4 )
5
6 // Gate returns a middleware that gates requests. If the gating function
7 // returns an error, the request is aborted, and that error is returned.
8 func Gate(allow func(context.Context, Request) error) func(Endpoint) Endpoint {
9 return func(next Endpoint) Endpoint {
10 return func(ctx context.Context, req Request) (Response, error) {
11 if err := allow(ctx, req); err != nil {
12 return nil, err
13 }
14 return next(ctx, req)
15 }
16 }
17 }
00 package zipkin
11
22 import (
3 "errors"
3 "math/rand"
44 "net/http"
5
6 "github.com/peterbourgon/gokit/server"
5 "strconv"
76
87 "golang.org/x/net/context"
98 )
1413 spanIDHTTPHeader = "X-B3-SpanId"
1514 parentSpanIDHTTPHeader = "X-B3-ParentSpanId"
1615
17 // TraceIDContextKey holds the Zipkin TraceId, if available.
16 // TraceIDContextKey holds the Zipkin TraceId.
1817 TraceIDContextKey = "Zipkin-Trace-ID"
1918
20 // SpanIDContextKey holds the Zipkin SpanId, if available.
19 // SpanIDContextKey holds the Zipkin SpanId.
2120 SpanIDContextKey = "Zipkin-Span-ID"
2221
2322 // ParentSpanIDContextKey holds the Zipkin ParentSpanId, if available.
2423 ParentSpanIDContextKey = "Zipkin-Parent-Span-ID"
2524 )
2625
27 // ErrMissingZipkinHeaders is returned when a context doesn't contain Zipkin
28 // trace, span, or parent span IDs.
29 var ErrMissingZipkinHeaders = errors.New("Zipkin headers missing from request context")
30
31 // GetHeaders extracts Zipkin headers from the HTTP request, and populates
32 // them into the context, if present.
33 func GetHeaders(ctx context.Context, header http.Header) context.Context {
34 if val := header.Get(traceIDHTTPHeader); val != "" {
26 // GetFromHTTP implements transport/http.BeforeFunc, populating Zipkin headers
27 // into the context from the HTTP headers. It will generate new trace and span
28 // IDs if none are found.
29 func GetFromHTTP(ctx context.Context, r *http.Request) context.Context {
30 if val := r.Header.Get(traceIDHTTPHeader); val != "" {
3531 ctx = context.WithValue(ctx, TraceIDContextKey, val)
32 } else {
33 ctx = context.WithValue(ctx, TraceIDContextKey, strconv.FormatInt(rand.Int63(), 16))
3634 }
37 if val := header.Get(spanIDHTTPHeader); val != "" {
35 if val := r.Header.Get(spanIDHTTPHeader); val != "" {
3836 ctx = context.WithValue(ctx, SpanIDContextKey, val)
37 } else {
38 ctx = context.WithValue(ctx, SpanIDContextKey, strconv.FormatInt(rand.Int63(), 16))
3939 }
40 if val := header.Get(parentSpanIDHTTPHeader); val != "" {
40 if val := r.Header.Get(parentSpanIDHTTPHeader); val != "" {
4141 ctx = context.WithValue(ctx, ParentSpanIDContextKey, val)
4242 }
4343 return ctx
4444 }
4545
46 // RequireInContext implements the server.Gate allow func by checking if the
47 // context contains extracted Zipkin headers. Contexts without all headers
48 // aren't allowed to proceed.
49 func RequireInContext(ctx context.Context, _ server.Request) error {
50 if ctx.Value(TraceIDContextKey) == nil || ctx.Value(SpanIDContextKey) == nil || ctx.Value(ParentSpanIDContextKey) == nil {
51 return ErrMissingZipkinHeaders
46 // SetHTTPHeaders copies Zipkin headers from the context into the HTTP header.
47 func SetHTTPHeaders(ctx context.Context, h http.Header) {
48 for ctxKey, hdrKey := range map[string]string{
49 TraceIDContextKey: traceIDHTTPHeader,
50 SpanIDContextKey: spanIDHTTPHeader,
51 ParentSpanIDContextKey: parentSpanIDHTTPHeader,
52 } {
53 if val := ctx.Value(ctxKey); val != nil {
54 s, ok := val.(string)
55 if !ok {
56 panic("context value for " + ctxKey + " isn't string")
57 }
58 h.Set(hdrKey, s)
59 }
5260 }
53 return nil
5461 }
0 package zipkin_test
1
2 import (
3 "math/rand"
4 "net/http"
5 "testing"
6
7 "github.com/peterbourgon/gokit/server/zipkin"
8 "golang.org/x/net/context"
9 )
10
11 func TestGeneration(t *testing.T) {
12 rand.Seed(123)
13
14 r, _ := http.NewRequest("GET", "http://cool.horse", nil)
15 ctx := zipkin.GetFromHTTP(context.Background(), r)
16
17 for key, want := range map[string]string{
18 zipkin.TraceIDContextKey: "4a68998bed5c40f1",
19 zipkin.SpanIDContextKey: "35b51599210f9ba",
20 } {
21 val := ctx.Value(key)
22 if val == nil {
23 t.Errorf("%s: no entry", key)
24 continue
25 }
26 have, ok := val.(string)
27 if !ok {
28 t.Errorf("%s: value not a string", key)
29 continue
30 }
31 if want != have {
32 t.Errorf("%s: want %q, have %q", key, want, have)
33 continue
34 }
35 }
36 }
37
38 func TestHTTPHeaders(t *testing.T) {
39 ids := map[string]string{
40 zipkin.TraceIDContextKey: "some_trace_id",
41 zipkin.SpanIDContextKey: "some_span_id",
42 zipkin.ParentSpanIDContextKey: "some_parent_span_id",
43 }
44
45 ctx0 := context.Background()
46 for key, val := range ids {
47 ctx0 = context.WithValue(ctx0, key, val)
48 }
49 r, _ := http.NewRequest("GET", "http://best.horse", nil)
50 zipkin.SetHTTPHeaders(ctx0, r.Header)
51 ctx1 := zipkin.GetFromHTTP(context.Background(), r)
52
53 for key, want := range ids {
54 val := ctx1.Value(key)
55 if val == nil {
56 t.Errorf("%s: no entry", key)
57 continue
58 }
59 have, ok := val.(string)
60 if !ok {
61 t.Errorf("%s: value not a string", key)
62 continue
63 }
64 if want != have {
65 t.Errorf("%s: want %q, have %q", key, want, have)
66 continue
67 }
68 }
69 }
0 package http
1
2 import (
3 "net/http"
4
5 "golang.org/x/net/context"
6 )
7
8 // BeforeFunc may take information from a HTTP request and put it into a
9 // request context. BeforeFuncs are executed in HTTP bindings, prior to
10 // invoking the endpoint.
11 type BeforeFunc func(context.Context, *http.Request) context.Context
12
13 // AfterFunc may take information from a request context and use it to
14 // manipulate a ResponseWriter. AfterFuncs are executed in HTTP bindings,
15 // after invoking the endpoint but prior to writing a response.
16 type AfterFunc func(context.Context, http.ResponseWriter)
17
18 // SetContentType returns a AfterFunc that sets the HTTP Content-Type
19 // header to the provided value.
20 func SetContentType(value string) AfterFunc {
21 return func(_ context.Context, w http.ResponseWriter) {
22 w.Header().Set("Content-Type", value)
23 }
24 }
0 package http
1
2 import (
3 "net/http"
4
5 "golang.org/x/net/context"
6
7 "github.com/peterbourgon/gokit/server"
8 "github.com/peterbourgon/gokit/transport/codec"
9 )
10
11 // BindingOption sets a parameter for the binding.
12 type BindingOption func(*binding)
13
14 // Before adds pre-RPC BeforeFuncs to the binding.
15 func Before(funcs ...BeforeFunc) BindingOption {
16 return func(b *binding) { b.before = append(b.before, funcs...) }
17 }
18
19 // After adds post-RPC AfterFuncs to the binding.
20 func After(funcs ...AfterFunc) BindingOption {
21 return func(b *binding) { b.after = append(b.after, funcs...) }
22 }
23
24 type binding struct {
25 context.Context
26 codec.Codec
27 server.Endpoint
28 before []BeforeFunc
29 after []AfterFunc
30 }
31
32 // NewBinding returns an HTTP handler that wraps the given endpoint.
33 func NewBinding(ctx context.Context, cdc codec.Codec, endpoint server.Endpoint, options ...BindingOption) http.Handler {
34 b := &binding{
35 Context: ctx,
36 Codec: cdc,
37 Endpoint: endpoint,
38 }
39 for _, option := range options {
40 option(b)
41 }
42 return b
43 }
44
45 func (b *binding) ServeHTTP(w http.ResponseWriter, r *http.Request) {
46 // Per-request context.
47 ctx, cancel := context.WithCancel(b.Context)
48 defer cancel()
49
50 // Prepare the RPC's context with details from the request.
51 for _, f := range b.before {
52 ctx = f(ctx, r)
53 }
54
55 // Decode request.
56 req, ctx, err := b.Codec.Decode(ctx, r.Body)
57 if err != nil {
58 http.Error(w, err.Error(), http.StatusBadRequest)
59 return
60 }
61
62 // Execute RPC.
63 resp, err := b.Endpoint(ctx, req)
64 if err != nil {
65 http.Error(w, err.Error(), http.StatusInternalServerError)
66 return
67 }
68
69 // Prepare the ResponseWriter.
70 for _, f := range b.after {
71 f(ctx, w)
72 }
73
74 // Encode response.
75 if err := b.Codec.Encode(w, resp); err != nil {
76 http.Error(w, err.Error(), http.StatusInternalServerError)
77 return
78 }
79 }