Codebase list golang-github-go-kit-kit / 24833ee
Merge branch 'master' into readmes Peter Bourgon 8 years ago
75 changed file(s) with 2297 addition(s) and 737 deletion(s). Raw diff Collapse all Expand all
00 addsvc/addsvc
1 addsvc/client/addcli/addcli
12 cover.out
23
34 # Compiled Object files, Static and Dynamic libs (Shared Objects)
00 # Go kit [![Circle CI](https://circleci.com/gh/go-kit/kit.svg?style=svg)](https://circleci.com/gh/go-kit/kit) [![Drone.io](https://drone.io/github.com/go-kit/kit/status.png)](https://drone.io/github.com/go-kit/kit/latest) [![Travis CI](https://travis-ci.org/go-kit/kit.svg?branch=master)](https://travis-ci.org/go-kit/kit) [![GoDoc](https://godoc.org/github.com/go-kit/kit?status.svg)](https://godoc.org/github.com/go-kit/kit)
11
2 **Go kit** is a **distributed programming toolkit** designed for microservices.
2 **Go kit** is a **distributed programming toolkit** for microservices in the modern enterprise. We want to make Go a viable choice for application (business-logic) software in large organizations.
33
44 - Mailing list: [go-kit](https://groups.google.com/forum/#!forum/go-kit)
55 - Slack: [gophers.slack.com](https://gophers.slack.com) **#go-kit** ([invite](http://bit.ly/go-slack-signup))
4141 ## Component status
4242
4343 - [API stability](https://github.com/go-kit/kit/blob/master/rfc/rfc007-api-stability.md) — **adopted**
44 - [`package log`](https://github.com/go-kit/kit/tree/master/log) — **implemented**
4445 - [`package metrics`](https://github.com/go-kit/kit/tree/master/metrics) — **implemented**
45 - [`package server`](https://github.com/go-kit/kit/tree/master/server) — **implemented**
46 - [`package endpoint`](https://github.com/go-kit/kit/tree/master/endpoint) — **implemented**
4647 - [`package transport`](https://github.com/go-kit/kit/tree/master/transport) — **implemented**
47 - [`package log`](https://github.com/go-kit/kit/tree/master/log) — **implemented**
48 - [`package tracing`](https://github.com/go-kit/kit/tree/master/tracing) — Zipkin supported, Appdash coming
48 - [`package circuitbreaker`](https://github.com/go-kit/kit/tree/master/circuitbreaker) — **implemented**
49 - [`package loadbalancer`](https://github.com/go-kit/kit/tree/master/loadbalancer) — **implemented**
50 - [`package ratelimit`](https://github.com/go-kit/kit/tree/master/ratelimit) — **implemented**
51 - [`package tracing`](https://github.com/go-kit/kit/tree/master/tracing) — prototyping
52 - Client patterns — prototyping
4953 - Service discovery — pending
54 - Example [addsvc](https://github.com/go-kit/kit/tree/master/addsvc) — **implemented**
5055
5156 ### Dependency management
5257
6772
6873 ### API stability policy
6974
70 The Go kit project depends on code maintained by others. This includes the Go
71 standard library and sub-repositories and other external libraries. The Go
72 language and standard library provide stability guarantees, but the other
73 external libraries typically do not.
75 The Go kit project depends on code maintained by others.
76 This includes the Go standard library and sub-repositories and other external libraries.
77 The Go language and standard library provide stability guarantees, but the other external libraries typically do not.
7478 [The API Stability RFC](https://github.com/go-kit/kit/tree/master/rfc/rfc007-api-stability.md)
75 proposes a standard policy for package authors to advertise API stability. The
76 Go kit project prefers to depend on code that abides the API stability policy.
79 proposes a standard policy for package authors to advertise API stability.
80 The Go kit project prefers to depend on code that abides the API stability policy.
7781
7882 ## Related projects
7983
6060
6161 ## Client
6262
63 TODO
63 addsvc comes with an example client, [addcli][].
64
65 [addcli]: https://github.com/go-kit/kit/blob/master/addsvc/client/addcli/main.go
66
67 ```
68 $ cd client/addcli
69 $ go install
70 $ addcli
71 ```
72
55 import (
66 "bytes"
77 "fmt"
8
89 "github.com/apache/thrift/lib/go/thrift"
910 )
1011
22 import (
33 "golang.org/x/net/context"
44
5 "github.com/go-kit/kit/addsvc/reqrep"
56 "github.com/go-kit/kit/endpoint"
67 "github.com/go-kit/kit/log"
78 )
1819 // service.
1920 func proxyAdd(remote endpoint.Endpoint, logger log.Logger) Add {
2021 return func(ctx context.Context, a, b int64) int64 {
21 resp, err := remote(ctx, &addRequest{a, b})
22 resp, err := e(ctx, reqrep.AddRequest{A: a, B: b})
2223 if err != nil {
2324 logger.Log("err", err)
2425 return 0
2526 }
26 addResp, ok := resp.(*addResponse)
27 addResp, ok := resp.(reqrep.AddResponse)
2728 if !ok {
2829 logger.Log("err", endpoint.ErrBadCast)
2930 return 0
0 package main
1
2 import (
3 "flag"
4 "log"
5 "net/rpc"
6 "net/url"
7 "os"
8 "strings"
9
10 "golang.org/x/net/context"
11 "google.golang.org/grpc"
12
13 "github.com/apache/thrift/lib/go/thrift"
14 thriftadd "github.com/go-kit/kit/addsvc/_thrift/gen-go/add"
15 grpcclient "github.com/go-kit/kit/addsvc/client/grpc"
16 httpclient "github.com/go-kit/kit/addsvc/client/http"
17 netrpcclient "github.com/go-kit/kit/addsvc/client/netrpc"
18 thriftclient "github.com/go-kit/kit/addsvc/client/thrift"
19 "github.com/go-kit/kit/addsvc/reqrep"
20 "github.com/go-kit/kit/endpoint"
21 )
22
23 func main() {
24 // Flag domain. Note that gRPC transitively registers flags via its import
25 // of glog. So, we define a new flag set, to keep those domains distinct.
26 fs := flag.NewFlagSet("", flag.ExitOnError)
27 var (
28 transport = fs.String("transport", "grpc", "http, grpc, netrpc, thrift")
29 httpAddr = fs.String("http.addr", "localhost:8001", "HTTP (JSON) address")
30 grpcAddr = fs.String("grpc.addr", "localhost:8002", "gRPC address")
31 netrpcAddr = fs.String("netrpc.addr", "localhost:8003", "net/rpc address")
32 thriftAddr = fs.String("thrift.addr", "localhost:8004", "Thrift address")
33 thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
34 thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered")
35 thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing")
36 a = fs.Int64("a", 1, "a value")
37 b = fs.Int64("b", 2, "b value")
38 )
39 flag.Usage = fs.Usage // only show our flags
40 fs.Parse(os.Args[1:])
41 log.SetFlags(0)
42 log.SetOutput(os.Stdout)
43
44 var e endpoint.Endpoint
45 switch *transport {
46 case "http":
47 if !strings.HasPrefix(*httpAddr, "http") {
48 *httpAddr = "http://" + *httpAddr
49 }
50 u, err := url.Parse(*httpAddr)
51 if err != nil {
52 log.Fatalf("url.Parse: %v", err)
53 }
54 if u.Path == "" {
55 u.Path = "/add"
56 }
57 e = httpclient.NewClient("GET", u.String())
58
59 case "grpc":
60 cc, err := grpc.Dial(*grpcAddr)
61 if err != nil {
62 log.Fatalf("grpc.Dial: %v", err)
63 }
64 e = grpcclient.NewClient(cc)
65
66 case "netrpc":
67 client, err := rpc.DialHTTP("tcp", *netrpcAddr)
68 if err != nil {
69 log.Fatalf("rpc.DialHTTP: %v", err)
70 }
71 e = netrpcclient.NewClient(client)
72
73 case "thrift":
74 var protocolFactory thrift.TProtocolFactory
75 switch *thriftProtocol {
76 case "compact":
77 protocolFactory = thrift.NewTCompactProtocolFactory()
78 case "simplejson":
79 protocolFactory = thrift.NewTSimpleJSONProtocolFactory()
80 case "json":
81 protocolFactory = thrift.NewTJSONProtocolFactory()
82 case "binary", "":
83 protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
84 default:
85 log.Fatalf("invalid protocol %q", *thriftProtocol)
86 }
87
88 var transportFactory thrift.TTransportFactory
89 if *thriftBufferSize > 0 {
90 transportFactory = thrift.NewTBufferedTransportFactory(*thriftBufferSize)
91 } else {
92 transportFactory = thrift.NewTTransportFactory()
93 }
94
95 if *thriftFramed {
96 transportFactory = thrift.NewTFramedTransportFactory(transportFactory)
97 }
98
99 transportSocket, err := thrift.NewTSocket(*thriftAddr)
100 if err != nil {
101 log.Fatalf("thrift.NewTSocket: %v", err)
102 }
103
104 transport := transportFactory.GetTransport(transportSocket)
105 defer transport.Close()
106 if err := transport.Open(); err != nil {
107 log.Fatalf("Thrift transport.Open: %v", err)
108 }
109
110 e = thriftclient.NewClient(thriftadd.NewAddServiceClientFactory(transport, protocolFactory))
111
112 default:
113 log.Fatalf("unsupported transport %q", *transport)
114 }
115
116 response, err := e(context.Background(), reqrep.AddRequest{A: *a, B: *b})
117 if err != nil {
118 log.Fatalf("when invoking request: %v", err)
119 }
120 addResponse, ok := response.(reqrep.AddResponse)
121 if !ok {
122 log.Fatalf("when type-asserting response: %v", endpoint.ErrBadCast)
123 }
124 log.Print(addResponse.V)
125 }
0 package grpc
1
2 import (
3 "golang.org/x/net/context"
4 "google.golang.org/grpc"
5
6 "github.com/go-kit/kit/addsvc/pb"
7 "github.com/go-kit/kit/addsvc/reqrep"
8 "github.com/go-kit/kit/endpoint"
9 )
10
11 // NewClient takes a gRPC ClientConn that should point to an instance of an
12 // addsvc. It returns an endpoint that wraps and invokes that ClientConn.
13 func NewClient(cc *grpc.ClientConn) endpoint.Endpoint {
14 client := pb.NewAddClient(cc)
15 return func(ctx context.Context, request interface{}) (interface{}, error) {
16 var (
17 errs = make(chan error, 1)
18 responses = make(chan interface{}, 1)
19 )
20 go func() {
21 addReq, ok := request.(reqrep.AddRequest)
22 if !ok {
23 errs <- endpoint.ErrBadCast
24 return
25 }
26 reply, err := client.Add(ctx, &pb.AddRequest{A: addReq.A, B: addReq.B})
27 if err != nil {
28 errs <- err
29 return
30 }
31 responses <- reqrep.AddResponse{V: reply.V}
32 }()
33 select {
34 case <-ctx.Done():
35 return nil, context.DeadlineExceeded
36 case err := <-errs:
37 return nil, err
38 case response := <-responses:
39 return response, nil
40 }
41 }
42 }
0 package http
1
2 import (
3 "bytes"
4 "encoding/json"
5 "net/http"
6
7 "golang.org/x/net/context"
8
9 "github.com/go-kit/kit/addsvc/reqrep"
10 "github.com/go-kit/kit/endpoint"
11 httptransport "github.com/go-kit/kit/transport/http"
12 )
13
14 // NewClient takes a URL that should point to an instance of an addsvc. It
15 // returns an endpoint that makes a request to that URL.
16 func NewClient(method, url string, before ...httptransport.BeforeFunc) endpoint.Endpoint {
17 return func(ctx0 context.Context, request interface{}) (interface{}, error) {
18 var (
19 ctx, cancel = context.WithCancel(ctx0)
20 errs = make(chan error, 1)
21 responses = make(chan interface{}, 1)
22 )
23 defer cancel()
24 go func() {
25 var buf bytes.Buffer
26 if err := json.NewEncoder(&buf).Encode(request); err != nil {
27 errs <- err
28 return
29 }
30 req, err := http.NewRequest(method, url, &buf)
31 if err != nil {
32 errs <- err
33 return
34 }
35 for _, f := range before {
36 ctx = f(ctx, req)
37 }
38 resp, err := http.DefaultClient.Do(req)
39 if err != nil {
40 errs <- err
41 return
42 }
43 defer resp.Body.Close()
44 var response reqrep.AddResponse
45 if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
46 errs <- err
47 return
48 }
49 responses <- response
50 }()
51 select {
52 case <-ctx.Done():
53 return nil, context.DeadlineExceeded
54 case err := <-errs:
55 return nil, err
56 case response := <-responses:
57 return response, nil
58 }
59 }
60 }
0 package netrpc
1
2 import (
3 "net/rpc"
4
5 "golang.org/x/net/context"
6
7 "github.com/go-kit/kit/addsvc/reqrep"
8 "github.com/go-kit/kit/endpoint"
9 )
10
11 // NewClient takes a net/rpc Client that should point to an instance of an
12 // addsvc. It returns an endpoint that wraps and invokes that Client.
13 func NewClient(c *rpc.Client) endpoint.Endpoint {
14 return func(ctx context.Context, request interface{}) (interface{}, error) {
15 var (
16 errs = make(chan error, 1)
17 responses = make(chan interface{}, 1)
18 )
19 go func() {
20 var response reqrep.AddResponse
21 if err := c.Call("addsvc.Add", request, &response); err != nil {
22 errs <- err
23 return
24 }
25 responses <- response
26 }()
27 select {
28 case <-ctx.Done():
29 return nil, context.DeadlineExceeded
30 case err := <-errs:
31 return nil, err
32 case response := <-responses:
33 return response, nil
34 }
35 }
36 }
0 package thrift
1
2 import (
3 "golang.org/x/net/context"
4
5 thriftadd "github.com/go-kit/kit/addsvc/_thrift/gen-go/add"
6 "github.com/go-kit/kit/addsvc/reqrep"
7 "github.com/go-kit/kit/endpoint"
8 )
9
10 // NewClient takes a Thrift AddServiceClient, which should point to an
11 // instance of an addsvc. It returns an endpoint that wraps and invokes that
12 // client.
13 func NewClient(client *thriftadd.AddServiceClient) endpoint.Endpoint {
14 return func(ctx context.Context, request interface{}) (interface{}, error) {
15 var (
16 errs = make(chan error, 1)
17 responses = make(chan interface{}, 1)
18 )
19 go func() {
20 addReq, ok := request.(reqrep.AddRequest)
21 if !ok {
22 errs <- endpoint.ErrBadCast
23 return
24 }
25 reply, err := client.Add(addReq.A, addReq.B)
26 if err != nil {
27 errs <- err
28 return
29 }
30 responses <- reqrep.AddResponse{V: reply.Value}
31 }()
32 select {
33 case <-ctx.Done():
34 return nil, context.DeadlineExceeded
35 case err := <-errs:
36 return nil, err
37 case response := <-responses:
38 return response, nil
39 }
40 }
41 }
22 import (
33 "golang.org/x/net/context"
44
5 "github.com/go-kit/kit/addsvc/reqrep"
56 "github.com/go-kit/kit/endpoint"
67 )
78
1718 return nil, endpoint.ErrContextCanceled
1819 }
1920
20 addReq, ok := request.(*addRequest)
21 addReq, ok := request.(reqrep.AddRequest)
2122 if !ok {
2223 return nil, endpoint.ErrBadCast
2324 }
2425
2526 v := a(ctx, addReq.A, addReq.B)
26
27 return addResponse{V: v}, nil
27 return reqrep.AddResponse{V: v}, nil
2828 }
2929 }
55 "golang.org/x/net/context"
66
77 "github.com/go-kit/kit/log"
8 "github.com/go-kit/kit/metrics"
89 )
910
1011 func logging(logger log.Logger) func(Add) Add {
1819 }
1920 }
2021 }
22
23 func instrument(requests metrics.Counter, duration metrics.TimeHistogram) func(Add) Add {
24 return func(next Add) Add {
25 return func(ctx context.Context, a, b int64) int64 {
26 defer func(begin time.Time) {
27 requests.Add(1)
28 duration.Observe(time.Since(begin))
29 }(time.Now())
30 return next(ctx, a, b)
31 }
32 }
33 }
00 package main
11
22 import (
3 "time"
4
53 "golang.org/x/net/context"
64
75 "github.com/go-kit/kit/addsvc/pb"
6 "github.com/go-kit/kit/addsvc/reqrep"
87 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/metrics"
108 )
119
1210 // A binding wraps an Endpoint so that it's usable by a transport. grpcBinding
1816 // As far as I can tell, gRPC doesn't (currently) provide a user-accessible
1917 // way to manipulate the RPC context, like headers for HTTP. So we don't have
2018 // a way to transport e.g. Zipkin IDs with the request. TODO.
21 func (b grpcBinding) Add(ctx context.Context, req *pb.AddRequest) (*pb.AddReply, error) {
22 addReq := addRequest{req.A, req.B}
23 r, err := b.Endpoint(ctx, addReq)
24 if err != nil {
19 func (b grpcBinding) Add(ctx0 context.Context, req *pb.AddRequest) (*pb.AddReply, error) {
20 var (
21 ctx, cancel = context.WithCancel(ctx0)
22 errs = make(chan error, 1)
23 replies = make(chan *pb.AddReply, 1)
24 )
25 defer cancel()
26 go func() {
27 r, err := b.Endpoint(ctx, reqrep.AddRequest{A: req.A, B: req.B})
28 if err != nil {
29 errs <- err
30 return
31 }
32 resp, ok := r.(reqrep.AddResponse)
33 if !ok {
34 errs <- endpoint.ErrBadCast
35 return
36 }
37 replies <- &pb.AddReply{V: resp.V}
38 }()
39 select {
40 case <-ctx.Done():
41 return nil, context.DeadlineExceeded
42 case err := <-errs:
2543 return nil, err
26 }
27
28 resp, ok := r.(*addResponse)
29 if !ok {
30 return nil, endpoint.ErrBadCast
31 }
32
33 return &pb.AddReply{
34 V: resp.V,
35 }, nil
36 }
37
38 func grpcInstrument(requests metrics.Counter, duration metrics.Histogram) func(pb.AddServer) pb.AddServer {
39 return func(next pb.AddServer) pb.AddServer {
40 return grpcInstrumented{requests, duration, next}
44 case reply := <-replies:
45 return reply, nil
4146 }
4247 }
43
44 type grpcInstrumented struct {
45 requests metrics.Counter
46 duration metrics.Histogram
47 next pb.AddServer
48 }
49
50 func (i grpcInstrumented) Add(ctx context.Context, req *pb.AddRequest) (*pb.AddReply, error) {
51 i.requests.Add(1)
52 defer func(begin time.Time) { i.duration.Observe(time.Since(begin).Nanoseconds()) }(time.Now())
53 return i.next.Add(ctx, req)
54 }
00 package main
11
22 import (
3 "encoding/json"
34 "net/http"
4 "time"
55
6 "github.com/go-kit/kit/metrics"
6 "golang.org/x/net/context"
7
8 "github.com/go-kit/kit/addsvc/reqrep"
9 "github.com/go-kit/kit/endpoint"
10 httptransport "github.com/go-kit/kit/transport/http"
711 )
812
9 // HTTP bindings require no service-specific declarations, and so are defined
10 // in transport/http.
11
12 func httpInstrument(requests metrics.Counter, duration metrics.Histogram) func(http.Handler) http.Handler {
13 return func(next http.Handler) http.Handler {
14 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
15 requests.Add(1)
16 defer func(begin time.Time) { duration.Observe(time.Since(begin).Nanoseconds()) }(time.Now())
17 next.ServeHTTP(w, r)
18 })
13 func makeHTTPBinding(ctx context.Context, e endpoint.Endpoint, before []httptransport.BeforeFunc, after []httptransport.AfterFunc) http.Handler {
14 decode := func(r *http.Request) (interface{}, error) {
15 var request reqrep.AddRequest
16 if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
17 return nil, err
18 }
19 r.Body.Close()
20 return request, nil
21 }
22 encode := func(w http.ResponseWriter, response interface{}) error {
23 return json.NewEncoder(w).Encode(response)
24 }
25 return httptransport.Server{
26 Context: ctx,
27 Endpoint: e,
28 DecodeFunc: decode,
29 EncodeFunc: encode,
30 Before: before,
31 After: append([]httptransport.AfterFunc{httptransport.SetContentType("application/json; charset=utf-8")}, after...),
1932 }
2033 }
88 "net"
99 "net/http"
1010 _ "net/http/pprof"
11 "net/rpc"
1112 "os"
1213 "os/signal"
1314 "strings"
1617
1718 "github.com/apache/thrift/lib/go/thrift"
1819 stdprometheus "github.com/prometheus/client_golang/prometheus"
19 "github.com/streadway/handy/cors"
20 "github.com/streadway/handy/encoding"
2120 "golang.org/x/net/context"
2221 "google.golang.org/grpc"
2322
2423 thriftadd "github.com/go-kit/kit/addsvc/_thrift/gen-go/add"
24 httpclient "github.com/go-kit/kit/addsvc/client/http"
2525 "github.com/go-kit/kit/addsvc/pb"
2626 "github.com/go-kit/kit/endpoint"
2727 kitlog "github.com/go-kit/kit/log"
3030 "github.com/go-kit/kit/metrics/prometheus"
3131 "github.com/go-kit/kit/metrics/statsd"
3232 "github.com/go-kit/kit/tracing/zipkin"
33 jsoncodec "github.com/go-kit/kit/transport/codec/json"
3433 httptransport "github.com/go-kit/kit/transport/http"
3534 )
3635
4241 debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server")
4342 httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server")
4443 grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server")
45 thriftAddr = fs.String("thrift.addr", ":8003", "Address for Thrift server")
44 netrpcAddr = fs.String("netrpc.addr", ":8003", "Address for net/rpc server")
45 thriftAddr = fs.String("thrift.addr", ":8004", "Address for Thrift server")
4646 thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
4747 thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered")
4848 thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing")
6161 // `package log` domain
6262 var logger kitlog.Logger
6363 logger = kitlog.NewLogfmtLogger(os.Stderr)
64 logger = kitlog.With(logger, "ts", kitlog.DefaultTimestampUTC, "caller", kitlog.DefaultCaller)
64 logger = kitlog.NewContext(logger).With("ts", kitlog.DefaultTimestampUTC)
6565 stdlog.SetOutput(kitlog.NewStdlibAdapter(logger)) // redirect stdlib logging to us
6666 stdlog.SetFlags(0) // flags are handled in our logger
6767
7676 Help: "Total number of received requests.",
7777 }, []string{}),
7878 )
79 duration := metrics.NewMultiHistogram(
80 expvar.NewHistogram("duration_nanoseconds_total", 0, 100000000, 3),
79 duration := metrics.NewTimeHistogram(time.Nanosecond, metrics.NewMultiHistogram(
80 expvar.NewHistogram("duration_nanoseconds_total", 0, 1e9, 3, 50, 95, 99),
8181 statsd.NewHistogram(ioutil.Discard, "duration_nanoseconds_total", time.Second),
8282 prometheus.NewSummary(stdprometheus.SummaryOpts{
8383 Namespace: "addsvc",
8585 Name: "duration_nanoseconds_total",
8686 Help: "Total nanoseconds spend serving requests.",
8787 }, []string{}),
88 )
88 ))
8989
9090 // `package tracing` domain
9191 zipkinHostPort := "localhost:1234" // TODO Zipkin makes overly simple assumptions about services
108108
109109 // Our business and operational domain
110110 var a Add = pureAdd
111 if *proxyHTTPURL != "" {
112 codec := jsoncodec.New()
113 makeResponse := func() interface{} { return &addResponse{} }
114
111 if *proxyHTTPAddr != "" {
115112 var e endpoint.Endpoint
116 e = httptransport.NewClient(*proxyHTTPURL, codec, makeResponse, httptransport.ClientBefore(zipkin.ToRequest(zipkinSpanFunc)))
113 e = httpclient.NewClient("GET", *proxyHTTPAddr, zipkin.ToRequest(zipkinSpanFunc))
117114 e = zipkin.AnnotateClient(zipkinSpanFunc, zipkinCollector)(e)
118
119115 a = proxyAdd(e, logger)
120116 }
121117 a = logging(logger)(a)
118 a = instrument(requests, duration)(a)
122119
123120 // Server domain
124121 var e endpoint.Endpoint
144141 go func() {
145142 ctx, cancel := context.WithCancel(root)
146143 defer cancel()
147
148 field := metrics.Field{Key: "transport", Value: "http"}
149 before := httptransport.BindingBefore(zipkin.ToContext(zipkinSpanFunc))
150 after := httptransport.BindingAfter(httptransport.SetContentType("application/json"))
151 makeRequest := func() interface{} { return &addRequest{} }
152
153 var handler http.Handler
154 handler = httptransport.NewBinding(ctx, makeRequest, jsoncodec.New(), e, before, after)
155 handler = encoding.Gzip(handler)
156 handler = cors.Middleware(cors.Config{})(handler)
157 handler = httpInstrument(requests.With(field), duration.With(field))(handler)
158
159 mux := http.NewServeMux()
160 mux.Handle("/add", handler)
161 logger.Log("addr", *httpAddr, "transport", "HTTP")
162 errc <- http.ListenAndServe(*httpAddr, mux)
144 before := []httptransport.BeforeFunc{zipkin.ToContext(zipkinSpanFunc)}
145 after := []httptransport.AfterFunc{}
146 handler := makeHTTPBinding(ctx, e, before, after)
147 logger.Log("addr", *httpAddr, "transport", "HTTP/JSON")
148 errc <- http.ListenAndServe(*httpAddr, handler)
163149 }()
164150
165151 // Transport: gRPC
170156 return
171157 }
172158 s := grpc.NewServer() // uses its own context?
173 field := metrics.Field{Key: "transport", Value: "grpc"}
174
175 var addServer pb.AddServer
176 addServer = grpcBinding{e}
177 addServer = grpcInstrument(requests.With(field), duration.With(field))(addServer)
178
179 pb.RegisterAddServer(s, addServer)
159 pb.RegisterAddServer(s, grpcBinding{e})
180160 logger.Log("addr", *grpcAddr, "transport", "gRPC")
181161 errc <- s.Serve(ln)
162 }()
163
164 // Transport: net/rpc
165 go func() {
166 ctx, cancel := context.WithCancel(root)
167 defer cancel()
168 s := rpc.NewServer()
169 s.RegisterName("addsvc", NetrpcBinding{ctx, e})
170 s.HandleHTTP(rpc.DefaultRPCPath, rpc.DefaultDebugPath)
171 logger.Log("addr", *netrpcAddr, "transport", "net/rpc")
172 errc <- http.ListenAndServe(*netrpcAddr, s)
182173 }()
183174
184175 // Transport: Thrift
218209 return
219210 }
220211
221 field := metrics.Field{Key: "transport", Value: "thrift"}
222
223 var service thriftadd.AddService
224 service = thriftBinding{ctx, e}
225 service = thriftInstrument(requests.With(field), duration.With(field))(service)
226
227212 logger.Log("addr", *thriftAddr, "transport", "Thrift")
228213 errc <- thrift.NewTSimpleServer4(
229 thriftadd.NewAddServiceProcessor(service),
214 thriftadd.NewAddServiceProcessor(thriftBinding{ctx, e}),
230215 transport,
231216 transportFactory,
232217 protocolFactory,
0 package main
1
2 import (
3 "golang.org/x/net/context"
4
5 "github.com/go-kit/kit/addsvc/reqrep"
6 "github.com/go-kit/kit/endpoint"
7 )
8
9 // NetrpcBinding makes an endpoint usable over net/rpc. It needs to be
10 // exported to be picked up by net/rpc.
11 type NetrpcBinding struct {
12 ctx context.Context // has methods which should not be made available
13 endpoint.Endpoint
14 }
15
16 // Add implements the net/rpc method definition.
17 func (b NetrpcBinding) Add(request reqrep.AddRequest, response *reqrep.AddResponse) error {
18 var (
19 ctx, cancel = context.WithCancel(b.ctx)
20 errs = make(chan error, 1)
21 responses = make(chan reqrep.AddResponse, 1)
22 )
23 defer cancel()
24 go func() {
25 resp, err := b.Endpoint(ctx, request)
26 if err != nil {
27 errs <- err
28 return
29 }
30 addResp, ok := resp.(reqrep.AddResponse)
31 if !ok {
32 errs <- endpoint.ErrBadCast
33 return
34 }
35 responses <- addResp
36 }()
37 select {
38 case <-ctx.Done():
39 return context.DeadlineExceeded
40 case err := <-errs:
41 return err
42 case resp := <-responses:
43 (*response) = resp
44 return nil
45 }
46 }
0 package reqrep
1
2 // The concrete request and response types are defined for each method our
3 // service implements. Request types should be annotated sufficiently for all
4 // transports we intend to use.
5
6 // AddRequest is a request for the add method.
7 type AddRequest struct {
8 A int64 `json:"a"`
9 B int64 `json:"b"`
10 }
11
12 // AddResponse is a response to the add method.
13 type AddResponse struct {
14 V int64 `json:"v"`
15 }
+0
-14
addsvc/request_response.go less more
0 package main
1
2 // The concrete request and response types are defined for each method our
3 // service implements. Request types should be annotated sufficiently for all
4 // transports we intend to use.
5
6 type addRequest struct {
7 A int64 `json:"a"`
8 B int64 `json:"b"`
9 }
10
11 type addResponse struct {
12 V int64 `json:"v"`
13 }
00 package main
11
22 import (
3 "time"
4
53 "golang.org/x/net/context"
64
75 thriftadd "github.com/go-kit/kit/addsvc/_thrift/gen-go/add"
6 "github.com/go-kit/kit/addsvc/reqrep"
87 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/metrics"
108 )
119
1210 // A binding wraps an Endpoint so that it's usable by a transport.
1816
1917 // Add implements Thrift's AddService interface.
2018 func (tb thriftBinding) Add(a, b int64) (*thriftadd.AddReply, error) {
21 r, err := tb.Endpoint(tb.Context, addRequest{a, b})
22 if err != nil {
19 var (
20 ctx, cancel = context.WithCancel(tb.Context)
21 errs = make(chan error, 1)
22 replies = make(chan *thriftadd.AddReply, 1)
23 )
24 defer cancel()
25 go func() {
26 r, err := tb.Endpoint(ctx, reqrep.AddRequest{A: a, B: b})
27 if err != nil {
28 errs <- err
29 return
30 }
31 resp, ok := r.(reqrep.AddResponse)
32 if !ok {
33 errs <- endpoint.ErrBadCast
34 return
35 }
36 replies <- &thriftadd.AddReply{Value: resp.V}
37 }()
38 select {
39 case <-ctx.Done():
40 return nil, context.DeadlineExceeded
41 case err := <-errs:
2342 return nil, err
24 }
25
26 resp, ok := r.(*addResponse)
27 if !ok {
28 return nil, endpoint.ErrBadCast
29 }
30
31 return &thriftadd.AddReply{Value: resp.V}, nil
32 }
33
34 func thriftInstrument(requests metrics.Counter, duration metrics.Histogram) func(thriftadd.AddService) thriftadd.AddService {
35 return func(next thriftadd.AddService) thriftadd.AddService {
36 return thriftInstrumented{requests, duration, next}
43 case reply := <-replies:
44 return reply, nil
3745 }
3846 }
39
40 type thriftInstrumented struct {
41 requests metrics.Counter
42 duration metrics.Histogram
43 next thriftadd.AddService
44 }
45
46 func (i thriftInstrumented) Add(a, b int64) (*thriftadd.AddReply, error) {
47 i.requests.Add(1)
48 defer func(begin time.Time) { i.duration.Observe(time.Since(begin).Nanoseconds()) }(time.Now())
49 return i.next.Add(a, b)
50 }
0 package circuitbreaker
1
2 import (
3 "github.com/sony/gobreaker"
4 "golang.org/x/net/context"
5
6 "github.com/go-kit/kit/endpoint"
7 )
8
9 // Gobreaker returns an endpoint.Middleware that implements the circuit
10 // breaker pattern using the sony/gobreaker package. Only errors returned by
11 // the wrapped endpoint count against the circuit breaker's error count.
12 //
13 // See http://godoc.org/github.com/sony/gobreaker for more information.
14 func Gobreaker(cb *gobreaker.CircuitBreaker) endpoint.Middleware {
15 return func(next endpoint.Endpoint) endpoint.Endpoint {
16 return func(ctx context.Context, request interface{}) (interface{}, error) {
17 return cb.Execute(func() (interface{}, error) { return next(ctx, request) })
18 }
19 }
20 }
0 package circuitbreaker_test
1
2 import (
3 "testing"
4
5 "github.com/sony/gobreaker"
6
7 "github.com/go-kit/kit/circuitbreaker"
8 )
9
10 func TestGobreaker(t *testing.T) {
11 var (
12 breaker = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))
13 primeWith = 100
14 shouldPass = func(n int) bool { return n <= 5 } // https://github.com/sony/gobreaker/blob/bfa846d/gobreaker.go#L76
15 circuitOpenError = "circuit breaker is open"
16 )
17 testFailingEndpoint(t, breaker, primeWith, shouldPass, circuitOpenError)
18 }
0 package circuitbreaker
1
2 import (
3 "time"
4
5 "github.com/streadway/handy/breaker"
6 "golang.org/x/net/context"
7
8 "github.com/go-kit/kit/endpoint"
9 )
10
11 // HandyBreaker returns an endpoint.Middleware that implements the circuit
12 // breaker pattern using the streadway/handy/breaker package. Only errors
13 // returned by the wrapped endpoint count against the circuit breaker's error
14 // count.
15 //
16 // See http://godoc.org/github.com/streadway/handy/breaker for more
17 // information.
18 func HandyBreaker(cb breaker.Breaker) endpoint.Middleware {
19 return func(next endpoint.Endpoint) endpoint.Endpoint {
20 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
21 if !cb.Allow() {
22 return nil, breaker.ErrCircuitOpen
23 }
24
25 defer func(begin time.Time) {
26 if err == nil {
27 cb.Success(time.Since(begin))
28 } else {
29 cb.Failure(time.Since(begin))
30 }
31 }(time.Now())
32
33 response, err = next(ctx, request)
34 return
35 }
36 }
37 }
0 package circuitbreaker_test
1
2 import (
3 "testing"
4
5 handybreaker "github.com/streadway/handy/breaker"
6
7 "github.com/go-kit/kit/circuitbreaker"
8 )
9
10 func TestHandyBreaker(t *testing.T) {
11 var (
12 failureRatio = 0.05
13 breaker = circuitbreaker.HandyBreaker(handybreaker.NewBreaker(failureRatio))
14 primeWith = handybreaker.DefaultMinObservations * 10
15 shouldPass = func(n int) bool { return (float64(n) / float64(primeWith+n)) <= failureRatio }
16 openCircuitError = handybreaker.ErrCircuitOpen.Error()
17 )
18 testFailingEndpoint(t, breaker, primeWith, shouldPass, openCircuitError)
19 }
0 package circuitbreaker
1
2 import (
3 "github.com/afex/hystrix-go/hystrix"
4 "golang.org/x/net/context"
5
6 "github.com/go-kit/kit/endpoint"
7 )
8
9 // Hystrix returns an endpoint.Middleware that implements the circuit
10 // breaker pattern using the afex/hystrix-go package.
11 //
12 // When using this circuit breaker, please configure your commands separately.
13 //
14 // See https://godoc.org/github.com/afex/hystrix-go/hystrix for more
15 // information.
16 func Hystrix(commandName string) endpoint.Middleware {
17 return func(next endpoint.Endpoint) endpoint.Endpoint {
18 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
19 output := make(chan interface{}, 1)
20 errors := hystrix.Go(commandName, func() error {
21 resp, err := next(ctx, request)
22 if err == nil {
23 output <- resp
24 }
25 return err
26 }, nil)
27
28 select {
29 case resp := <-output:
30 return resp, nil
31 case err := <-errors:
32 return nil, err
33 }
34 }
35 }
36 }
0 package circuitbreaker_test
1
2 import (
3 stdlog "log"
4 "os"
5 "testing"
6
7 "github.com/afex/hystrix-go/hystrix"
8
9 "github.com/go-kit/kit/circuitbreaker"
10 kitlog "github.com/go-kit/kit/log"
11 )
12
13 func TestHystrix(t *testing.T) {
14 logger := kitlog.NewLogfmtLogger(os.Stderr)
15 stdlog.SetOutput(kitlog.NewStdlibAdapter(logger))
16
17 const (
18 commandName = "my-endpoint"
19 errorPercent = 5
20 maxConcurrent = 1000
21 )
22 hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{
23 ErrorPercentThreshold: errorPercent,
24 MaxConcurrentRequests: maxConcurrent,
25 })
26
27 var (
28 breaker = circuitbreaker.Hystrix(commandName)
29 primeWith = hystrix.DefaultVolumeThreshold * 2
30 shouldPass = func(n int) bool { return (float64(n) / float64(primeWith+n)) <= (float64(errorPercent-1) / 100.0) }
31 openCircuitError = hystrix.ErrCircuitOpen.Error()
32 )
33 testFailingEndpoint(t, breaker, primeWith, shouldPass, openCircuitError)
34 }
0 package circuitbreaker_test
1
2 import (
3 "errors"
4 "testing"
5
6 "golang.org/x/net/context"
7
8 "github.com/go-kit/kit/endpoint"
9 )
10
11 func testFailingEndpoint(t *testing.T, breaker endpoint.Middleware, primeWith int, shouldPass func(int) bool, openCircuitError string) {
12 // Create a mock endpoint and wrap it with the breaker.
13 m := mock{}
14 var e endpoint.Endpoint
15 e = m.endpoint
16 e = breaker(e)
17
18 // Prime the endpoint with successful requests.
19 for i := 0; i < primeWith; i++ {
20 if _, err := e(context.Background(), struct{}{}); err != nil {
21 t.Fatalf("during priming, got error: %v", err)
22 }
23 }
24
25 // Switch the endpoint to start throwing errors.
26 m.err = errors.New("tragedy+disaster")
27 m.thru = 0
28
29 // The first several should be allowed through and yield our error.
30 for i := 0; shouldPass(i); i++ {
31 if _, err := e(context.Background(), struct{}{}); err != m.err {
32 t.Fatalf("want %v, have %v", m.err, err)
33 }
34 }
35 thru := m.thru
36
37 // But the rest should be blocked by an open circuit.
38 for i := 0; i < 10; i++ {
39 if _, err := e(context.Background(), struct{}{}); err.Error() != openCircuitError {
40 t.Fatalf("want %q, have %q", openCircuitError, err.Error())
41 }
42 }
43
44 // Make sure none of those got through.
45 if want, have := thru, m.thru; want != have {
46 t.Errorf("want %d, have %d", want, have)
47 }
48 }
49
50 type mock struct {
51 thru int
52 err error
53 }
54
55 func (m *mock) endpoint(context.Context, interface{}) (interface{}, error) {
56 m.thru++
57 return struct{}{}, m.err
58 }
0 # package loadbalancer
1
2 `package loadbalancer` provides a client-side load balancer abstraction.
3
4 A publisher is responsible for emitting the most recent set of endpoints for a
5 single logical service. Publishers exist for static endpoints, and endpoints
6 discovered via periodic DNS SRV lookups on a single logical name. Consul and
7 etcd publishers are planned.
8
9 Different load balancing strategies are implemented on top of publishers. Go
10 kit currently provides random and round-robin semantics. Smarter behaviors,
11 e.g. load balancing based on underlying endpoint priority/weight, is planned.
12
13 ## Rationale
14
15 TODO
16
17 ## Usage
18
19 In your client, define a publisher, wrap it with a balancing strategy, and pass
20 it to a retry strategy, which returns an endpoint. Use that endpoint to make
21 requests, or wrap it with other value-add middleware.
22
23 ```go
24 func main() {
25 var (
26 fooPublisher = loadbalancer.NewDNSSRVPublisher("foo.mynet.local", 5*time.Second, makeEndpoint)
27 fooBalancer = loadbalancer.RoundRobin(mysvcPublisher)
28 fooEndpoint = loadbalancer.Retry(3, time.Second, fooBalancer)
29 )
30 http.HandleFunc("/", handle(fooEndpoint))
31 log.Fatal(http.ListenAndServe(":8080", nil))
32 }
33
34 func makeEndpoint(hostport string) endpoint.Endpoint {
35 // Convert a host:port to a endpoint via your defined transport.
36 }
37
38 func handle(foo endpoint.Endpoint) http.HandlerFunc {
39 return func(w http.ResponseWriter, r *http.Request) {
40 // foo is usable as a load-balanced remote endpoint.
41 }
42 }
43 ```
0 package loadbalancer
1
2 import "github.com/go-kit/kit/endpoint"
3
4 type cache struct {
5 req chan []endpoint.Endpoint
6 cnt chan int
7 quit chan struct{}
8 }
9
10 func newCache(p Publisher) *cache {
11 c := &cache{
12 req: make(chan []endpoint.Endpoint),
13 cnt: make(chan int),
14 quit: make(chan struct{}),
15 }
16 go c.loop(p)
17 return c
18 }
19
20 func (c *cache) loop(p Publisher) {
21 e := make(chan []endpoint.Endpoint, 1)
22 p.Subscribe(e)
23 defer p.Unsubscribe(e)
24 endpoints := <-e
25 for {
26 select {
27 case endpoints = <-e:
28 case c.cnt <- len(endpoints):
29 case c.req <- endpoints:
30 case <-c.quit:
31 return
32 }
33 }
34 }
35
36 func (c *cache) count() int {
37 return <-c.cnt
38 }
39
40 func (c *cache) get() []endpoint.Endpoint {
41 return <-c.req
42 }
43
44 func (c *cache) stop() {
45 close(c.quit)
46 }
0 package loadbalancer
1
2 import (
3 "runtime"
4 "testing"
5
6 "golang.org/x/net/context"
7
8 "github.com/go-kit/kit/endpoint"
9 )
10
11 func TestCache(t *testing.T) {
12 e := func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
13 endpoints := []endpoint.Endpoint{e}
14
15 p := NewStaticPublisher(endpoints)
16 defer p.Stop()
17
18 c := newCache(p)
19 defer c.stop()
20
21 for _, n := range []int{2, 10, 0} {
22 endpoints = make([]endpoint.Endpoint, n)
23 for i := 0; i < n; i++ {
24 endpoints[i] = e
25 }
26 p.Replace(endpoints)
27 runtime.Gosched()
28 if want, have := len(endpoints), len(c.get()); want != have {
29 t.Errorf("want %d, have %d", want, have)
30 }
31 }
32 }
0 package loadbalancer
1
2 import (
3 "crypto/md5"
4 "fmt"
5 "net"
6 "sort"
7 "time"
8
9 "github.com/go-kit/kit/endpoint"
10 )
11
12 type dnssrvPublisher struct {
13 subscribe chan chan<- []endpoint.Endpoint
14 unsubscribe chan chan<- []endpoint.Endpoint
15 quit chan struct{}
16 }
17
18 // NewDNSSRVPublisher returns a publisher that resolves the SRV name every ttl, and
19 func NewDNSSRVPublisher(name string, ttl time.Duration, makeEndpoint func(hostport string) endpoint.Endpoint) Publisher {
20 p := &dnssrvPublisher{
21 subscribe: make(chan chan<- []endpoint.Endpoint),
22 unsubscribe: make(chan chan<- []endpoint.Endpoint),
23 quit: make(chan struct{}),
24 }
25 go p.loop(name, ttl, makeEndpoint)
26 return p
27 }
28
29 func (p *dnssrvPublisher) Subscribe(c chan<- []endpoint.Endpoint) {
30 p.subscribe <- c
31 }
32
33 func (p *dnssrvPublisher) Unsubscribe(c chan<- []endpoint.Endpoint) {
34 p.unsubscribe <- c
35 }
36
37 func (p *dnssrvPublisher) Stop() {
38 close(p.quit)
39 }
40
41 var newTicker = time.NewTicker
42
43 func (p *dnssrvPublisher) loop(name string, ttl time.Duration, makeEndpoint func(hostport string) endpoint.Endpoint) {
44 var (
45 subscriptions = map[chan<- []endpoint.Endpoint]struct{}{}
46 addrs, md5, _ = resolve(name)
47 endpoints = convert(addrs, makeEndpoint)
48 ticker = newTicker(ttl)
49 )
50 defer ticker.Stop()
51 for {
52 select {
53 case <-ticker.C:
54 addrs, newmd5, err := resolve(name)
55 if err == nil && newmd5 != md5 {
56 endpoints = convert(addrs, makeEndpoint)
57 for c := range subscriptions {
58 c <- endpoints
59 }
60 md5 = newmd5
61 }
62
63 case c := <-p.subscribe:
64 subscriptions[c] = struct{}{}
65 c <- endpoints
66
67 case c := <-p.unsubscribe:
68 delete(subscriptions, c)
69
70 case <-p.quit:
71 return
72 }
73 }
74 }
75
76 // Allow mocking in tests.
77 var resolve = func(name string) (addrs []*net.SRV, md5sum string, err error) {
78 _, addrs, err = net.LookupSRV("", "", name)
79 if err != nil {
80 return addrs, "", err
81 }
82 hostports := make([]string, len(addrs))
83 for i, addr := range addrs {
84 hostports[i] = fmt.Sprintf("%s:%d", addr.Target, addr.Port)
85 }
86 sort.Sort(sort.StringSlice(hostports))
87 h := md5.New()
88 for _, hostport := range hostports {
89 fmt.Fprintf(h, hostport)
90 }
91 return addrs, fmt.Sprintf("%x", h.Sum(nil)), nil
92 }
93
94 func convert(addrs []*net.SRV, makeEndpoint func(hostport string) endpoint.Endpoint) []endpoint.Endpoint {
95 endpoints := make([]endpoint.Endpoint, len(addrs))
96 for i, addr := range addrs {
97 endpoints[i] = makeEndpoint(addr2hostport(addr))
98 }
99 return endpoints
100 }
101
102 func addr2hostport(addr *net.SRV) string {
103 return net.JoinHostPort(addr.Target, fmt.Sprintf("%d", addr.Port))
104 }
0 package loadbalancer
1
2 import (
3 "fmt"
4 "net"
5 "testing"
6 "time"
7
8 "golang.org/x/net/context"
9
10 "github.com/go-kit/kit/endpoint"
11 )
12
13 func TestDNSSRVPublisher(t *testing.T) {
14 // Reset the vars when we're done
15 oldResolve := resolve
16 defer func() { resolve = oldResolve }()
17 oldNewTicker := newTicker
18 defer func() { newTicker = oldNewTicker }()
19
20 // Set up a fixture and swap the vars
21 a := []*net.SRV{
22 {Target: "foo", Port: 123},
23 {Target: "bar", Port: 456},
24 {Target: "baz", Port: 789},
25 }
26 ticker := make(chan time.Time)
27 resolve = func(string) ([]*net.SRV, string, error) { return a, fmt.Sprint(len(a)), nil }
28 newTicker = func(time.Duration) *time.Ticker { return &time.Ticker{C: ticker} }
29
30 // Construct endpoint
31 m := map[string]int{}
32 e := func(hostport string) endpoint.Endpoint {
33 return func(context.Context, interface{}) (interface{}, error) {
34 m[hostport]++
35 return struct{}{}, nil
36 }
37 }
38
39 // Build the publisher
40 var (
41 name = "irrelevant"
42 ttl = time.Second
43 makeEndpoint = func(hostport string) endpoint.Endpoint { return e(hostport) }
44 )
45 p := NewDNSSRVPublisher(name, ttl, makeEndpoint)
46 defer p.Stop()
47
48 // Subscribe
49 c := make(chan []endpoint.Endpoint, 1)
50 p.Subscribe(c)
51 defer p.Unsubscribe(c)
52
53 // Invoke all of the endpoints
54 for _, e := range <-c {
55 e(context.Background(), struct{}{})
56 }
57
58 // Make sure we invoked what we expected to
59 for _, addr := range a {
60 hostport := addr2hostport(addr)
61 if want, have := 1, m[hostport]; want != have {
62 t.Errorf("%q: want %d, have %d", name, want, have)
63 }
64 delete(m, hostport)
65 }
66 if want, have := 0, len(m); want != have {
67 t.Errorf("want %d, have %d", want, have)
68 }
69
70 // Reset the fixture, trigger the timer, count the endpoints
71 a = []*net.SRV{}
72 ticker <- time.Now()
73 if want, have := len(a), len(<-c); want != have {
74 t.Errorf("want %d, have %d", want, have)
75 }
76 }
0 package loadbalancer
1
2 import "github.com/go-kit/kit/endpoint"
3
4 type endpointCache struct {
5 requests chan []endpoint.Endpoint
6 quit chan struct{}
7 }
8
9 func newEndpointCache(p Publisher) *endpointCache {
10 c := &endpointCache{
11 requests: make(chan []endpoint.Endpoint),
12 quit: make(chan struct{}),
13 }
14 go c.loop(p)
15 return c
16 }
17
18 func (c *endpointCache) loop(p Publisher) {
19 updates := make(chan []endpoint.Endpoint, 1)
20 p.Subscribe(updates)
21 defer p.Unsubscribe(updates)
22 endpoints := <-updates
23
24 for {
25 select {
26 case endpoints = <-updates:
27 case c.requests <- endpoints:
28 case <-c.quit:
29 return
30 }
31 }
32 }
33
34 func (c *endpointCache) get() []endpoint.Endpoint {
35 return <-c.requests
36 }
37
38 func (c *endpointCache) stop() {
39 close(c.quit)
40 }
0 package loadbalancer
1
2 import (
3 "testing"
4
5 "golang.org/x/net/context"
6
7 "github.com/go-kit/kit/endpoint"
8 )
9
10 func TestEndpointCache(t *testing.T) {
11 endpoints := []endpoint.Endpoint{
12 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
13 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
14 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
15 }
16
17 p := NewStaticPublisher(endpoints)
18 defer p.Stop()
19
20 c := newEndpointCache(p)
21 defer c.stop()
22
23 if want, have := len(endpoints), len(c.get()); want != have {
24 t.Errorf("want %d, have %d", want, have)
25 }
26 }
0 package loadbalancer
1
2 import (
3 "errors"
4
5 "github.com/go-kit/kit/endpoint"
6 )
7
8 // LoadBalancer yields endpoints one-by-one.
9 type LoadBalancer interface {
10 Count() int
11 Get() (endpoint.Endpoint, error)
12 }
13
14 // ErrNoEndpointsAvailable is given by a load balancer when no endpoints are
15 // available to be returned.
16 var ErrNoEndpointsAvailable = errors.New("no endpoints available")
0 package loadbalancer_test
1
2 import (
3 "runtime"
4 "sync"
5
6 "github.com/go-kit/kit/endpoint"
7 )
8
9 type mockPublisher struct {
10 sync.Mutex
11 e []endpoint.Endpoint
12 s map[chan<- []endpoint.Endpoint]struct{}
13 }
14
15 func newMockPublisher(endpoints []endpoint.Endpoint) *mockPublisher {
16 return &mockPublisher{
17 e: endpoints,
18 s: map[chan<- []endpoint.Endpoint]struct{}{},
19 }
20 }
21
22 func (p *mockPublisher) replace(endpoints []endpoint.Endpoint) {
23 p.Lock()
24 defer p.Unlock()
25 p.e = endpoints
26 for s := range p.s {
27 s <- p.e
28 }
29 runtime.Gosched()
30 }
31
32 func (p *mockPublisher) Subscribe(c chan<- []endpoint.Endpoint) {
33 p.Lock()
34 defer p.Unlock()
35 p.s[c] = struct{}{}
36 c <- p.e
37 }
38
39 func (p *mockPublisher) Unsubscribe(c chan<- []endpoint.Endpoint) {
40 p.Lock()
41 defer p.Unlock()
42 delete(p.s, c)
43 }
44
45 func (p *mockPublisher) Stop() {}
0 package loadbalancer
1
2 import "github.com/go-kit/kit/endpoint"
3
4 // Publisher produces endpoints.
5 type Publisher interface {
6 Subscribe(chan<- []endpoint.Endpoint)
7 Unsubscribe(chan<- []endpoint.Endpoint)
8 Stop()
9 }
0 package loadbalancer
1
2 import (
3 "math/rand"
4
5 "github.com/go-kit/kit/endpoint"
6 )
7
8 // Random returns a load balancer that yields random endpoints.
9 func Random(p Publisher) LoadBalancer {
10 return random{newCache(p)}
11 }
12
13 type random struct{ *cache }
14
15 func (r random) Count() int { return r.cache.count() }
16
17 func (r random) Get() (endpoint.Endpoint, error) {
18 endpoints := r.cache.get()
19 if len(endpoints) <= 0 {
20 return nil, ErrNoEndpointsAvailable
21 }
22 return endpoints[rand.Intn(len(endpoints))], nil
23 }
0 package loadbalancer_test
1
2 import (
3 "math"
4 "testing"
5
6 "github.com/go-kit/kit/endpoint"
7 "github.com/go-kit/kit/loadbalancer"
8 "golang.org/x/net/context"
9 )
10
11 func TestRandom(t *testing.T) {
12 p := loadbalancer.NewStaticPublisher([]endpoint.Endpoint{})
13 defer p.Stop()
14
15 lb := loadbalancer.Random(p)
16 if _, err := lb.Get(); err == nil {
17 t.Error("want error, got none")
18 }
19
20 counts := []int{0, 0, 0}
21 p.Replace([]endpoint.Endpoint{
22 func(context.Context, interface{}) (interface{}, error) { counts[0]++; return struct{}{}, nil },
23 func(context.Context, interface{}) (interface{}, error) { counts[1]++; return struct{}{}, nil },
24 func(context.Context, interface{}) (interface{}, error) { counts[2]++; return struct{}{}, nil },
25 })
26 assertLoadBalancerNotEmpty(t, lb)
27
28 n := 10000
29 for i := 0; i < n; i++ {
30 e, _ := lb.Get()
31 e(context.Background(), struct{}{})
32 }
33
34 want := float64(n) / float64(len(counts))
35 tolerance := (want / 100.0) * 5 // 5%
36 for _, have := range counts {
37 if math.Abs(want-float64(have)) > tolerance {
38 t.Errorf("want %.0f, have %d", want, have)
39 }
40 }
41 }
0 package loadbalancer
1
2 import (
3 "fmt"
4 "strings"
5 "time"
6
7 "golang.org/x/net/context"
8
9 "github.com/go-kit/kit/endpoint"
10 )
11
12 // Retry yields an endpoint that takes endpoints from the load balancer.
13 // Invocations that return errors will be retried until they succeed, up to
14 // max times, or until the timeout is elapsed, whichever comes first.
15 func Retry(max int, timeout time.Duration, lb LoadBalancer) endpoint.Endpoint {
16 return func(ctx context.Context, request interface{}) (interface{}, error) {
17 var (
18 newctx, cancel = context.WithTimeout(ctx, timeout)
19 responses = make(chan interface{}, 1)
20 errs = make(chan error, 1)
21 a = []string{}
22 )
23 defer cancel()
24 for i := 1; i <= max; i++ {
25 go func() {
26 e, err := lb.Get()
27 if err != nil {
28 errs <- err
29 return
30 }
31 response, err := e(newctx, request)
32 if err != nil {
33 errs <- err
34 return
35 }
36 responses <- response
37 }()
38
39 select {
40 case <-newctx.Done():
41 return nil, newctx.Err()
42 case response := <-responses:
43 return response, nil
44 case err := <-errs:
45 a = append(a, err.Error())
46 continue
47 }
48 }
49 return nil, fmt.Errorf("retry attempts exceeded (%s)", strings.Join(a, "; "))
50 }
51 }
0 package loadbalancer_test
1
2 import (
3 "errors"
4 "time"
5
6 "github.com/go-kit/kit/endpoint"
7 "github.com/go-kit/kit/loadbalancer"
8 "golang.org/x/net/context"
9
10 "testing"
11 )
12
13 func TestRetryMax(t *testing.T) {
14 var (
15 endpoints = []endpoint.Endpoint{}
16 p = loadbalancer.NewStaticPublisher(endpoints)
17 lb = loadbalancer.RoundRobin(p)
18 )
19
20 if _, err := loadbalancer.Retry(999, time.Second, lb)(context.Background(), struct{}{}); err == nil {
21 t.Errorf("expected error, got none")
22 }
23
24 endpoints = []endpoint.Endpoint{
25 func(context.Context, interface{}) (interface{}, error) { return nil, errors.New("error one") },
26 func(context.Context, interface{}) (interface{}, error) { return nil, errors.New("error two") },
27 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil /* OK */ },
28 }
29 p.Replace(endpoints)
30 assertLoadBalancerNotEmpty(t, lb)
31
32 if _, err := loadbalancer.Retry(len(endpoints)-1, time.Second, lb)(context.Background(), struct{}{}); err == nil {
33 t.Errorf("expected error, got none")
34 }
35
36 if _, err := loadbalancer.Retry(len(endpoints), time.Second, lb)(context.Background(), struct{}{}); err != nil {
37 t.Error(err)
38 }
39 }
40
41 func TestRetryTimeout(t *testing.T) {
42 var (
43 step = make(chan struct{})
44 e = func(context.Context, interface{}) (interface{}, error) { <-step; return struct{}{}, nil }
45 timeout = time.Millisecond
46 retry = loadbalancer.Retry(999, timeout, loadbalancer.RoundRobin(loadbalancer.NewStaticPublisher([]endpoint.Endpoint{e})))
47 errs = make(chan error)
48 invoke = func() { _, err := retry(context.Background(), struct{}{}); errs <- err }
49 )
50
51 go invoke() // invoke the endpoint
52 step <- struct{}{} // tell the endpoint to return
53 if err := <-errs; err != nil { // that should succeed
54 t.Error(err)
55 }
56
57 go invoke() // invoke the endpoint
58 time.Sleep(2 * timeout) // wait
59 time.Sleep(2 * timeout) // wait again (CI servers!!)
60 step <- struct{}{} // tell the endpoint to return
61 if err := <-errs; err != context.DeadlineExceeded { // that should not succeed
62 t.Errorf("wanted error, got none")
63 }
64 }
0 package loadbalancer
1
2 import (
3 "sync/atomic"
4
5 "github.com/go-kit/kit/endpoint"
6 )
7
8 // RoundRobin returns a load balancer that yields endpoints in sequence.
9 func RoundRobin(p Publisher) LoadBalancer {
10 return &roundRobin{newCache(p), 0}
11 }
12
13 type roundRobin struct {
14 *cache
15 uint64
16 }
17
18 func (r *roundRobin) Count() int { return r.cache.count() }
19
20 func (r *roundRobin) Get() (endpoint.Endpoint, error) {
21 endpoints := r.cache.get()
22 if len(endpoints) <= 0 {
23 return nil, ErrNoEndpointsAvailable
24 }
25 var old uint64
26 for {
27 old = atomic.LoadUint64(&r.uint64)
28 if atomic.CompareAndSwapUint64(&r.uint64, old, old+1) {
29 break
30 }
31 }
32 return endpoints[old%uint64(len(endpoints))], nil
33 }
0 package loadbalancer_test
1
2 import (
3 "reflect"
4 "testing"
5
6 "github.com/go-kit/kit/endpoint"
7 "github.com/go-kit/kit/loadbalancer"
8 "golang.org/x/net/context"
9 )
10
11 func TestRoundRobin(t *testing.T) {
12 p := loadbalancer.NewStaticPublisher([]endpoint.Endpoint{})
13 defer p.Stop()
14
15 lb := loadbalancer.RoundRobin(p)
16 if _, err := lb.Get(); err == nil {
17 t.Error("want error, got none")
18 }
19
20 counts := []int{0, 0, 0}
21 p.Replace([]endpoint.Endpoint{
22 func(context.Context, interface{}) (interface{}, error) { counts[0]++; return struct{}{}, nil },
23 func(context.Context, interface{}) (interface{}, error) { counts[1]++; return struct{}{}, nil },
24 func(context.Context, interface{}) (interface{}, error) { counts[2]++; return struct{}{}, nil },
25 })
26 assertLoadBalancerNotEmpty(t, lb)
27
28 for i, want := range [][]int{
29 {1, 0, 0},
30 {1, 1, 0},
31 {1, 1, 1},
32 {2, 1, 1},
33 {2, 2, 1},
34 {2, 2, 2},
35 {3, 2, 2},
36 } {
37 e, _ := lb.Get()
38 e(context.Background(), struct{}{})
39 if have := counts; !reflect.DeepEqual(want, have) {
40 t.Errorf("%d: want %v, have %v", i+1, want, have)
41 }
42 }
43 }
0 package loadbalancer
1
2 import (
3 "sync"
4
5 "github.com/go-kit/kit/endpoint"
6 )
7
8 // NewStaticPublisher returns a publisher that yields a static set of
9 // endpoints, which can be completely replaced.
10 func NewStaticPublisher(endpoints []endpoint.Endpoint) *StaticPublisher {
11 return &StaticPublisher{
12 current: endpoints,
13 subscribers: map[chan<- []endpoint.Endpoint]struct{}{},
14 }
15 }
16
17 // StaticPublisher holds a static set of endpoints.
18 type StaticPublisher struct {
19 mu sync.Mutex
20 current []endpoint.Endpoint
21 subscribers map[chan<- []endpoint.Endpoint]struct{}
22 }
23
24 // Subscribe implements Publisher.
25 func (p *StaticPublisher) Subscribe(c chan<- []endpoint.Endpoint) {
26 p.mu.Lock()
27 defer p.mu.Unlock()
28 p.subscribers[c] = struct{}{}
29 c <- p.current
30 }
31
32 // Unsubscribe implements Publisher.
33 func (p *StaticPublisher) Unsubscribe(c chan<- []endpoint.Endpoint) {
34 p.mu.Lock()
35 defer p.mu.Unlock()
36 delete(p.subscribers, c)
37 }
38
39 // Stop implements Publisher, but is a no-op.
40 func (p *StaticPublisher) Stop() {}
41
42 // Replace replaces the endpoints and notifies all subscribers.
43 func (p *StaticPublisher) Replace(endpoints []endpoint.Endpoint) {
44 p.mu.Lock()
45 defer p.mu.Unlock()
46 p.current = endpoints
47 for c := range p.subscribers {
48 c <- p.current
49 }
50 }
0 package loadbalancer_test
1
2 import (
3 "testing"
4
5 "golang.org/x/net/context"
6
7 "github.com/go-kit/kit/endpoint"
8 "github.com/go-kit/kit/loadbalancer"
9 )
10
11 func TestStaticPublisher(t *testing.T) {
12 endpoints := []endpoint.Endpoint{
13 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
14 }
15 p := loadbalancer.NewStaticPublisher(endpoints)
16 defer p.Stop()
17
18 c := make(chan []endpoint.Endpoint, 1)
19 p.Subscribe(c)
20 if want, have := len(endpoints), len(<-c); want != have {
21 t.Errorf("want %d, have %d", want, have)
22 }
23
24 endpoints = []endpoint.Endpoint{}
25 p.Replace(endpoints)
26 if want, have := len(endpoints), len(<-c); want != have {
27 t.Errorf("want %d, have %d", want, have)
28 }
29 }
0 package loadbalancer
1
2 import (
3 "errors"
4
5 "github.com/go-kit/kit/endpoint"
6 )
7
8 // Strategy yields endpoints to consumers according to some algorithm.
9 type Strategy interface {
10 Next() (endpoint.Endpoint, error)
11 Stop()
12 }
13
14 // ErrNoEndpoints is returned by a strategy when there are no endpoints
15 // available.
16 var ErrNoEndpoints = errors.New("no endpoints available")
0 package loadbalancer_test
1
2 import (
3 "fmt"
4 "testing"
5 "time"
6
7 "github.com/go-kit/kit/loadbalancer"
8 )
9
10 func assertLoadBalancerNotEmpty(t *testing.T, lb loadbalancer.LoadBalancer) {
11 if err := within(10*time.Millisecond, func() bool {
12 return lb.Count() > 0
13 }); err != nil {
14 t.Fatal("Publisher never updated endpoints")
15 }
16 }
17
18 func within(d time.Duration, f func() bool) error {
19 var (
20 deadline = time.After(d)
21 ticker = time.NewTicker(d / 10)
22 )
23 defer ticker.Stop()
24 for {
25 select {
26 case <-ticker.C:
27 if f() {
28 return nil
29 }
30 case <-deadline:
31 return fmt.Errorf("deadline exceeded")
32 }
33 }
34 }
66 )
77
88 func benchmarkRunner(b *testing.B, logger log.Logger, f func(log.Logger)) {
9 logger = log.With(logger, "common_key", "common_value")
9 lc := log.NewContext(logger).With("common_key", "common_value")
1010 b.ReportAllocs()
1111 b.ResetTimer()
1212 for i := 0; i < b.N; i++ {
13 f(logger)
13 f(lc)
1414 }
1515 }
1616
1717 var (
1818 baseMessage = func(logger log.Logger) { logger.Log("foo_key", "foo_value") }
19 withMessage = func(logger log.Logger) { log.With(logger, "a", "b").Log("c", "d") }
19 withMessage = func(logger log.Logger) { log.NewContext(logger).With("a", "b").Log("c", "d") }
2020 )
0 package log_test
1
2 import (
3 "os"
4
5 "github.com/go-kit/kit/log"
6 )
7
8 func ExampleContext() {
9 logger := log.NewLogfmtLogger(os.Stdout)
10 logger.Log("foo", 123)
11 ctx := log.NewContext(logger).With("level", "info")
12 ctx.Log()
13 ctx = ctx.With("msg", "hello")
14 ctx.Log()
15 ctx.With("a", 1).Log("b", 2)
16
17 // Output:
18 // foo=123
19 // level=info
20 // level=info msg=hello
21 // level=info msg=hello a=1 b=2
22 }
0 package levels
1
2 import "github.com/go-kit/kit/log"
3
4 // Levels provides a leveled logging wrapper around a logger. It has five
5 // levels: debug, info, warning (warn), error, and critical (crit). If you
6 // want a different set of levels, you can create your own levels type very
7 // easily, and you can elide the configuration.
8 type Levels struct {
9 ctx log.Context
10 levelKey string
11
12 // We have a choice between storing level values in string fields or
13 // making a separate context for each level. When using string fields the
14 // Log method must combine the base context, the level data, and the
15 // logged keyvals; but the With method only requires updating one context.
16 // If we instead keep a separate context for each level the Log method
17 // must only append the new keyvals; but the With method would have to
18 // update all five contexts.
19
20 // Roughly speaking, storing multiple contexts breaks even if the ratio of
21 // Log/With calls is more than the number of levels. We have chosen to
22 // make the With method cheap and the Log method a bit more costly because
23 // we do not expect most applications to Log more than five times for each
24 // call to With.
25
26 debugValue string
27 infoValue string
28 warnValue string
29 errorValue string
30 critValue string
31 }
32
33 // New creates a new leveled logger, wrapping the passed logger.
34 func New(logger log.Logger, options ...Option) Levels {
35 l := Levels{
36 ctx: log.NewContext(logger),
37 levelKey: "level",
38
39 debugValue: "debug",
40 infoValue: "info",
41 warnValue: "warn",
42 errorValue: "error",
43 critValue: "crit",
44 }
45 for _, option := range options {
46 option(&l)
47 }
48 return l
49 }
50
51 // With returns a new leveled logger that includes keyvals in all log events.
52 func (l Levels) With(keyvals ...interface{}) Levels {
53 return Levels{
54 ctx: l.ctx.With(keyvals...),
55 levelKey: l.levelKey,
56 debugValue: l.debugValue,
57 infoValue: l.infoValue,
58 warnValue: l.warnValue,
59 errorValue: l.errorValue,
60 critValue: l.critValue,
61 }
62 }
63
64 // Debug logs a debug event along with keyvals.
65 func (l Levels) Debug(keyvals ...interface{}) error {
66 return l.ctx.WithPrefix(l.levelKey, l.debugValue).Log(keyvals...)
67 }
68
69 // Info logs an info event along with keyvals.
70 func (l Levels) Info(keyvals ...interface{}) error {
71 return l.ctx.WithPrefix(l.levelKey, l.infoValue).Log(keyvals...)
72 }
73
74 // Warn logs a warn event along with keyvals.
75 func (l Levels) Warn(keyvals ...interface{}) error {
76 return l.ctx.WithPrefix(l.levelKey, l.warnValue).Log(keyvals...)
77 }
78
79 // Error logs an error event along with keyvals.
80 func (l Levels) Error(keyvals ...interface{}) error {
81 return l.ctx.WithPrefix(l.levelKey, l.errorValue).Log(keyvals...)
82 }
83
84 // Crit logs a crit event along with keyvals.
85 func (l Levels) Crit(keyvals ...interface{}) error {
86 return l.ctx.WithPrefix(l.levelKey, l.critValue).Log(keyvals...)
87 }
88
89 // Option sets a parameter for leveled loggers.
90 type Option func(*Levels)
91
92 // Key sets the key for the field used to indicate log level. By default,
93 // the key is "level".
94 func Key(key string) Option {
95 return func(l *Levels) { l.levelKey = key }
96 }
97
98 // DebugValue sets the value for the field used to indicate the debug log
99 // level. By default, the value is "debug".
100 func DebugValue(value string) Option {
101 return func(l *Levels) { l.debugValue = value }
102 }
103
104 // InfoValue sets the value for the field used to indicate the info log level.
105 // By default, the value is "info".
106 func InfoValue(value string) Option {
107 return func(l *Levels) { l.infoValue = value }
108 }
109
110 // WarnValue sets the value for the field used to indicate the warning log
111 // level. By default, the value is "warn".
112 func WarnValue(value string) Option {
113 return func(l *Levels) { l.warnValue = value }
114 }
115
116 // ErrorValue sets the value for the field used to indicate the error log
117 // level. By default, the value is "error".
118 func ErrorValue(value string) Option {
119 return func(l *Levels) { l.errorValue = value }
120 }
121
122 // CritValue sets the value for the field used to indicate the critical log
123 // level. By default, the value is "crit".
124 func CritValue(value string) Option {
125 return func(l *Levels) { l.critValue = value }
126 }
0 package levels_test
1
2 import (
3 "bytes"
4 "os"
5 "testing"
6
7 "github.com/go-kit/kit/log"
8 "github.com/go-kit/kit/log/levels"
9 )
10
11 func TestDefaultLevels(t *testing.T) {
12 buf := bytes.Buffer{}
13 logger := levels.New(log.NewLogfmtLogger(&buf))
14
15 logger.Debug("msg", "résumé") // of course you'd want to do this
16 if want, have := "level=debug msg=résumé\n", buf.String(); want != have {
17 t.Errorf("want %#v, have %#v", want, have)
18 }
19
20 buf.Reset()
21 logger.Info("msg", "Åhus")
22 if want, have := "level=info msg=Åhus\n", buf.String(); want != have {
23 t.Errorf("want %#v, have %#v", want, have)
24 }
25
26 buf.Reset()
27 logger.Error("msg", "© violation")
28 if want, have := "level=error msg=\"© violation\"\n", buf.String(); want != have {
29 t.Errorf("want %#v, have %#v", want, have)
30 }
31 }
32
33 func TestModifiedLevels(t *testing.T) {
34 buf := bytes.Buffer{}
35 logger := levels.New(
36 log.NewJSONLogger(&buf),
37 levels.Key("l"),
38 levels.DebugValue("dbg"),
39 )
40 logger.With("easter_island", "176°").Debug("msg", "moai")
41 if want, have := `{"easter_island":"176°","l":"dbg","msg":"moai"}`+"\n", buf.String(); want != have {
42 t.Errorf("want %#v, have %#v", want, have)
43 }
44 }
45
46 func ExampleLevels() {
47 logger := levels.New(log.NewLogfmtLogger(os.Stdout))
48 logger.Debug("msg", "hello")
49 logger.With("context", "foo").Warn("err", "error")
50
51 // Output:
52 // level=debug msg=hello
53 // level=warn context=foo err=error
54 }
+0
-60
log/levels.go less more
0 package log
1
2 // Levels provides a default set of leveled loggers.
3 type Levels struct {
4 Debug Logger
5 Info Logger
6 Error Logger
7 }
8
9 type levelOptions struct {
10 levelKey string
11 debugValue string
12 infoValue string
13 errorValue string
14 }
15
16 // LevelOption sets a parameter for leveled loggers.
17 type LevelOption func(*levelOptions)
18
19 // LevelKey sets the key for the field used to indicate log level. By default,
20 // the key is "level".
21 func LevelKey(key string) LevelOption {
22 return func(o *levelOptions) { o.levelKey = key }
23 }
24
25 // DebugLevelValue sets the value for the field used to indicate the debug log
26 // level. By default, the value is "DEBUG".
27 func DebugLevelValue(value string) LevelOption {
28 return func(o *levelOptions) { o.debugValue = value }
29 }
30
31 // InfoLevelValue sets the value for the field used to indicate the debug log
32 // level. By default, the value is "INFO".
33 func InfoLevelValue(value string) LevelOption {
34 return func(o *levelOptions) { o.infoValue = value }
35 }
36
37 // ErrorLevelValue sets the value for the field used to indicate the debug log
38 // level. By default, the value is "ERROR".
39 func ErrorLevelValue(value string) LevelOption {
40 return func(o *levelOptions) { o.errorValue = value }
41 }
42
43 // NewLevels returns a new set of leveled loggers based on the base logger.
44 func NewLevels(base Logger, options ...LevelOption) Levels {
45 opts := &levelOptions{
46 levelKey: "level",
47 debugValue: "DEBUG",
48 infoValue: "INFO",
49 errorValue: "ERROR",
50 }
51 for _, option := range options {
52 option(opts)
53 }
54 return Levels{
55 Debug: With(base, opts.levelKey, opts.debugValue),
56 Info: With(base, opts.levelKey, opts.infoValue),
57 Error: With(base, opts.levelKey, opts.errorValue),
58 }
59 }
+0
-45
log/levels_test.go less more
0 package log_test
1
2 import (
3 "bytes"
4 "testing"
5
6 "github.com/go-kit/kit/log"
7 )
8
9 func TestDefaultLevels(t *testing.T) {
10 buf := bytes.Buffer{}
11 levels := log.NewLevels(log.NewLogfmtLogger(&buf))
12
13 levels.Debug.Log("msg", "👨") // of course you'd want to do this
14 if want, have := "level=DEBUG msg=👨\n", buf.String(); want != have {
15 t.Errorf("want %#v, have %#v", want, have)
16 }
17
18 buf.Reset()
19 levels.Info.Log("msg", "🚀")
20 if want, have := "level=INFO msg=🚀\n", buf.String(); want != have {
21 t.Errorf("want %#v, have %#v", want, have)
22 }
23
24 buf.Reset()
25 levels.Error.Log("msg", "🍵")
26 if want, have := "level=ERROR msg=🍵\n", buf.String(); want != have {
27 t.Errorf("want %#v, have %#v", want, have)
28 }
29 }
30
31 func TestModifiedLevels(t *testing.T) {
32 buf := bytes.Buffer{}
33 levels := log.NewLevels(
34 log.NewJSONLogger(&buf),
35 log.LevelKey("l"),
36 log.DebugLevelValue("⛄"),
37 log.InfoLevelValue("🌜"),
38 log.ErrorLevelValue("🌊"),
39 )
40 log.With(levels.Debug, "easter_island", "🗿").Log("msg", "💃💃💃")
41 if want, have := `{"easter_island":"🗿","l":"⛄","msg":"💃💃💃"}`+"\n", buf.String(); want != have {
42 t.Errorf("want %#v, have %#v", want, have)
43 }
44 }
55
66 import "sync/atomic"
77
8 // Logger is the fundamental interface for all log operations. Implementations
9 // must be safe for concurrent use by multiple goroutines. Log creates a log
10 // event from keyvals, a variadic sequence of alternating keys and values.
8 // Logger is the fundamental interface for all log operations. Log creates a
9 // log event from keyvals, a variadic sequence of alternating keys and values.
10 // Implementations must be safe for concurrent use by multiple goroutines. In
11 // particular, any implementation of Logger that appends to keyvals or
12 // modifies any of its elements must make a copy first.
1113 type Logger interface {
1214 Log(keyvals ...interface{}) error
1315 }
1416
15 // With returns a new Logger that includes keyvals in all log events. The
16 // returned Logger replaces all value elements (odd indexes) containing a
17 // Valuer with their generated value for each call to its Log method.
18 func With(logger Logger, keyvals ...interface{}) Logger {
19 w, ok := logger.(*withLogger)
20 if !ok {
21 w = &withLogger{logger: logger}
17 // NewContext returns a new Context that logs to logger.
18 func NewContext(logger Logger) Context {
19 if c, ok := logger.(Context); ok {
20 return c
2221 }
23 return w.with(keyvals...)
22 return Context{logger: logger}
2423 }
2524
26 type withLogger struct {
25 // A Context wraps a Logger and holds keyvals that it includes in all log
26 // events. When logging, a Context replaces all value elements (odd indexes)
27 // containing a Valuer with their generated value for each call to its Log
28 // method.
29 type Context struct {
2730 logger Logger
2831 keyvals []interface{}
2932 hasValuer bool
3033 }
3134
32 func (l *withLogger) Log(keyvals ...interface{}) error {
35 // Log replaces all value elements (odd indexes) containing a Valuer in the
36 // stored context with their generated value, appends keyvals, and passes the
37 // result to the wrapped Logger.
38 func (l Context) Log(keyvals ...interface{}) error {
39 if len(keyvals)%2 != 0 {
40 panic("bad keyvals")
41 }
3342 kvs := append(l.keyvals, keyvals...)
3443 if l.hasValuer {
44 // If no keyvals were appended above then we must copy l.keyvals so
45 // that future log events will reevaluate the stored Valuers.
46 if len(keyvals) == 0 {
47 kvs = append([]interface{}{}, l.keyvals...)
48 }
3549 bindValues(kvs[:len(l.keyvals)])
3650 }
3751 return l.logger.Log(kvs...)
3852 }
3953
40 func (l *withLogger) with(keyvals ...interface{}) Logger {
54 // With returns a new Context with keyvals appended to those of the receiver.
55 func (l Context) With(keyvals ...interface{}) Context {
56 if len(keyvals) == 0 {
57 return l
58 }
59 if len(keyvals)%2 != 0 {
60 panic("bad keyvals")
61 }
4162 // Limiting the capacity of the stored keyvals ensures that a new
4263 // backing array is created if the slice must grow in Log or With.
4364 // Using the extra capacity without copying risks a data race that
4465 // would violate the Logger interface contract.
4566 n := len(l.keyvals) + len(keyvals)
46 return &withLogger{
67 return Context{
4768 logger: l.logger,
4869 keyvals: append(l.keyvals, keyvals...)[:n:n],
70 hasValuer: l.hasValuer || containsValuer(keyvals),
71 }
72 }
73
74 // WithPrefix returns a new Context with keyvals prepended to those of the
75 // receiver.
76 func (l Context) WithPrefix(keyvals ...interface{}) Context {
77 if len(keyvals) == 0 {
78 return l
79 }
80 if len(keyvals)%2 != 0 {
81 panic("bad keyvals")
82 }
83 // Limiting the capacity of the stored keyvals ensures that a new
84 // backing array is created if the slice must grow in Log or With.
85 // Using the extra capacity without copying risks a data race that
86 // would violate the Logger interface contract.
87 n := len(l.keyvals) + len(keyvals)
88 kvs := make([]interface{}, 0, n)
89 kvs = append(kvs, keyvals...)
90 kvs = append(kvs, l.keyvals...)
91 return Context{
92 logger: l.logger,
93 keyvals: kvs,
4994 hasValuer: l.hasValuer || containsValuer(keyvals),
5095 }
5196 }
99
1010 var discard = log.Logger(log.LoggerFunc(func(...interface{}) error { return nil }))
1111
12 func TestWith(t *testing.T) {
12 func TestContext(t *testing.T) {
13 buf := &bytes.Buffer{}
14 logger := log.NewLogfmtLogger(buf)
15
16 kvs := []interface{}{"a", 123}
17 lc := log.NewContext(logger).With(kvs...)
18 kvs[1] = 0 // With should copy its key values
19
20 lc = lc.With("b", "c") // With should stack
21 if err := lc.Log("msg", "message"); err != nil {
22 t.Fatal(err)
23 }
24 if want, have := "a=123 b=c msg=message\n", buf.String(); want != have {
25 t.Errorf("\nwant: %shave: %s", want, have)
26 }
27
28 buf.Reset()
29 lc = lc.WithPrefix("p", "first")
30 if err := lc.Log("msg", "message"); err != nil {
31 t.Fatal(err)
32 }
33 if want, have := "p=first a=123 b=c msg=message\n", buf.String(); want != have {
34 t.Errorf("\nwant: %shave: %s", want, have)
35 }
36 }
37
38 func TestContextWithPrefix(t *testing.T) {
1339 buf := &bytes.Buffer{}
1440 kvs := []interface{}{"a", 123}
1541 logger := log.NewJSONLogger(buf)
16 logger = log.With(logger, kvs...)
17 kvs[1] = 0 // With should copy its key values
18 logger = log.With(logger, "b", "c") // With should stack
19 if err := logger.Log("msg", "message"); err != nil {
42 lc := log.NewContext(logger).With(kvs...)
43 kvs[1] = 0 // WithPrefix should copy its key values
44 lc = lc.With("b", "c") // WithPrefix should stack
45 if err := lc.Log("msg", "message"); err != nil {
2046 t.Fatal(err)
2147 }
2248 if want, have := `{"a":123,"b":"c","msg":"message"}`+"\n", buf.String(); want != have {
4470
4571 // With must be careful about handling slices that can grow without
4672 // copying the underlying array, so give it a challenge.
47 l := log.With(logger, make([]interface{}, 0, 2)...)
73 l := log.NewContext(logger).With(make([]interface{}, 0, 2)...)
4874
4975 // Start logging concurrently. Each goroutine logs its id so the logger
5076 // can bucket the event counts.
79105
80106 func BenchmarkOneWith(b *testing.B) {
81107 logger := discard
82 logger = log.With(logger, "k", "v")
108 lc := log.NewContext(logger).With("k", "v")
83109 b.ReportAllocs()
84110 b.ResetTimer()
85111 for i := 0; i < b.N; i++ {
86 logger.Log("k", "v")
112 lc.Log("k", "v")
87113 }
88114 }
89115
90116 func BenchmarkTwoWith(b *testing.B) {
91117 logger := discard
92 for i := 0; i < 2; i++ {
93 logger = log.With(logger, "k", "v")
118 lc := log.NewContext(logger).With("k", "v")
119 for i := 1; i < 2; i++ {
120 lc = lc.With("k", "v")
94121 }
95122 b.ReportAllocs()
96123 b.ResetTimer()
97124 for i := 0; i < b.N; i++ {
98 logger.Log("k", "v")
125 lc.Log("k", "v")
99126 }
100127 }
101128
102129 func BenchmarkTenWith(b *testing.B) {
103130 logger := discard
104 for i := 0; i < 10; i++ {
105 logger = log.With(logger, "k", "v")
131 lc := log.NewContext(logger).With("k", "v")
132 for i := 1; i < 10; i++ {
133 lc = lc.With("k", "v")
106134 }
107135 b.ReportAllocs()
108136 b.ResetTimer()
109137 for i := 0; i < b.N; i++ {
110 logger.Log("k", "v")
138 lc.Log("k", "v")
111139 }
112140 }
113141
77 )
88
99 // StdlibWriter implements io.Writer by invoking the stdlib log.Print. It's
10 // designed to be passed to a gokit logger as the writer, for cases where it's
11 // necessary to redirect all gokit log output to the stdlib logger.
10 // designed to be passed to a Go kit logger as the writer, for cases where
11 // it's necessary to redirect all Go kit log output to the stdlib logger.
1212 //
1313 // If you have any choice in the matter, you shouldn't use this. Prefer to
14 // redirect the stdlib log to the gokit logger via NewStdlibAdapter.
14 // redirect the stdlib log to the Go kit logger via NewStdlibAdapter.
1515 type StdlibWriter struct{}
1616
1717 // Write implements io.Writer.
55 "gopkg.in/stack.v1"
66 )
77
8 // A Valuer generates a log value. When passed to With in a value element (odd
9 // indexes), it represents a dynamic value which is re-evaluated with each log
10 // event.
8 // A Valuer generates a log value. When passed to Context.With in a value
9 // element (odd indexes), it represents a dynamic value which is re-evaluated
10 // with each log event.
1111 type Valuer func() interface{}
1212
1313 // bindValues replaces all value elements (odd indexes) containing a Valuer
2222 return now
2323 }
2424
25 logger = log.With(logger, "ts", log.Timestamp(mocktime), "caller", log.DefaultCaller)
25 lc := log.NewContext(logger).With("ts", log.Timestamp(mocktime), "caller", log.DefaultCaller)
2626
27 logger.Log("foo", "bar")
27 lc.Log("foo", "bar")
2828 timestamp, ok := output[1].(time.Time)
2929 if !ok {
3030 t.Fatalf("want time.Time, have %T", output[1])
3737 }
3838
3939 // A second attempt to confirm the bindings are truly dynamic.
40 logger.Log("foo", "bar")
40 lc.Log("foo", "bar")
4141 timestamp, ok = output[1].(time.Time)
4242 if !ok {
4343 t.Fatalf("want time.Time, have %T", output[1])
5050 }
5151 }
5252
53 func TestValueBinding_loggingZeroKeyvals(t *testing.T) {
54 var output []interface{}
55
56 logger := log.Logger(log.LoggerFunc(func(keyvals ...interface{}) error {
57 output = keyvals
58 return nil
59 }))
60
61 start := time.Date(2015, time.April, 25, 0, 0, 0, 0, time.UTC)
62 now := start
63 mocktime := func() time.Time {
64 now = now.Add(time.Second)
65 return now
66 }
67
68 logger = log.NewContext(logger).With("ts", log.Timestamp(mocktime))
69
70 logger.Log()
71 timestamp, ok := output[1].(time.Time)
72 if !ok {
73 t.Fatalf("want time.Time, have %T", output[1])
74 }
75 if want, have := start.Add(time.Second), timestamp; want != have {
76 t.Errorf("output[1]: want %v, have %v", want, have)
77 }
78
79 // A second attempt to confirm the bindings are truly dynamic.
80 logger.Log()
81 timestamp, ok = output[1].(time.Time)
82 if !ok {
83 t.Fatalf("want time.Time, have %T", output[1])
84 }
85 if want, have := start.Add(2*time.Second), timestamp; want != have {
86 t.Errorf("output[1]: want %v, have %v", want, have)
87 }
88 }
89
5390 func BenchmarkValueBindingTimestamp(b *testing.B) {
5491 logger := discard
55 logger = log.With(logger, "ts", log.DefaultTimestamp)
92 lc := log.NewContext(logger).With("ts", log.DefaultTimestamp)
5693 b.ReportAllocs()
5794 b.ResetTimer()
5895 for i := 0; i < b.N; i++ {
59 logger.Log("k", "v")
96 lc.Log("k", "v")
6097 }
6198 }
6299
63100 func BenchmarkValueBindingCaller(b *testing.B) {
64101 logger := discard
65 logger = log.With(logger, "caller", log.DefaultCaller)
102 lc := log.NewContext(logger).With("caller", log.DefaultCaller)
66103 b.ReportAllocs()
67104 b.ResetTimer()
68105 for i := 0; i < b.N; i++ {
69 logger.Log("k", "v")
106 lc.Log("k", "v")
70107 }
71108 }
5151 // handle request
5252 }
5353 ```
54
55 A gauge for the number of goroutines currently running, exported via statsd.
56 ```go
57 import (
58 "net"
59 "os"
60 "runtime"
61 "time"
62
63 "github.com/go-kit/kit/metrics/statsd"
64 )
65
66 func main() {
67 statsdWriter, err := net.Dial("udp", "127.0.0.1:8126")
68 if err != nil {
69 os.Exit(1)
70 }
71
72 reportingDuration := 5 * time.Second
73 goroutines := statsd.NewGauge(statsdWriter, "total_goroutines", reportingDuration)
74 for range time.Tick(reportingDuration) {
75 goroutines.Set(float64(runtime.NumGoroutine()))
76 }
77 }
78
79 ```
0 package ratelimit
1
2 import (
3 "errors"
4 "time"
5
6 "github.com/juju/ratelimit"
7 "golang.org/x/net/context"
8
9 "github.com/go-kit/kit/endpoint"
10 )
11
12 // ErrLimited is returned in the request path when the rate limiter is
13 // triggered and the request is rejected.
14 var ErrLimited = errors.New("rate limit exceeded")
15
16 // NewTokenBucketLimiter returns an endpoint.Middleware that acts as a rate
17 // limiter based on a token-bucket algorithm. Requests that would exceed the
18 // maximum request rate are simply rejected with an error.
19 func NewTokenBucketLimiter(tb *ratelimit.Bucket) endpoint.Middleware {
20 return func(next endpoint.Endpoint) endpoint.Endpoint {
21 return func(ctx context.Context, request interface{}) (interface{}, error) {
22 if tb.TakeAvailable(1) == 0 {
23 return nil, ErrLimited
24 }
25 return next(ctx, request)
26 }
27 }
28 }
29
30 // NewTokenBucketThrottler returns an endpoint.Middleware that acts as a
31 // request throttler based on a token-bucket algorithm. Requests that would
32 // exceed the maximum request rate are delayed via the parameterized sleep
33 // function. By default you may pass time.Sleep.
34 func NewTokenBucketThrottler(tb *ratelimit.Bucket, sleep func(time.Duration)) endpoint.Middleware {
35 return func(next endpoint.Endpoint) endpoint.Endpoint {
36 return func(ctx context.Context, request interface{}) (interface{}, error) {
37 sleep(tb.Take(1))
38 return next(ctx, request)
39 }
40 }
41 }
0 package ratelimit_test
1
2 import (
3 "math"
4 "testing"
5 "time"
6
7 jujuratelimit "github.com/juju/ratelimit"
8 "golang.org/x/net/context"
9
10 "github.com/go-kit/kit/endpoint"
11 "github.com/go-kit/kit/ratelimit"
12 )
13
14 func TestTokenBucketLimiter(t *testing.T) {
15 e := func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
16 for _, n := range []int{1, 2, 100} {
17 tb := jujuratelimit.NewBucketWithRate(float64(n), int64(n))
18 testLimiter(t, ratelimit.NewTokenBucketLimiter(tb)(e), n)
19 }
20 }
21
22 func TestTokenBucketThrottler(t *testing.T) {
23 d := time.Duration(0)
24 s := func(d0 time.Duration) { d = d0 }
25
26 e := func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }
27 e = ratelimit.NewTokenBucketThrottler(jujuratelimit.NewBucketWithRate(1, 1), s)(e)
28
29 // First request should go through with no delay.
30 e(context.Background(), struct{}{})
31 if want, have := time.Duration(0), d; want != have {
32 t.Errorf("want %s, have %s", want, have)
33 }
34
35 // Next request should request a ~1s sleep.
36 e(context.Background(), struct{}{})
37 if want, have, tol := time.Second, d, time.Millisecond; math.Abs(float64(want-have)) > float64(tol) {
38 t.Errorf("want %s, have %s", want, have)
39 }
40 }
41
42 func testLimiter(t *testing.T, e endpoint.Endpoint, rate int) {
43 // First <rate> requests should succeed.
44 for i := 0; i < rate; i++ {
45 if _, err := e(context.Background(), struct{}{}); err != nil {
46 t.Fatalf("rate=%d: request %d/%d failed: %v", rate, i+1, rate, err)
47 }
48 }
49
50 // Next request should fail.
51 if _, err := e(context.Background(), struct{}{}); err != ratelimit.ErrLimited {
52 t.Errorf("rate=%d: want %v, have %v", rate, ratelimit.ErrLimited, err)
53 }
54 }
+0
-13
server/README.md less more
0 # package server
1
2 `package server` is a very small package that collects interfaces used by services.
3 Most server-side functionality is actually implemented by surrounding packages.
4
5 # Rationale
6
7 TODO
8
9 # Usage
10
11 As currently defined, you shouldn't need to use `package server` directly.
12 Other gokit components integrate on `package server` interfaces.
5353 // and submits the span to the collector. If no span is found in the context,
5454 // a new span is generated and inserted.
5555 func AnnotateServer(newSpan NewSpanFunc, c Collector) endpoint.Middleware {
56 return func(e endpoint.Endpoint) endpoint.Endpoint {
56 return func(next endpoint.Endpoint) endpoint.Endpoint {
5757 return func(ctx context.Context, request interface{}) (interface{}, error) {
5858 span, ok := fromContext(ctx)
5959 if !ok {
6262 }
6363 span.Annotate(ServerReceive)
6464 defer func() { span.Annotate(ServerSend); c.Collect(span) }()
65 return e(ctx, request)
65 return next(ctx, request)
6666 }
6767 }
6868 }
7373 // collector. If no span is found in the context, a new span is generated and
7474 // inserted.
7575 func AnnotateClient(newSpan NewSpanFunc, c Collector) endpoint.Middleware {
76 return func(e endpoint.Endpoint) endpoint.Endpoint {
76 return func(next endpoint.Endpoint) endpoint.Endpoint {
7777 return func(ctx context.Context, request interface{}) (interface{}, error) {
7878 var clientSpan *Span
7979 parentSpan, ok := fromContext(ctx)
8686 defer func() { ctx = context.WithValue(ctx, SpanContextKey, parentSpan) }() // reset
8787 clientSpan.Annotate(ClientSend)
8888 defer func() { clientSpan.Annotate(ClientReceive); c.Collect(clientSpan) }()
89 return e(ctx, request)
89 return next(ctx, request)
9090 }
9191 }
9292 }
+0
-50
transport/README.md less more
0 # package transport
1
2 `package transport` defines interfaces for service transports and codecs.
3 It also provides implementations for transports and codecs that aren't already well-defined by other packages.
4 The most common use case for `package transport` is probably to bind a gokit [server.Endpoint][] with a stdlib [http.Handler][], via gokit's [http.Binding][].
5 Refer to the [addsvc][] example service to see how to make e.g. [Thrift][] or [gRPC][] transport bindings.
6
7 [server.Endpoint]: https://godoc.org/github.com/go-kit/kit/server/#Endpoint
8 [http.Handler]: https://golang.org/pkg/net/http/#Handler
9 [http.Binding]: https://godoc.org/github.com/go-kit/kit/transport/http/#Binding
10 [addsvc]: https://github.com/go-kit/kit/blob/319c1c7129a146b541bbbaf18e2502bf32c603c5/addsvc/main.go
11 [Thrift]: https://github.com/go-kit/kit/blob/319c1c7129a146b541bbbaf18e2502bf32c603c5/addsvc/main.go#L142-192
12 [gRPC]: https://github.com/go-kit/kit/blob/319c1c7129a146b541bbbaf18e2502bf32c603c5/addsvc/main.go#L102-119
13
14 ## Rationale
15
16 TODO
17
18 ## Usage
19
20 Bind a gokit [server.Endpoint][] with a stdlib [http.Handler][].
21
22 ```go
23 import (
24 "net/http"
25 "reflect"
26
27 "golang.org/x/net/context"
28
29 jsoncodec "github.com/go-kit/kit/transport/codec/json"
30 httptransport "github.com/go-kit/kit/transport/http"
31 )
32
33 type request struct{}
34
35 func main() {
36 var (
37 ctx = context.Background()
38 requestType = reflect.TypeOf(request{})
39 codec = jsoncodec.New()
40 e = makeEndpoint()
41 before = []httptransport.BeforeFunc{}
42 after = []httptransport.AfterFunc{}
43 )
44 handler := httptransport.NewBinding(ctx, requestType, codec, e, before, after)
45 mux := http.NewServeMux()
46 mux.Handle("/path", handler)
47 http.ListenAndServe(":8080", mux)
48 }
49 ```
+0
-15
transport/codec/codec.go less more
0 package codec
1
2 import (
3 "io"
4
5 "golang.org/x/net/context"
6 )
7
8 // Codec decodes and encodes requests and responses. Decode takes and returns
9 // a context because the request or response may be accompanied by information
10 // that needs to be applied there.
11 type Codec interface {
12 Decode(context.Context, io.Reader, interface{}) (context.Context, error)
13 Encode(io.Writer, interface{}) error
14 }
+0
-24
transport/codec/json/json.go less more
0 package json
1
2 import (
3 "encoding/json"
4 "io"
5
6 "golang.org/x/net/context"
7
8 "github.com/go-kit/kit/transport/codec"
9 )
10
11 type jsonCodec struct{}
12
13 // New returns a JSON codec. Request and response structures should have
14 // properly-tagged fields.
15 func New() codec.Codec { return jsonCodec{} }
16
17 func (jsonCodec) Decode(ctx context.Context, r io.Reader, v interface{}) (context.Context, error) {
18 return ctx, json.NewDecoder(r).Decode(v)
19 }
20
21 func (jsonCodec) Encode(w io.Writer, v interface{}) error {
22 return json.NewEncoder(w).Encode(v)
23 }
+0
-40
transport/codec/json/json_test.go less more
0 package json_test
1
2 import (
3 "bytes"
4 "testing"
5
6 "golang.org/x/net/context"
7
8 jsoncodec "github.com/go-kit/kit/transport/codec/json"
9 )
10
11 type request struct {
12 A int `json:"a"`
13 B string `json:"b"`
14 }
15
16 type response struct {
17 Values []string `json:"values"`
18 }
19
20 func TestDecode(t *testing.T) {
21 buf := bytes.NewBufferString(`{"a":1,"b":"2"}`)
22 var req request
23 if _, err := jsoncodec.New().Decode(context.Background(), buf, &req); err != nil {
24 t.Fatal(err)
25 }
26 if want, have := (request{A: 1, B: "2"}), req; want != have {
27 t.Errorf("want %v, have %v", want, have)
28 }
29 }
30
31 func TestEncode(t *testing.T) {
32 buf := &bytes.Buffer{}
33 if err := jsoncodec.New().Encode(buf, response{Values: []string{"a", "b", "c"}}); err != nil {
34 t.Fatal(err)
35 }
36 if want, have := `{"values":["a","b","c"]}`+"\n", buf.String(); want != have {
37 t.Errorf("want %q, have %q", want, have)
38 }
39 }
55 "golang.org/x/net/context"
66 )
77
8 // BeforeFunc may take information from a HTTP request and put it into a
9 // request context. BeforeFuncs are executed in HTTP bindings, prior to
10 // invoking the endpoint.
8 // BeforeFunc may take information from an HTTP request and put it into a
9 // request context. BeforeFuncs are executed in the handler, prior to invoking
10 // the endpoint.
1111 type BeforeFunc func(context.Context, *http.Request) context.Context
1212
1313 // AfterFunc may take information from a request context and use it to
14 // manipulate a ResponseWriter. AfterFuncs are executed in HTTP bindings,
15 // after invoking the endpoint but prior to writing a response.
14 // manipulate a ResponseWriter. AfterFuncs are executed in the handler, after
15 // invoking the endpoint but prior to writing a response.
1616 type AfterFunc func(context.Context, http.ResponseWriter)
1717
18 // SetContentType returns an AfterFunc that sets the HTTP Content-Type header
19 // to the provided value.
20 func SetContentType(value string) AfterFunc {
18 // SetContentType returns an AfterFunc that sets the Content-Type header to
19 // the provided value.
20 func SetContentType(contentType string) AfterFunc {
21 return SetHeader("Content-Type", contentType)
22 }
23
24 // SetHeader returns an AfterFunc that sets the specified header on the
25 // response.
26 func SetHeader(key, val string) AfterFunc {
2127 return func(_ context.Context, w http.ResponseWriter) {
22 w.Header().Set("Content-Type", value)
28 w.Header().Set(key, val)
2329 }
2430 }
88 httptransport "github.com/go-kit/kit/transport/http"
99 )
1010
11 func TestSetContentType(t *testing.T) {
12 contentType := "application/whatever"
13 rec := httptest.NewRecorder()
14 httptransport.SetContentType(contentType)(context.Background(), rec)
15 if want, have := contentType, rec.Header().Get("Content-Type"); want != have {
11 func TestSetHeader(t *testing.T) {
12 const (
13 key = "X-Foo"
14 val = "12345"
15 )
16 r := httptest.NewRecorder()
17 httptransport.SetHeader(key, val)(context.Background(), r)
18 if want, have := val, r.Header().Get(key); want != have {
1619 t.Errorf("want %q, have %q", want, have)
1720 }
1821 }
22
23 func TestSetContentType(t *testing.T) {
24 const contentType = "application/json"
25 r := httptest.NewRecorder()
26 httptransport.SetContentType(contentType)(context.Background(), r)
27 if want, have := contentType, r.Header().Get("Content-Type"); want != have {
28 t.Errorf("want %q, have %q", want, have)
29 }
30 }
+0
-83
transport/http/binding.go less more
0 package http
1
2 import (
3 "net/http"
4
5 "golang.org/x/net/context"
6
7 "github.com/go-kit/kit/endpoint"
8 "github.com/go-kit/kit/transport/codec"
9 )
10
11 type binding struct {
12 context.Context
13 makeRequest func() interface{}
14 codec.Codec
15 endpoint.Endpoint
16 before []BeforeFunc
17 after []AfterFunc
18 }
19
20 // NewBinding returns an HTTP handler that wraps the given endpoint.
21 func NewBinding(ctx context.Context, makeRequest func() interface{}, cdc codec.Codec, e endpoint.Endpoint, options ...BindingOption) http.Handler {
22 b := &binding{
23 Context: ctx,
24 makeRequest: makeRequest,
25 Codec: cdc,
26 Endpoint: e,
27 }
28 for _, option := range options {
29 option(b)
30 }
31 return b
32 }
33
34 func (b *binding) ServeHTTP(w http.ResponseWriter, r *http.Request) {
35 // Per-request context.
36 ctx, cancel := context.WithCancel(b.Context)
37 defer cancel()
38
39 // Prepare the RPC's context with details from the request.
40 for _, f := range b.before {
41 ctx = f(ctx, r)
42 }
43
44 // Decode request.
45 req := b.makeRequest()
46 ctx, err := b.Codec.Decode(ctx, r.Body, req)
47 if err != nil {
48 http.Error(w, err.Error(), http.StatusBadRequest)
49 return
50 }
51
52 // Execute RPC.
53 resp, err := b.Endpoint(ctx, req)
54 if err != nil {
55 http.Error(w, err.Error(), http.StatusInternalServerError)
56 return
57 }
58
59 // Prepare the ResponseWriter.
60 for _, f := range b.after {
61 f(ctx, w)
62 }
63
64 // Encode response.
65 if err := b.Codec.Encode(w, resp); err != nil {
66 http.Error(w, err.Error(), http.StatusInternalServerError)
67 return
68 }
69 }
70
71 // BindingOption sets a parameter for the HTTP binding.
72 type BindingOption func(*binding)
73
74 // BindingBefore adds pre-RPC BeforeFuncs to the HTTP binding.
75 func BindingBefore(funcs ...BeforeFunc) BindingOption {
76 return func(b *binding) { b.before = append(b.before, funcs...) }
77 }
78
79 // BindingAfter adds post-RPC AfterFuncs to the HTTP binding.
80 func BindingAfter(funcs ...AfterFunc) BindingOption {
81 return func(b *binding) { b.after = append(b.after, funcs...) }
82 }
+0
-66
transport/http/binding_test.go less more
0 package http_test
1
2 import (
3 "bytes"
4 "encoding/json"
5 "fmt"
6 "net/http"
7 "net/http/httptest"
8 "reflect"
9 "testing"
10
11 "golang.org/x/net/context"
12
13 jsoncodec "github.com/go-kit/kit/transport/codec/json"
14 httptransport "github.com/go-kit/kit/transport/http"
15 )
16
17 func TestBinding(t *testing.T) {
18 type myRequest struct {
19 In int `json:"in"`
20 }
21
22 type myResponse struct {
23 Out int `json:"out"`
24 }
25
26 transform := func(i int) int {
27 return 3 * i // doesn't matter, just do something
28 }
29
30 endpoint := func(_ context.Context, request interface{}) (interface{}, error) {
31 r, ok := request.(*myRequest)
32 if !ok {
33 return nil, fmt.Errorf("not myRequest (%s)", reflect.TypeOf(request))
34 }
35 return myResponse{transform(r.In)}, nil
36 }
37
38 ctx := context.Background()
39 makeRequest := func() interface{} { return &myRequest{} }
40 codec := jsoncodec.New()
41 binding := httptransport.NewBinding(ctx, makeRequest, codec, endpoint)
42 server := httptest.NewServer(binding)
43 defer server.Close()
44
45 n := 123
46 requestBody, err := json.Marshal(myRequest{n})
47 if err != nil {
48 t.Fatal(err)
49 }
50
51 resp, err := http.Post(server.URL, "application/json", bytes.NewBuffer(requestBody))
52 if err != nil {
53 t.Fatal(err)
54 }
55 defer resp.Body.Close()
56
57 var r myResponse
58 if _, err := codec.Decode(ctx, resp.Body, &r); err != nil {
59 t.Fatal(err)
60 }
61
62 if want, have := transform(n), r.Out; want != have {
63 t.Errorf("want %d, have %d", want, have)
64 }
65 }
+0
-90
transport/http/client.go less more
0 package http
1
2 import (
3 "bytes"
4 "net/http"
5 "net/url"
6
7 "golang.org/x/net/context"
8
9 "github.com/go-kit/kit/endpoint"
10 "github.com/go-kit/kit/transport/codec"
11 )
12
13 type httpClient struct {
14 *url.URL
15 codec.Codec
16 method string
17 *http.Client
18 before []BeforeFunc
19 makeResponse func() interface{}
20 }
21
22 // NewClient returns a client endpoint for a remote service. addr must be a
23 // valid, parseable URL, including the scheme and path.
24 func NewClient(addr string, cdc codec.Codec, makeResponse func() interface{}, options ...ClientOption) endpoint.Endpoint {
25 u, err := url.Parse(addr)
26 if err != nil {
27 panic(err)
28 }
29 c := httpClient{
30 URL: u,
31 Codec: cdc,
32 method: "GET",
33 Client: http.DefaultClient,
34 makeResponse: makeResponse,
35 }
36 for _, option := range options {
37 option(&c)
38 }
39 return c.endpoint
40 }
41
42 func (c httpClient) endpoint(ctx context.Context, request interface{}) (interface{}, error) {
43 var buf bytes.Buffer
44 if err := c.Codec.Encode(&buf, request); err != nil {
45 return nil, err
46 }
47
48 req, err := http.NewRequest(c.method, c.URL.String(), &buf)
49 if err != nil {
50 return nil, err
51 }
52
53 for _, f := range c.before {
54 f(ctx, req)
55 }
56
57 resp, err := c.Client.Do(req)
58 if err != nil {
59 return nil, err
60 }
61 defer resp.Body.Close()
62
63 response := c.makeResponse()
64 ctx, err = c.Codec.Decode(ctx, resp.Body, response)
65 if err != nil {
66 return nil, err
67 }
68
69 return response, nil
70 }
71
72 // ClientOption sets a parameter for the HTTP client.
73 type ClientOption func(*httpClient)
74
75 // ClientBefore adds pre-invocation BeforeFuncs to the HTTP client.
76 func ClientBefore(funcs ...BeforeFunc) ClientOption {
77 return func(c *httpClient) { c.before = append(c.before, funcs...) }
78 }
79
80 // ClientMethod sets the method used to invoke the RPC. By default, it's GET.
81 func ClientMethod(method string) ClientOption {
82 return func(c *httpClient) { c.method = method }
83 }
84
85 // SetClient sets the HTTP client struct used to invoke the RPC. By default,
86 // it's http.DefaultClient.
87 func SetClient(client *http.Client) ClientOption {
88 return func(c *httpClient) { c.Client = client }
89 }
+0
-38
transport/http/client_test.go less more
0 package http_test
1
2 import (
3 "net/http"
4 "net/http/httptest"
5 "reflect"
6 "testing"
7
8 "golang.org/x/net/context"
9
10 jsoncodec "github.com/go-kit/kit/transport/codec/json"
11 httptransport "github.com/go-kit/kit/transport/http"
12 )
13
14 func TestClient(t *testing.T) {
15 type myResponse struct {
16 V int `json:"v"`
17 }
18 const v = 123
19 codec := jsoncodec.New()
20 server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
21 codec.Encode(w, myResponse{v})
22 }))
23 defer server.Close()
24 makeResponse := func() interface{} { return &myResponse{} }
25 client := httptransport.NewClient(server.URL, codec, makeResponse)
26 resp, err := client(context.Background(), struct{}{})
27 if err != nil {
28 t.Fatal(err)
29 }
30 response, ok := resp.(*myResponse)
31 if !ok {
32 t.Fatalf("not myResponse (%s)", reflect.TypeOf(response))
33 }
34 if want, have := v, response.V; want != have {
35 t.Errorf("want %d, have %d", want, have)
36 }
37 }
0 package http
1
2 import "net/http"
3
4 // DecodeFunc converts an HTTP request (transport-domain) to a user request
5 // (business-domain). One straightforward DecodeFunc could be something that
6 // JSON-decodes the request body to a concrete request type.
7 type DecodeFunc func(*http.Request) (interface{}, error)
8
9 // EncodeFunc converts a user response (business-domain) to an HTTP response
10 // (transport-domain) by encoding the interface to the response writer.
11 type EncodeFunc func(http.ResponseWriter, interface{}) error
0 package http
1
2 import (
3 "net/http"
4
5 "golang.org/x/net/context"
6
7 "github.com/go-kit/kit/endpoint"
8 )
9
10 // Server wraps an endpoint and implements http.Handler.
11 type Server struct {
12 context.Context
13 endpoint.Endpoint
14 DecodeFunc
15 EncodeFunc
16 Before []BeforeFunc
17 After []AfterFunc
18 }
19
20 // ServeHTTP implements http.Handler.
21 func (b Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
22 type errcode struct {
23 error
24 int
25 }
26 var (
27 ctx, cancel = context.WithCancel(b.Context)
28 errcodes = make(chan errcode, 1)
29 done = make(chan struct{}, 1)
30 )
31 defer cancel()
32 go func() {
33 for _, f := range b.Before {
34 ctx = f(ctx, r)
35 }
36 request, err := b.DecodeFunc(r)
37 if err != nil {
38 errcodes <- errcode{err, http.StatusBadRequest}
39 return
40 }
41 response, err := b.Endpoint(ctx, request)
42 if err != nil {
43 errcodes <- errcode{err, http.StatusInternalServerError}
44 return
45 }
46 for _, f := range b.After {
47 f(ctx, w)
48 }
49 if err := b.EncodeFunc(w, response); err != nil {
50 errcodes <- errcode{err, http.StatusInternalServerError}
51 return
52 }
53 close(done)
54 }()
55 select {
56 case <-ctx.Done():
57 http.Error(w, context.DeadlineExceeded.Error(), http.StatusInternalServerError)
58 case errcode := <-errcodes:
59 http.Error(w, errcode.error.Error(), errcode.int)
60 case <-done:
61 return
62 }
63 }
0 package http_test
1
2 import (
3 "errors"
4 "io/ioutil"
5 "net/http"
6 "net/http/httptest"
7 "testing"
8
9 "golang.org/x/net/context"
10
11 httptransport "github.com/go-kit/kit/transport/http"
12 )
13
14 func TestServerBadDecode(t *testing.T) {
15 handler := httptransport.Server{
16 Context: context.Background(),
17 Endpoint: func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
18 DecodeFunc: func(*http.Request) (interface{}, error) { return struct{}{}, errors.New("dang") },
19 EncodeFunc: func(http.ResponseWriter, interface{}) error { return nil },
20 }
21 server := httptest.NewServer(handler)
22 defer server.Close()
23 resp, _ := http.Get(server.URL)
24 if want, have := http.StatusBadRequest, resp.StatusCode; want != have {
25 t.Errorf("want %d, have %d", want, have)
26 }
27 }
28
29 func TestServerBadEndpoint(t *testing.T) {
30 handler := httptransport.Server{
31 Context: context.Background(),
32 Endpoint: func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errors.New("dang") },
33 DecodeFunc: func(*http.Request) (interface{}, error) { return struct{}{}, nil },
34 EncodeFunc: func(http.ResponseWriter, interface{}) error { return nil },
35 }
36 server := httptest.NewServer(handler)
37 defer server.Close()
38 resp, _ := http.Get(server.URL)
39 if want, have := http.StatusInternalServerError, resp.StatusCode; want != have {
40 t.Errorf("want %d, have %d", want, have)
41 }
42 }
43
44 func TestServerBadEncode(t *testing.T) {
45 handler := httptransport.Server{
46 Context: context.Background(),
47 Endpoint: func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
48 DecodeFunc: func(*http.Request) (interface{}, error) { return struct{}{}, nil },
49 EncodeFunc: func(http.ResponseWriter, interface{}) error { return errors.New("dang") },
50 }
51 server := httptest.NewServer(handler)
52 defer server.Close()
53 resp, _ := http.Get(server.URL)
54 if want, have := http.StatusInternalServerError, resp.StatusCode; want != have {
55 t.Errorf("want %d, have %d", want, have)
56 }
57 }
58
59 func TestServerHappyPath(t *testing.T) {
60 _, step, response := testServer(t)
61 step()
62 resp := <-response
63 defer resp.Body.Close()
64 buf, _ := ioutil.ReadAll(resp.Body)
65 if want, have := http.StatusOK, resp.StatusCode; want != have {
66 t.Errorf("want %d, have %d (%s)", want, have, buf)
67 }
68 }
69
70 func TestServerContextCancel(t *testing.T) {
71 cancel, _, response := testServer(t)
72 cancel()
73 resp := <-response
74 defer resp.Body.Close()
75 buf, _ := ioutil.ReadAll(resp.Body)
76 if want, have := http.StatusInternalServerError, resp.StatusCode; want != have {
77 t.Errorf("want %d, have %d (%s)", want, have, buf)
78 }
79 }
80
81 func testServer(t *testing.T) (cancel, step func(), resp <-chan *http.Response) {
82 var (
83 ctx, cancelfn = context.WithCancel(context.Background())
84 stepch = make(chan bool)
85 endpoint = func(context.Context, interface{}) (interface{}, error) { <-stepch; return struct{}{}, nil }
86 response = make(chan *http.Response)
87 handler = httptransport.Server{
88 Context: ctx,
89 Endpoint: endpoint,
90 DecodeFunc: func(*http.Request) (interface{}, error) { return struct{}{}, nil },
91 EncodeFunc: func(http.ResponseWriter, interface{}) error { return nil },
92 Before: []httptransport.BeforeFunc{func(ctx context.Context, r *http.Request) context.Context { return ctx }},
93 After: []httptransport.AfterFunc{func(ctx context.Context, w http.ResponseWriter) { return }},
94 }
95 )
96 go func() {
97 server := httptest.NewServer(handler)
98 defer server.Close()
99 resp, err := http.Get(server.URL)
100 if err != nil {
101 t.Error(err)
102 return
103 }
104 response <- resp
105 }()
106 return cancelfn, func() { stepch <- true }, response
107 }