package {client,server} => package endpoint
- The same definition applies to both "sides"
- Update addsvc to reflect the type changes
- Update zipkin tests to reflect the type changes
Peter Bourgon
8 years ago
2 | 2 | import ( |
3 | 3 | "golang.org/x/net/context" |
4 | 4 | |
5 | "github.com/go-kit/kit/client" | |
5 | "github.com/go-kit/kit/endpoint" | |
6 | 6 | "github.com/go-kit/kit/log" |
7 | 7 | ) |
8 | 8 | |
15 | 15 | func pureAdd(_ context.Context, a, b int64) int64 { return a + b } |
16 | 16 | |
17 | 17 | // proxyAdd implements Add by invoking a remote Add service. |
18 | func proxyAdd(e client.Endpoint) Add { | |
18 | func proxyAdd(e endpoint.Endpoint) Add { | |
19 | 19 | return func(ctx context.Context, a, b int64) int64 { |
20 | 20 | resp, err := e(ctx, &addRequest{a, b}) |
21 | 21 | if err != nil { |
24 | 24 | } |
25 | 25 | addResp, ok := resp.(*addResponse) |
26 | 26 | if !ok { |
27 | log.DefaultLogger.Log("err", client.ErrBadCast) | |
27 | log.DefaultLogger.Log("err", endpoint.ErrBadCast) | |
28 | 28 | return 0 |
29 | 29 | } |
30 | 30 | return addResp.V |
2 | 2 | import ( |
3 | 3 | "golang.org/x/net/context" |
4 | 4 | |
5 | "github.com/go-kit/kit/server" | |
5 | "github.com/go-kit/kit/endpoint" | |
6 | 6 | ) |
7 | 7 | |
8 | // makeEndpoint returns a server.Endpoint wrapping the passed Add. If Add were | |
9 | // an interface with multiple methods, we'd need individual endpoints for | |
10 | // each. | |
8 | // makeEndpoint returns an endpoint wrapping the passed Add. If Add were an | |
9 | // interface with multiple methods, we'd need individual endpoints for each. | |
11 | 10 | // |
12 | 11 | // This function is just boiler-plate; in theory, it could be generated. |
13 | func makeEndpoint(a Add) server.Endpoint { | |
12 | func makeEndpoint(a Add) endpoint.Endpoint { | |
14 | 13 | return func(ctx context.Context, request interface{}) (interface{}, error) { |
15 | 14 | select { |
15 | default: | |
16 | 16 | case <-ctx.Done(): |
17 | return nil, server.ErrContextCanceled | |
18 | default: | |
17 | return nil, endpoint.ErrContextCanceled | |
19 | 18 | } |
20 | 19 | |
21 | 20 | addReq, ok := request.(*addRequest) |
22 | 21 | if !ok { |
23 | return nil, server.ErrBadCast | |
22 | return nil, endpoint.ErrBadCast | |
24 | 23 | } |
25 | 24 | |
26 | 25 | v := a(ctx, addReq.A, addReq.B) |
5 | 5 | "golang.org/x/net/context" |
6 | 6 | |
7 | 7 | "github.com/go-kit/kit/addsvc/pb" |
8 | "github.com/go-kit/kit/endpoint" | |
8 | 9 | "github.com/go-kit/kit/metrics" |
9 | "github.com/go-kit/kit/server" | |
10 | 10 | ) |
11 | 11 | |
12 | 12 | // A binding wraps an Endpoint so that it's usable by a transport. grpcBinding |
13 | 13 | // makes an Endpoint usable over gRPC. |
14 | type grpcBinding struct{ server.Endpoint } | |
14 | type grpcBinding struct{ endpoint.Endpoint } | |
15 | 15 | |
16 | 16 | // Add implements the proto3 AddServer by forwarding to the wrapped Endpoint. |
17 | 17 | // |
27 | 27 | |
28 | 28 | resp, ok := r.(*addResponse) |
29 | 29 | if !ok { |
30 | return nil, server.ErrBadCast | |
30 | return nil, endpoint.ErrBadCast | |
31 | 31 | } |
32 | 32 | |
33 | 33 | return &pb.AddReply{ |
24 | 24 | |
25 | 25 | thriftadd "github.com/go-kit/kit/addsvc/_thrift/gen-go/add" |
26 | 26 | "github.com/go-kit/kit/addsvc/pb" |
27 | "github.com/go-kit/kit/client" | |
27 | "github.com/go-kit/kit/endpoint" | |
28 | 28 | kitlog "github.com/go-kit/kit/log" |
29 | 29 | "github.com/go-kit/kit/metrics" |
30 | 30 | "github.com/go-kit/kit/metrics/expvar" |
31 | 31 | "github.com/go-kit/kit/metrics/prometheus" |
32 | 32 | "github.com/go-kit/kit/metrics/statsd" |
33 | "github.com/go-kit/kit/server" | |
34 | 33 | "github.com/go-kit/kit/tracing/zipkin" |
35 | 34 | jsoncodec "github.com/go-kit/kit/transport/codec/json" |
36 | 35 | httptransport "github.com/go-kit/kit/transport/http" |
96 | 95 | codec := jsoncodec.New() |
97 | 96 | makeResponse := func() interface{} { return &addResponse{} } |
98 | 97 | |
99 | var e client.Endpoint | |
98 | var e endpoint.Endpoint | |
100 | 99 | e = httptransport.NewClient(*proxyHTTPAddr, codec, makeResponse, httptransport.ClientBefore(zipkin.ToRequest(zipkinSpanFunc))) |
101 | 100 | e = zipkin.AnnotateClient(zipkinSpanFunc, zipkinCollector)(e) |
102 | 101 | |
104 | 103 | } |
105 | 104 | a = logging(logger)(a) |
106 | 105 | |
107 | // `package server` domain | |
108 | var e server.Endpoint | |
106 | // Server domain | |
107 | var e endpoint.Endpoint | |
109 | 108 | e = makeEndpoint(a) |
110 | 109 | e = zipkin.AnnotateServer(zipkinSpanFunc, zipkinCollector)(e) |
111 | 110 |
5 | 5 | "golang.org/x/net/context" |
6 | 6 | |
7 | 7 | thriftadd "github.com/go-kit/kit/addsvc/_thrift/gen-go/add" |
8 | "github.com/go-kit/kit/endpoint" | |
8 | 9 | "github.com/go-kit/kit/metrics" |
9 | "github.com/go-kit/kit/server" | |
10 | 10 | ) |
11 | 11 | |
12 | 12 | // A binding wraps an Endpoint so that it's usable by a transport. |
13 | 13 | // thriftBinding makes an Endpoint usable over Thrift. |
14 | 14 | type thriftBinding struct { |
15 | 15 | context.Context |
16 | server.Endpoint | |
16 | endpoint.Endpoint | |
17 | 17 | } |
18 | 18 | |
19 | 19 | // Add implements Thrift's AddService interface. |
25 | 25 | |
26 | 26 | resp, ok := r.(*addResponse) |
27 | 27 | if !ok { |
28 | return nil, server.ErrBadCast | |
28 | return nil, endpoint.ErrBadCast | |
29 | 29 | } |
30 | 30 | |
31 | 31 | return &thriftadd.AddReply{Value: resp.V}, nil |
0 | package client | |
1 | ||
2 | import ( | |
3 | "errors" | |
4 | ||
5 | "golang.org/x/net/context" | |
6 | ) | |
7 | ||
8 | // Endpoint is the fundamental building block of package client. | |
9 | // It represents a single RPC method on a remote service. | |
10 | type Endpoint func(ctx context.Context, request interface{}) (response interface{}, err error) | |
11 | ||
12 | // Middleware is a chainable behavior modifier. | |
13 | type Middleware func(Endpoint) Endpoint | |
14 | ||
15 | // ErrBadCast indicates a type error during decoding or encoding. | |
16 | var ErrBadCast = errors.New("bad cast") |
0 | package endpoint | |
1 | ||
2 | import ( | |
3 | "errors" | |
4 | ||
5 | "golang.org/x/net/context" | |
6 | ) | |
7 | ||
8 | // Endpoint is the fundamental building block of packages server and client. | |
9 | // It represents a single RPC method. | |
10 | type Endpoint func(ctx context.Context, request interface{}) (response interface{}, err error) | |
11 | ||
12 | // Middleware is a chainable behavior modifier. | |
13 | type Middleware func(Endpoint) Endpoint | |
14 | ||
15 | // ErrBadCast indicates an unexpected concrete request or response struct was | |
16 | // received from an endpoint. | |
17 | var ErrBadCast = errors.New("bad cast") | |
18 | ||
19 | // ContextCanceled indicates the request context was canceled. | |
20 | var ErrContextCanceled = errors.New("context canceled") |
0 | package server | |
1 | ||
2 | import ( | |
3 | "errors" | |
4 | ||
5 | "golang.org/x/net/context" | |
6 | ) | |
7 | ||
8 | // Endpoint is the fundamental building block of package server. | |
9 | // It represents a single RPC method. | |
10 | type Endpoint func(ctx context.Context, request interface{}) (response interface{}, err error) | |
11 | ||
12 | // Middleware is a chainable behavior modifier. | |
13 | type Middleware func(Endpoint) Endpoint | |
14 | ||
15 | // ErrBadCast indicates a type error during decoding or encoding. | |
16 | var ErrBadCast = errors.New("bad cast") | |
17 | ||
18 | // ErrContextCanceled indicates a controlling context was canceled before the | |
19 | // request could be served. | |
20 | var ErrContextCanceled = errors.New("context was canceled") |
4 | 4 | "net/http" |
5 | 5 | "strconv" |
6 | 6 | |
7 | "github.com/go-kit/kit/client" | |
7 | "github.com/go-kit/kit/endpoint" | |
8 | 8 | "github.com/go-kit/kit/log" |
9 | "github.com/go-kit/kit/server" | |
10 | 9 | |
11 | 10 | "golang.org/x/net/context" |
12 | 11 | ) |
49 | 48 | // context, adds server-receive and server-send annotations at the boundaries, |
50 | 49 | // and submits the span to the collector. If no span is found in the context, |
51 | 50 | // a new span is generated and inserted. |
52 | func AnnotateServer(newSpan NewSpanFunc, c Collector) server.Middleware { | |
53 | return func(e server.Endpoint) server.Endpoint { | |
51 | func AnnotateServer(newSpan NewSpanFunc, c Collector) endpoint.Middleware { | |
52 | return func(e endpoint.Endpoint) endpoint.Endpoint { | |
54 | 53 | return func(ctx context.Context, request interface{}) (interface{}, error) { |
55 | 54 | span, ok := fromContext(ctx) |
56 | 55 | if !ok { |
64 | 63 | } |
65 | 64 | } |
66 | 65 | |
67 | // AnnotateClient returns a client.Middleware that extracts a parent span from | |
68 | // the context, produces a client (child) span from it, adds client-send and | |
66 | // AnnotateClient returns a middleware that extracts a parent span from the | |
67 | // context, produces a client (child) span from it, adds client-send and | |
69 | 68 | // client-receive annotations at the boundaries, and submits the span to the |
70 | 69 | // collector. If no span is found in the context, a new span is generated and |
71 | 70 | // inserted. |
72 | func AnnotateClient(newSpan NewSpanFunc, c Collector) client.Middleware { | |
73 | return func(e client.Endpoint) client.Endpoint { | |
71 | func AnnotateClient(newSpan NewSpanFunc, c Collector) endpoint.Middleware { | |
72 | return func(e endpoint.Endpoint) endpoint.Endpoint { | |
74 | 73 | return func(ctx context.Context, request interface{}) (interface{}, error) { |
75 | 74 | var clientSpan *Span |
76 | 75 | parentSpan, ok := fromContext(ctx) |
0 | 0 | package zipkin_test |
1 | 1 | |
2 | 2 | import ( |
3 | "fmt" | |
3 | 4 | "net/http" |
4 | 5 | "reflect" |
5 | 6 | "runtime" |
6 | 7 | "strconv" |
7 | 8 | "strings" |
8 | "sync/atomic" | |
9 | 9 | "testing" |
10 | 10 | |
11 | 11 | "golang.org/x/net/context" |
12 | 12 | |
13 | "github.com/go-kit/kit/client" | |
14 | "github.com/go-kit/kit/server" | |
13 | "github.com/go-kit/kit/endpoint" | |
15 | 14 | "github.com/go-kit/kit/tracing/zipkin" |
16 | 15 | ) |
17 | 16 | |
86 | 85 | } |
87 | 86 | |
88 | 87 | func TestAnnotateServer(t *testing.T) { |
88 | if err := testAnnotate(zipkin.AnnotateServer, zipkin.ServerReceive, zipkin.ServerSend); err != nil { | |
89 | t.Fatal(err) | |
90 | } | |
91 | } | |
92 | ||
93 | func TestAnnotateClient(t *testing.T) { | |
94 | if err := testAnnotate(zipkin.AnnotateClient, zipkin.ClientSend, zipkin.ClientReceive); err != nil { | |
95 | t.Fatal(err) | |
96 | } | |
97 | } | |
98 | ||
99 | func testAnnotate( | |
100 | annotate func(newSpan zipkin.NewSpanFunc, c zipkin.Collector) endpoint.Middleware, | |
101 | wantAnnotations ...string, | |
102 | ) error { | |
89 | 103 | const ( |
90 | 104 | hostport = "1.2.3.4:1234" |
91 | 105 | serviceName = "some-service" |
92 | 106 | methodName = "some-method" |
93 | 107 | ) |
94 | 108 | |
95 | f := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName) | |
96 | c := &countingCollector{} | |
109 | newSpan := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName) | |
110 | collector := &countingCollector{} | |
97 | 111 | |
98 | var e server.Endpoint | |
112 | var e endpoint.Endpoint | |
99 | 113 | e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil } |
100 | e = zipkin.AnnotateServer(f, c)(e) | |
114 | e = annotate(newSpan, collector)(e) | |
101 | 115 | |
102 | if want, have := int32(0), atomic.LoadInt32(&(c.int32)); want != have { | |
103 | t.Errorf("want %d, have %d", want, have) | |
116 | if want, have := 0, len(collector.annotations); want != have { | |
117 | return fmt.Errorf("pre-invocation: want %d, have %d", want, have) | |
104 | 118 | } |
105 | 119 | if _, err := e(context.Background(), struct{}{}); err != nil { |
106 | t.Fatal(err) | |
120 | return fmt.Errorf("during invocation: %v", err) | |
107 | 121 | } |
108 | if want, have := int32(1), atomic.LoadInt32(&(c.int32)); want != have { | |
109 | t.Errorf("want %d, have %d", want, have) | |
122 | if want, have := wantAnnotations, collector.annotations; !reflect.DeepEqual(want, have) { | |
123 | return fmt.Errorf("after invocation: want %v, have %v", want, have) | |
110 | 124 | } |
125 | ||
126 | return nil | |
111 | 127 | } |
112 | 128 | |
113 | func TestAnnotateClient(t *testing.T) { | |
114 | const ( | |
115 | hostport = "192.168.1.100:53" | |
116 | serviceName = "client-service" | |
117 | methodName = "client-method" | |
118 | ) | |
129 | type countingCollector struct{ annotations []string } | |
119 | 130 | |
120 | f := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName) | |
121 | c := &countingCollector{} | |
122 | ||
123 | var e client.Endpoint | |
124 | e = func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil } | |
125 | e = zipkin.AnnotateClient(f, c)(e) | |
126 | ||
127 | if want, have := int32(0), atomic.LoadInt32(&(c.int32)); want != have { | |
128 | t.Errorf("want %d, have %d", want, have) | |
131 | func (c *countingCollector) Collect(s *zipkin.Span) error { | |
132 | for _, annotation := range s.Encode().GetAnnotations() { | |
133 | c.annotations = append(c.annotations, annotation.GetValue()) | |
129 | 134 | } |
130 | if _, err := e(context.Background(), struct{}{}); err != nil { | |
131 | t.Fatal(err) | |
132 | } | |
133 | if want, have := int32(1), atomic.LoadInt32(&(c.int32)); want != have { | |
134 | t.Errorf("want %d, have %d", want, have) | |
135 | } | |
135 | return nil | |
136 | 136 | } |
137 | ||
138 | type countingCollector struct{ int32 } | |
139 | ||
140 | func (c *countingCollector) Collect(*zipkin.Span) error { atomic.AddInt32(&(c.int32), 1); return nil } |
4 | 4 | |
5 | 5 | "golang.org/x/net/context" |
6 | 6 | |
7 | "github.com/go-kit/kit/server" | |
7 | "github.com/go-kit/kit/endpoint" | |
8 | 8 | "github.com/go-kit/kit/transport/codec" |
9 | 9 | ) |
10 | 10 | |
12 | 12 | context.Context |
13 | 13 | makeRequest func() interface{} |
14 | 14 | codec.Codec |
15 | server.Endpoint | |
15 | endpoint.Endpoint | |
16 | 16 | before []BeforeFunc |
17 | 17 | after []AfterFunc |
18 | 18 | } |
19 | 19 | |
20 | 20 | // NewBinding returns an HTTP handler that wraps the given endpoint. |
21 | func NewBinding(ctx context.Context, makeRequest func() interface{}, cdc codec.Codec, endpoint server.Endpoint, options ...BindingOption) http.Handler { | |
21 | func NewBinding(ctx context.Context, makeRequest func() interface{}, cdc codec.Codec, e endpoint.Endpoint, options ...BindingOption) http.Handler { | |
22 | 22 | b := &binding{ |
23 | 23 | Context: ctx, |
24 | 24 | makeRequest: makeRequest, |
25 | 25 | Codec: cdc, |
26 | Endpoint: endpoint, | |
26 | Endpoint: e, | |
27 | 27 | } |
28 | 28 | for _, option := range options { |
29 | 29 | option(b) |
6 | 6 | |
7 | 7 | "golang.org/x/net/context" |
8 | 8 | |
9 | "github.com/go-kit/kit/client" | |
9 | "github.com/go-kit/kit/endpoint" | |
10 | 10 | "github.com/go-kit/kit/transport/codec" |
11 | 11 | ) |
12 | 12 | |
21 | 21 | |
22 | 22 | // NewClient returns a client endpoint for a remote service. addr must be a |
23 | 23 | // valid, parseable URL, including the scheme and path. |
24 | func NewClient(addr string, cdc codec.Codec, makeResponse func() interface{}, options ...ClientOption) client.Endpoint { | |
24 | func NewClient(addr string, cdc codec.Codec, makeResponse func() interface{}, options ...ClientOption) endpoint.Endpoint { | |
25 | 25 | u, err := url.Parse(addr) |
26 | 26 | if err != nil { |
27 | 27 | panic(err) |