Package list golang-github-go-kit-kit / 22e82b4
examples/addsvc2: rename pkg/xxx to pkg/addxxx Peter Bourgon 4 years ago
17 changed file(s) with 868 addition(s) and 868 deletion(s). Raw diff Collapse all Expand all
1919
2020 "github.com/go-kit/kit/log"
2121
22 addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service"
23 addtransport "github.com/go-kit/kit/examples/addsvc2/pkg/transport"
22 "github.com/go-kit/kit/examples/addsvc2/pkg/addservice"
23 "github.com/go-kit/kit/examples/addsvc2/pkg/addtransport"
2424 addthrift "github.com/go-kit/kit/examples/addsvc2/thrift/gen-go/addsvc"
2525 )
2626
2626 "github.com/go-kit/kit/metrics/prometheus"
2727
2828 addpb "github.com/go-kit/kit/examples/addsvc2/pb"
29 addendpoint "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint"
30 addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service"
31 addtransport "github.com/go-kit/kit/examples/addsvc2/pkg/transport"
29 "github.com/go-kit/kit/examples/addsvc2/pkg/addendpoint"
30 "github.com/go-kit/kit/examples/addsvc2/pkg/addservice"
31 "github.com/go-kit/kit/examples/addsvc2/pkg/addtransport"
3232 addthrift "github.com/go-kit/kit/examples/addsvc2/thrift/gen-go/addsvc"
3333 )
3434
00 package main
11
22 import (
3 "context"
43 "io/ioutil"
54 "net/http"
65 "net/http/httptest"
76 "strings"
87 "testing"
98
9 "github.com/opentracing/opentracing-go"
10
1011 "github.com/go-kit/kit/log"
1112 "github.com/go-kit/kit/metrics/discard"
12 "github.com/opentracing/opentracing-go"
1313
14 addendpoint "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint"
15 addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service"
16 addtransport "github.com/go-kit/kit/examples/addsvc2/pkg/transport"
14 "github.com/go-kit/kit/examples/addsvc2/pkg/addendpoint"
15 "github.com/go-kit/kit/examples/addsvc2/pkg/addservice"
16 "github.com/go-kit/kit/examples/addsvc2/pkg/addtransport"
1717 )
1818
1919 func TestHTTP(t *testing.T) {
2020 svc := addservice.New(log.NewNopLogger(), discard.NewCounter(), discard.NewCounter())
2121 eps := addendpoint.New(svc, log.NewNopLogger(), discard.NewHistogram(), opentracing.GlobalTracer())
22 mux := addtransport.NewHTTPHandler(context.Background(), eps, log.NewNopLogger(), opentracing.GlobalTracer())
22 mux := addtransport.NewHTTPHandler(eps, opentracing.GlobalTracer(), log.NewNopLogger())
2323 srv := httptest.NewServer(mux)
2424 defer srv.Close()
2525
0 package addendpoint
1
2 import (
3 "context"
4 "fmt"
5 "time"
6
7 "github.com/go-kit/kit/endpoint"
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/metrics"
10 )
11
12 // InstrumentingMiddleware returns an endpoint middleware that records
13 // the duration of each invocation to the passed histogram. The middleware adds
14 // a single field: "success", which is "true" if no error is returned, and
15 // "false" otherwise.
16 func InstrumentingMiddleware(duration metrics.Histogram) endpoint.Middleware {
17 return func(next endpoint.Endpoint) endpoint.Endpoint {
18 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
19
20 defer func(begin time.Time) {
21 duration.With("success", fmt.Sprint(err == nil)).Observe(time.Since(begin).Seconds())
22 }(time.Now())
23 return next(ctx, request)
24
25 }
26 }
27 }
28
29 // LoggingMiddleware returns an endpoint middleware that logs the
30 // duration of each invocation, and the resulting error, if any.
31 func LoggingMiddleware(logger log.Logger) endpoint.Middleware {
32 return func(next endpoint.Endpoint) endpoint.Endpoint {
33 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
34
35 defer func(begin time.Time) {
36 logger.Log("transport_error", err, "took", time.Since(begin))
37 }(time.Now())
38 return next(ctx, request)
39
40 }
41 }
42 }
0 package addendpoint
1
2 import (
3 "context"
4
5 rl "github.com/juju/ratelimit"
6 stdopentracing "github.com/opentracing/opentracing-go"
7 "github.com/sony/gobreaker"
8
9 "github.com/go-kit/kit/circuitbreaker"
10 "github.com/go-kit/kit/endpoint"
11 "github.com/go-kit/kit/log"
12 "github.com/go-kit/kit/metrics"
13 "github.com/go-kit/kit/ratelimit"
14 "github.com/go-kit/kit/tracing/opentracing"
15
16 "github.com/go-kit/kit/examples/addsvc2/pkg/addservice"
17 )
18
19 // Set collects all of the endpoints that compose an add service. It's meant to
20 // be used as a helper struct, to collect all of the endpoints into a single
21 // parameter.
22 type Set struct {
23 SumEndpoint endpoint.Endpoint
24 ConcatEndpoint endpoint.Endpoint
25 }
26
27 // New returns a Set that wraps the provided server, and wires in all of the
28 // expected endpoint middlewares via the various parameters.
29 func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, trace stdopentracing.Tracer) Set {
30 var sumEndpoint endpoint.Endpoint
31 {
32 sumEndpoint = MakeSumEndpoint(svc)
33 sumEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(1, 1))(sumEndpoint)
34 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(sumEndpoint)
35 sumEndpoint = opentracing.TraceServer(trace, "Sum")(sumEndpoint)
36 sumEndpoint = LoggingMiddleware(log.With(logger, "method", "Sum"))(sumEndpoint)
37 sumEndpoint = InstrumentingMiddleware(duration.With("method", "Sum"))(sumEndpoint)
38 }
39 var concatEndpoint endpoint.Endpoint
40 {
41 concatEndpoint = MakeConcatEndpoint(svc)
42 concatEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(100, 100))(concatEndpoint)
43 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(concatEndpoint)
44 concatEndpoint = opentracing.TraceServer(trace, "Concat")(concatEndpoint)
45 concatEndpoint = LoggingMiddleware(log.With(logger, "method", "Concat"))(concatEndpoint)
46 concatEndpoint = InstrumentingMiddleware(duration.With("method", "Concat"))(concatEndpoint)
47 }
48 return Set{
49 SumEndpoint: sumEndpoint,
50 ConcatEndpoint: concatEndpoint,
51 }
52 }
53
54 // Sum implements the service interface, so Set may be used as a service.
55 // This is primarily useful in the context of a client library.
56 func (s Set) Sum(ctx context.Context, a, b int) (int, error) {
57 resp, err := s.SumEndpoint(ctx, SumRequest{A: a, B: b})
58 if err != nil {
59 return 0, err
60 }
61 response := resp.(SumResponse)
62 return response.V, response.Err
63 }
64
65 // Concat implements the service interface, so Set may be used as a
66 // service. This is primarily useful in the context of a client library.
67 func (s Set) Concat(ctx context.Context, a, b string) (string, error) {
68 resp, err := s.ConcatEndpoint(ctx, ConcatRequest{A: a, B: b})
69 if err != nil {
70 return "", err
71 }
72 response := resp.(ConcatResponse)
73 return response.V, response.Err
74 }
75
76 // MakeSumEndpoint constructs a Sum endpoint wrapping the service.
77 func MakeSumEndpoint(s addservice.Service) endpoint.Endpoint {
78 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
79 req := request.(SumRequest)
80 v, err := s.Sum(ctx, req.A, req.B)
81 return SumResponse{V: v, Err: err}, nil
82 }
83 }
84
85 // MakeConcatEndpoint constructs a Concat endpoint wrapping the service.
86 func MakeConcatEndpoint(s addservice.Service) endpoint.Endpoint {
87 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
88 req := request.(ConcatRequest)
89 v, err := s.Concat(ctx, req.A, req.B)
90 return ConcatResponse{V: v, Err: err}, nil
91 }
92 }
93
94 // Failer is an interface that should be implemented by response types.
95 // Response encoders can check if responses are Failer, and if so if they've
96 // failed, and if so encode them using a separate write path based on the error.
97 type Failer interface {
98 Failed() error
99 }
100
101 // SumRequest collects the request parameters for the Sum method.
102 type SumRequest struct {
103 A, B int
104 }
105
106 // SumResponse collects the response values for the Sum method.
107 type SumResponse struct {
108 V int `json:"v"`
109 Err error `json:"-"` // should be intercepted by Failed/errorEncoder
110 }
111
112 // Failed implements Failer.
113 func (r SumResponse) Failed() error { return r.Err }
114
115 // ConcatRequest collects the request parameters for the Concat method.
116 type ConcatRequest struct {
117 A, B string
118 }
119
120 // ConcatResponse collects the response values for the Concat method.
121 type ConcatResponse struct {
122 V string `json:"v"`
123 Err error `json:"-"`
124 }
125
126 // Failed implements Failer.
127 func (r ConcatResponse) Failed() error { return r.Err }
0 package addservice
1
2 import (
3 "context"
4
5 "github.com/go-kit/kit/log"
6 "github.com/go-kit/kit/metrics"
7 )
8
9 // Middleware describes a service (as opposed to endpoint) middleware.
10 type Middleware func(Service) Service
11
12 // LoggingMiddleware takes a logger as a dependency
13 // and returns a ServiceMiddleware.
14 func LoggingMiddleware(logger log.Logger) Middleware {
15 return func(next Service) Service {
16 return loggingMiddleware{logger, next}
17 }
18 }
19
20 type loggingMiddleware struct {
21 logger log.Logger
22 next Service
23 }
24
25 func (mw loggingMiddleware) Sum(ctx context.Context, a, b int) (v int, err error) {
26 defer func() {
27 mw.logger.Log("method", "Sum", "a", a, "b", b, "v", v, "err", err)
28 }()
29 return mw.next.Sum(ctx, a, b)
30 }
31
32 func (mw loggingMiddleware) Concat(ctx context.Context, a, b string) (v string, err error) {
33 defer func() {
34 mw.logger.Log("method", "Concat", "a", a, "b", b, "v", v, "err", err)
35 }()
36 return mw.next.Concat(ctx, a, b)
37 }
38
39 // InstrumentingMiddleware returns a service middleware that instruments
40 // the number of integers summed and characters concatenated over the lifetime of
41 // the service.
42 func InstrumentingMiddleware(ints, chars metrics.Counter) Middleware {
43 return func(next Service) Service {
44 return instrumentingMiddleware{
45 ints: ints,
46 chars: chars,
47 next: next,
48 }
49 }
50 }
51
52 type instrumentingMiddleware struct {
53 ints metrics.Counter
54 chars metrics.Counter
55 next Service
56 }
57
58 func (mw instrumentingMiddleware) Sum(ctx context.Context, a, b int) (int, error) {
59 v, err := mw.next.Sum(ctx, a, b)
60 mw.ints.Add(float64(v))
61 return v, err
62 }
63
64 func (mw instrumentingMiddleware) Concat(ctx context.Context, a, b string) (string, error) {
65 v, err := mw.next.Concat(ctx, a, b)
66 mw.chars.Add(float64(len(v)))
67 return v, err
68 }
0 package addservice
1
2 import (
3 "context"
4 "errors"
5
6 "github.com/go-kit/kit/log"
7 "github.com/go-kit/kit/metrics"
8 )
9
10 // Service describes a service that adds things together.
11 type Service interface {
12 Sum(ctx context.Context, a, b int) (int, error)
13 Concat(ctx context.Context, a, b string) (string, error)
14 }
15
16 // New returns a basic Service with all of the expected middlewares wired in.
17 func New(logger log.Logger, ints, chars metrics.Counter) Service {
18 var svc Service
19 {
20 svc = NewBasicService()
21 svc = LoggingMiddleware(logger)(svc)
22 svc = InstrumentingMiddleware(ints, chars)(svc)
23 }
24 return svc
25 }
26
27 var (
28 // ErrTwoZeroes is an arbitrary business rule for the Add method.
29 ErrTwoZeroes = errors.New("can't sum two zeroes")
30
31 // ErrIntOverflow protects the Add method. We've decided that this error
32 // indicates a misbehaving service and should count against e.g. circuit
33 // breakers. So, we return it directly in endpoints, to illustrate the
34 // difference. In a real service, this probably wouldn't be the case.
35 ErrIntOverflow = errors.New("integer overflow")
36
37 // ErrMaxSizeExceeded protects the Concat method.
38 ErrMaxSizeExceeded = errors.New("result exceeds maximum size")
39 )
40
41 // NewBasicService returns a naïve, stateless implementation of Service.
42 func NewBasicService() Service {
43 return basicService{}
44 }
45
46 type basicService struct{}
47
48 const (
49 intMax = 1<<31 - 1
50 intMin = -(intMax + 1)
51 maxLen = 10
52 )
53
54 func (s basicService) Sum(_ context.Context, a, b int) (int, error) {
55 if a == 0 && b == 0 {
56 return 0, ErrTwoZeroes
57 }
58 if (b > 0 && a > (intMax-b)) || (b < 0 && a < (intMin-b)) {
59 return 0, ErrIntOverflow
60 }
61 return a + b, nil
62 }
63
64 // Concat implements Service.
65 func (s basicService) Concat(_ context.Context, a, b string) (string, error) {
66 if len(a)+len(b) > maxLen {
67 return "", ErrMaxSizeExceeded
68 }
69 return a + b, nil
70 }
0 package addtransport
1
2 import (
3 "context"
4 "errors"
5 "time"
6
7 "google.golang.org/grpc"
8
9 jujuratelimit "github.com/juju/ratelimit"
10 stdopentracing "github.com/opentracing/opentracing-go"
11 "github.com/sony/gobreaker"
12 oldcontext "golang.org/x/net/context"
13
14 "github.com/go-kit/kit/circuitbreaker"
15 "github.com/go-kit/kit/endpoint"
16 "github.com/go-kit/kit/log"
17 "github.com/go-kit/kit/ratelimit"
18 "github.com/go-kit/kit/tracing/opentracing"
19 grpctransport "github.com/go-kit/kit/transport/grpc"
20
21 "github.com/go-kit/kit/examples/addsvc2/pb"
22 "github.com/go-kit/kit/examples/addsvc2/pkg/addendpoint"
23 "github.com/go-kit/kit/examples/addsvc2/pkg/addservice"
24 )
25
26 type grpcServer struct {
27 sum grpctransport.Handler
28 concat grpctransport.Handler
29 }
30
31 // NewGRPCServer makes a set of endpoints available as a gRPC AddServer.
32 func NewGRPCServer(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer {
33 options := []grpctransport.ServerOption{
34 grpctransport.ServerErrorLogger(logger),
35 }
36 return &grpcServer{
37 sum: grpctransport.NewServer(
38 endpoints.SumEndpoint,
39 decodeGRPCSumRequest,
40 encodeGRPCSumResponse,
41 append(options, grpctransport.ServerBefore(opentracing.FromGRPCRequest(tracer, "Sum", logger)))...,
42 ),
43 concat: grpctransport.NewServer(
44 endpoints.ConcatEndpoint,
45 decodeGRPCConcatRequest,
46 encodeGRPCConcatResponse,
47 append(options, grpctransport.ServerBefore(opentracing.FromGRPCRequest(tracer, "Concat", logger)))...,
48 ),
49 }
50 }
51
52 func (s *grpcServer) Sum(ctx oldcontext.Context, req *pb.SumRequest) (*pb.SumReply, error) {
53 _, rep, err := s.sum.ServeGRPC(ctx, req)
54 if err != nil {
55 return nil, err
56 }
57 return rep.(*pb.SumReply), nil
58 }
59
60 func (s *grpcServer) Concat(ctx oldcontext.Context, req *pb.ConcatRequest) (*pb.ConcatReply, error) {
61 _, rep, err := s.concat.ServeGRPC(ctx, req)
62 if err != nil {
63 return nil, err
64 }
65 return rep.(*pb.ConcatReply), nil
66 }
67
68 // NewGRPCClient returns an AddService backed by a gRPC server at the other end
69 // of the conn. The caller is responsible for constructing the conn, and
70 // eventually closing the underlying transport.
71 func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addservice.Service {
72 // We construct a single ratelimiter middleware, to limit the total outgoing
73 // QPS from this client to all methods on the remote instance. We also
74 // construct per-endpoint circuitbreaker middlewares to demonstrate how
75 // that's done, although they could easily be combined into a single breaker
76 // for the entire remote instance, too.
77 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
78
79 // Each individual endpoint is an http/transport.Client (which implements
80 // endpoint.Endpoint) that gets wrapped with various middlewares. If you
81 // made your own client library, you'd do this work there, so your server
82 // could rely on a consistent set of client behavior.
83 var sumEndpoint endpoint.Endpoint
84 {
85 sumEndpoint = grpctransport.NewClient(
86 conn,
87 "pb.Add",
88 "Sum",
89 encodeGRPCSumRequest,
90 decodeGRPCSumResponse,
91 pb.SumReply{},
92 grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)),
93 ).Endpoint()
94 sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
95 sumEndpoint = limiter(sumEndpoint)
96 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
97 Name: "Sum",
98 Timeout: 30 * time.Second,
99 }))(sumEndpoint)
100 }
101
102 // The Concat endpoint is the same thing, with slightly different
103 // middlewares to demonstrate how to specialize per-endpoint.
104 var concatEndpoint endpoint.Endpoint
105 {
106 concatEndpoint = grpctransport.NewClient(
107 conn,
108 "pb.Add",
109 "Concat",
110 encodeGRPCConcatRequest,
111 decodeGRPCConcatResponse,
112 pb.ConcatReply{},
113 grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)),
114 ).Endpoint()
115 concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
116 concatEndpoint = limiter(concatEndpoint)
117 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
118 Name: "Concat",
119 Timeout: 10 * time.Second,
120 }))(concatEndpoint)
121 }
122
123 // Returning the endpoint.Set as a service.Service relies on the
124 // endpoint.Set implementing the Service methods. That's just a simple bit
125 // of glue code.
126 return addendpoint.Set{
127 SumEndpoint: sumEndpoint,
128 ConcatEndpoint: concatEndpoint,
129 }
130 }
131
132 // decodeGRPCSumRequest is a transport/grpc.DecodeRequestFunc that converts a
133 // gRPC sum request to a user-domain sum request. Primarily useful in a server.
134 func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
135 req := grpcReq.(*pb.SumRequest)
136 return addendpoint.SumRequest{A: int(req.A), B: int(req.B)}, nil
137 }
138
139 // decodeGRPCConcatRequest is a transport/grpc.DecodeRequestFunc that converts a
140 // gRPC concat request to a user-domain concat request. Primarily useful in a
141 // server.
142 func decodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
143 req := grpcReq.(*pb.ConcatRequest)
144 return addendpoint.ConcatRequest{A: req.A, B: req.B}, nil
145 }
146
147 // decodeGRPCSumResponse is a transport/grpc.DecodeResponseFunc that converts a
148 // gRPC sum reply to a user-domain sum response. Primarily useful in a client.
149 func decodeGRPCSumResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
150 reply := grpcReply.(*pb.SumReply)
151 return addendpoint.SumResponse{V: int(reply.V), Err: str2err(reply.Err)}, nil
152 }
153
154 // decodeGRPCConcatResponse is a transport/grpc.DecodeResponseFunc that converts
155 // a gRPC concat reply to a user-domain concat response. Primarily useful in a
156 // client.
157 func decodeGRPCConcatResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
158 reply := grpcReply.(*pb.ConcatReply)
159 return addendpoint.ConcatResponse{V: reply.V, Err: str2err(reply.Err)}, nil
160 }
161
162 // encodeGRPCSumResponse is a transport/grpc.EncodeResponseFunc that converts a
163 // user-domain sum response to a gRPC sum reply. Primarily useful in a server.
164 func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
165 resp := response.(addendpoint.SumResponse)
166 return &pb.SumReply{V: int64(resp.V), Err: err2str(resp.Err)}, nil
167 }
168
169 // encodeGRPCConcatResponse is a transport/grpc.EncodeResponseFunc that converts
170 // a user-domain concat response to a gRPC concat reply. Primarily useful in a
171 // server.
172 func encodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
173 resp := response.(addendpoint.ConcatResponse)
174 return &pb.ConcatReply{V: resp.V, Err: err2str(resp.Err)}, nil
175 }
176
177 // encodeGRPCSumRequest is a transport/grpc.EncodeRequestFunc that converts a
178 // user-domain sum request to a gRPC sum request. Primarily useful in a client.
179 func encodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) {
180 req := request.(addendpoint.SumRequest)
181 return &pb.SumRequest{A: int64(req.A), B: int64(req.B)}, nil
182 }
183
184 // encodeGRPCConcatRequest is a transport/grpc.EncodeRequestFunc that converts a
185 // user-domain concat request to a gRPC concat request. Primarily useful in a
186 // client.
187 func encodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) {
188 req := request.(addendpoint.ConcatRequest)
189 return &pb.ConcatRequest{A: req.A, B: req.B}, nil
190 }
191
192 // These annoying helper functions are required to translate Go error types to
193 // and from strings, which is the type we use in our IDLs to represent errors.
194 // There is special casing to treat empty strings as nil errors.
195
196 func str2err(s string) error {
197 if s == "" {
198 return nil
199 }
200 return errors.New(s)
201 }
202
203 func err2str(err error) string {
204 if err == nil {
205 return ""
206 }
207 return err.Error()
208 }
0 package addtransport
1
2 import (
3 "bytes"
4 "context"
5 "encoding/json"
6 "errors"
7 "io/ioutil"
8 "net/http"
9 "net/url"
10 "strings"
11 "time"
12
13 jujuratelimit "github.com/juju/ratelimit"
14 stdopentracing "github.com/opentracing/opentracing-go"
15 "github.com/sony/gobreaker"
16
17 "github.com/go-kit/kit/circuitbreaker"
18 "github.com/go-kit/kit/endpoint"
19 "github.com/go-kit/kit/log"
20 "github.com/go-kit/kit/ratelimit"
21 "github.com/go-kit/kit/tracing/opentracing"
22 httptransport "github.com/go-kit/kit/transport/http"
23
24 "github.com/go-kit/kit/examples/addsvc2/pkg/addendpoint"
25 "github.com/go-kit/kit/examples/addsvc2/pkg/addservice"
26 )
27
28 // NewHTTPHandler returns an HTTP handler that makes a set of endpoints
29 // available on predefined paths.
30 func NewHTTPHandler(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) http.Handler {
31 options := []httptransport.ServerOption{
32 httptransport.ServerErrorEncoder(errorEncoder),
33 httptransport.ServerErrorLogger(logger),
34 }
35 m := http.NewServeMux()
36 m.Handle("/sum", httptransport.NewServer(
37 endpoints.SumEndpoint,
38 decodeHTTPSumRequest,
39 encodeHTTPGenericResponse,
40 append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Sum", logger)))...,
41 ))
42 m.Handle("/concat", httptransport.NewServer(
43 endpoints.ConcatEndpoint,
44 decodeHTTPConcatRequest,
45 encodeHTTPGenericResponse,
46 append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Concat", logger)))...,
47 ))
48 return m
49 }
50
51 // NewHTTPClient returns an AddService backed by an HTTP server living at the
52 // remote instance. We expect instance to come from a service discovery system,
53 // so likely of the form "host:port".
54 func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addservice.Service, error) {
55 // Quickly sanitize the instance string.
56 if !strings.HasPrefix(instance, "http") {
57 instance = "http://" + instance
58 }
59 u, err := url.Parse(instance)
60 if err != nil {
61 return nil, err
62 }
63
64 // We construct a single ratelimiter middleware, to limit the total outgoing
65 // QPS from this client to all methods on the remote instance. We also
66 // construct per-endpoint circuitbreaker middlewares to demonstrate how
67 // that's done, although they could easily be combined into a single breaker
68 // for the entire remote instance, too.
69 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
70
71 // Each individual endpoint is an http/transport.Client (which implements
72 // endpoint.Endpoint) that gets wrapped with various middlewares. If you
73 // made your own client library, you'd do this work there, so your server
74 // could rely on a consistent set of client behavior.
75 var sumEndpoint endpoint.Endpoint
76 {
77 sumEndpoint = httptransport.NewClient(
78 "POST",
79 copyURL(u, "/sum"),
80 encodeHTTPGenericRequest,
81 decodeHTTPSumResponse,
82 httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)),
83 ).Endpoint()
84 sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
85 sumEndpoint = limiter(sumEndpoint)
86 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
87 Name: "Sum",
88 Timeout: 30 * time.Second,
89 }))(sumEndpoint)
90 }
91
92 // The Concat endpoint is the same thing, with slightly different
93 // middlewares to demonstrate how to specialize per-endpoint.
94 var concatEndpoint endpoint.Endpoint
95 {
96 concatEndpoint = httptransport.NewClient(
97 "POST",
98 copyURL(u, "/concat"),
99 encodeHTTPGenericRequest,
100 decodeHTTPConcatResponse,
101 httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)),
102 ).Endpoint()
103 concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
104 concatEndpoint = limiter(concatEndpoint)
105 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
106 Name: "Concat",
107 Timeout: 10 * time.Second,
108 }))(concatEndpoint)
109 }
110
111 // Returning the endpoint.Set as a service.Service relies on the
112 // endpoint.Set implementing the Service methods. That's just a simple bit
113 // of glue code.
114 return addendpoint.Set{
115 SumEndpoint: sumEndpoint,
116 ConcatEndpoint: concatEndpoint,
117 }, nil
118 }
119
120 func copyURL(base *url.URL, path string) *url.URL {
121 next := *base
122 next.Path = path
123 return &next
124 }
125
126 func errorEncoder(_ context.Context, err error, w http.ResponseWriter) {
127 w.WriteHeader(err2code(err))
128 json.NewEncoder(w).Encode(errorWrapper{Error: err.Error()})
129 }
130
131 func err2code(err error) int {
132 switch err {
133 case addservice.ErrTwoZeroes, addservice.ErrMaxSizeExceeded, addservice.ErrIntOverflow:
134 return http.StatusBadRequest
135 }
136 return http.StatusInternalServerError
137 }
138
139 func errorDecoder(r *http.Response) error {
140 var w errorWrapper
141 if err := json.NewDecoder(r.Body).Decode(&w); err != nil {
142 return err
143 }
144 return errors.New(w.Error)
145 }
146
147 type errorWrapper struct {
148 Error string `json:"error"`
149 }
150
151 // decodeHTTPSumRequest is a transport/http.DecodeRequestFunc that decodes a
152 // JSON-encoded sum request from the HTTP request body. Primarily useful in a
153 // server.
154 func decodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) {
155 var req addendpoint.SumRequest
156 err := json.NewDecoder(r.Body).Decode(&req)
157 return req, err
158 }
159
160 // decodeHTTPConcatRequest is a transport/http.DecodeRequestFunc that decodes a
161 // JSON-encoded concat request from the HTTP request body. Primarily useful in a
162 // server.
163 func decodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) {
164 var req addendpoint.ConcatRequest
165 err := json.NewDecoder(r.Body).Decode(&req)
166 return req, err
167 }
168
169 // decodeHTTPSumResponse is a transport/http.DecodeResponseFunc that decodes a
170 // JSON-encoded sum response from the HTTP response body. If the response has a
171 // non-200 status code, we will interpret that as an error and attempt to decode
172 // the specific error message from the response body. Primarily useful in a
173 // client.
174 func decodeHTTPSumResponse(_ context.Context, r *http.Response) (interface{}, error) {
175 if r.StatusCode != http.StatusOK {
176 return nil, errors.New(r.Status)
177 }
178 var resp addendpoint.SumResponse
179 err := json.NewDecoder(r.Body).Decode(&resp)
180 return resp, err
181 }
182
183 // decodeHTTPConcatResponse is a transport/http.DecodeResponseFunc that decodes
184 // a JSON-encoded concat response from the HTTP response body. If the response
185 // has a non-200 status code, we will interpret that as an error and attempt to
186 // decode the specific error message from the response body. Primarily useful in
187 // a client.
188 func decodeHTTPConcatResponse(_ context.Context, r *http.Response) (interface{}, error) {
189 if r.StatusCode != http.StatusOK {
190 return nil, errors.New(r.Status)
191 }
192 var resp addendpoint.ConcatResponse
193 err := json.NewDecoder(r.Body).Decode(&resp)
194 return resp, err
195 }
196
197 // encodeHTTPGenericRequest is a transport/http.EncodeRequestFunc that
198 // JSON-encodes any request to the request body. Primarily useful in a client.
199 func encodeHTTPGenericRequest(_ context.Context, r *http.Request, request interface{}) error {
200 var buf bytes.Buffer
201 if err := json.NewEncoder(&buf).Encode(request); err != nil {
202 return err
203 }
204 r.Body = ioutil.NopCloser(&buf)
205 return nil
206 }
207
208 // encodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that encodes
209 // the response as JSON to the response writer. Primarily useful in a server.
210 func encodeHTTPGenericResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
211 if f, ok := response.(addendpoint.Failer); ok && f.Failed() != nil {
212 errorEncoder(ctx, f.Failed(), w)
213 return nil
214 }
215 w.Header().Set("Content-Type", "application/json; charset=utf-8")
216 return json.NewEncoder(w).Encode(response)
217 }
0 package addtransport
1
2 import (
3 "context"
4 "time"
5
6 jujuratelimit "github.com/juju/ratelimit"
7 "github.com/sony/gobreaker"
8
9 "github.com/go-kit/kit/circuitbreaker"
10 "github.com/go-kit/kit/endpoint"
11 "github.com/go-kit/kit/ratelimit"
12
13 "github.com/go-kit/kit/examples/addsvc2/pkg/addendpoint"
14 "github.com/go-kit/kit/examples/addsvc2/pkg/addservice"
15 thriftadd "github.com/go-kit/kit/examples/addsvc2/thrift/gen-go/addsvc"
16 )
17
18 type thriftServer struct {
19 ctx context.Context
20 endpoints addendpoint.Set
21 }
22
23 // NewThriftServer makes a set of endpoints available as a Thrift service.
24 func NewThriftServer(ctx context.Context, endpoints addendpoint.Set) thriftadd.AddService {
25 return &thriftServer{
26 ctx: ctx,
27 endpoints: endpoints,
28 }
29 }
30
31 func (s *thriftServer) Sum(a int64, b int64) (*thriftadd.SumReply, error) {
32 request := addendpoint.SumRequest{A: int(a), B: int(b)}
33 response, err := s.endpoints.SumEndpoint(s.ctx, request)
34 if err != nil {
35 return nil, err
36 }
37 resp := response.(addendpoint.SumResponse)
38 return &thriftadd.SumReply{Value: int64(resp.V), Err: err2str(resp.Err)}, nil
39 }
40
41 func (s *thriftServer) Concat(a string, b string) (*thriftadd.ConcatReply, error) {
42 request := addendpoint.ConcatRequest{A: a, B: b}
43 response, err := s.endpoints.ConcatEndpoint(s.ctx, request)
44 if err != nil {
45 return nil, err
46 }
47 resp := response.(addendpoint.ConcatResponse)
48 return &thriftadd.ConcatReply{Value: resp.V, Err: err2str(resp.Err)}, nil
49 }
50
51 // NewThriftClient returns an AddService backed by a Thrift server described by
52 // the provided client. The caller is responsible for constructing the client,
53 // and eventually closing the underlying transport.
54 func NewThriftClient(client *thriftadd.AddServiceClient) addservice.Service {
55 // We construct a single ratelimiter middleware, to limit the total outgoing
56 // QPS from this client to all methods on the remote instance. We also
57 // construct per-endpoint circuitbreaker middlewares to demonstrate how
58 // that's done, although they could easily be combined into a single breaker
59 // for the entire remote instance, too.
60 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
61
62 // Each individual endpoint is an http/transport.Client (which implements
63 // endpoint.Endpoint) that gets wrapped with various middlewares. If you
64 // could rely on a consistent set of client behavior.
65 var sumEndpoint endpoint.Endpoint
66 {
67 sumEndpoint = MakeThriftSumEndpoint(client)
68 sumEndpoint = limiter(sumEndpoint)
69 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
70 Name: "Sum",
71 Timeout: 30 * time.Second,
72 }))(sumEndpoint)
73 }
74
75 // The Concat endpoint is the same thing, with slightly different
76 // middlewares to demonstrate how to specialize per-endpoint.
77 var concatEndpoint endpoint.Endpoint
78 {
79 concatEndpoint = MakeThriftConcatEndpoint(client)
80 concatEndpoint = limiter(concatEndpoint)
81 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
82 Name: "Concat",
83 Timeout: 10 * time.Second,
84 }))(concatEndpoint)
85 }
86
87 // Returning the endpoint.Set as a service.Service relies on the
88 // endpoint.Set implementing the Service methods. That's just a simple bit
89 // of glue code.
90 return addendpoint.Set{
91 SumEndpoint: sumEndpoint,
92 ConcatEndpoint: concatEndpoint,
93 }
94 }
95
96 // MakeThriftSumEndpoint returns an endpoint that invokes the passed Thrift client.
97 // Useful only in clients, and only until a proper transport/thrift.Client exists.
98 func MakeThriftSumEndpoint(client *thriftadd.AddServiceClient) endpoint.Endpoint {
99 return func(ctx context.Context, request interface{}) (interface{}, error) {
100 req := request.(addendpoint.SumRequest)
101 reply, err := client.Sum(int64(req.A), int64(req.B))
102 if err == addservice.ErrIntOverflow {
103 return nil, err // special case; see comment on ErrIntOverflow
104 }
105 return addendpoint.SumResponse{V: int(reply.Value), Err: err}, nil
106 }
107 }
108
109 // MakeThriftConcatEndpoint returns an endpoint that invokes the passed Thrift
110 // client. Useful only in clients, and only until a proper
111 // transport/thrift.Client exists.
112 func MakeThriftConcatEndpoint(client *thriftadd.AddServiceClient) endpoint.Endpoint {
113 return func(ctx context.Context, request interface{}) (interface{}, error) {
114 req := request.(addendpoint.ConcatRequest)
115 reply, err := client.Concat(req.A, req.B)
116 return addendpoint.ConcatResponse{V: reply.Value, Err: err}, nil
117 }
118 }
+0
-43
examples/addsvc2/pkg/endpoint/middleware.go less more
0 package endpoint
1
2 import (
3 "context"
4 "fmt"
5 "time"
6
7 "github.com/go-kit/kit/endpoint"
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/metrics"
10 )
11
12 // InstrumentingMiddleware returns an endpoint middleware that records
13 // the duration of each invocation to the passed histogram. The middleware adds
14 // a single field: "success", which is "true" if no error is returned, and
15 // "false" otherwise.
16 func InstrumentingMiddleware(duration metrics.Histogram) endpoint.Middleware {
17 return func(next endpoint.Endpoint) endpoint.Endpoint {
18 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
19
20 defer func(begin time.Time) {
21 duration.With("success", fmt.Sprint(err == nil)).Observe(time.Since(begin).Seconds())
22 }(time.Now())
23 return next(ctx, request)
24
25 }
26 }
27 }
28
29 // LoggingMiddleware returns an endpoint middleware that logs the
30 // duration of each invocation, and the resulting error, if any.
31 func LoggingMiddleware(logger log.Logger) endpoint.Middleware {
32 return func(next endpoint.Endpoint) endpoint.Endpoint {
33 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
34
35 defer func(begin time.Time) {
36 logger.Log("transport_error", err, "took", time.Since(begin))
37 }(time.Now())
38 return next(ctx, request)
39
40 }
41 }
42 }
+0
-128
examples/addsvc2/pkg/endpoint/set.go less more
0 package endpoint
1
2 import (
3 "context"
4
5 rl "github.com/juju/ratelimit"
6 stdopentracing "github.com/opentracing/opentracing-go"
7 "github.com/sony/gobreaker"
8
9 "github.com/go-kit/kit/circuitbreaker"
10 "github.com/go-kit/kit/endpoint"
11 "github.com/go-kit/kit/log"
12 "github.com/go-kit/kit/metrics"
13 "github.com/go-kit/kit/ratelimit"
14 "github.com/go-kit/kit/tracing/opentracing"
15
16 "github.com/go-kit/kit/examples/addsvc2/pkg/service"
17 )
18
19 // Set collects all of the endpoints that compose an add service. It's meant to
20 // be used as a helper struct, to collect all of the endpoints into a single
21 // parameter.
22 type Set struct {
23 SumEndpoint endpoint.Endpoint
24 ConcatEndpoint endpoint.Endpoint
25 }
26
27 // New returns a Set that wraps the provided server, and wires in all of the
28 // expected endpoint middlewares via the various parameters.
29 func New(svc service.Service, logger log.Logger, duration metrics.Histogram, trace stdopentracing.Tracer) Set {
30 var sumEndpoint endpoint.Endpoint
31 {
32 sumEndpoint = MakeSumEndpoint(svc)
33 sumEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(1, 1))(sumEndpoint)
34 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(sumEndpoint)
35 sumEndpoint = opentracing.TraceServer(trace, "Sum")(sumEndpoint)
36 sumEndpoint = LoggingMiddleware(log.With(logger, "method", "Sum"))(sumEndpoint)
37 sumEndpoint = InstrumentingMiddleware(duration.With("method", "Sum"))(sumEndpoint)
38 }
39 var concatEndpoint endpoint.Endpoint
40 {
41 concatEndpoint = MakeConcatEndpoint(svc)
42 concatEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(100, 100))(concatEndpoint)
43 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(concatEndpoint)
44 concatEndpoint = opentracing.TraceServer(trace, "Concat")(concatEndpoint)
45 concatEndpoint = LoggingMiddleware(log.With(logger, "method", "Concat"))(concatEndpoint)
46 concatEndpoint = InstrumentingMiddleware(duration.With("method", "Concat"))(concatEndpoint)
47 }
48 return Set{
49 SumEndpoint: sumEndpoint,
50 ConcatEndpoint: concatEndpoint,
51 }
52 }
53
54 // Sum implements the service interface, so Set may be used as a service.
55 // This is primarily useful in the context of a client library.
56 func (s Set) Sum(ctx context.Context, a, b int) (int, error) {
57 resp, err := s.SumEndpoint(ctx, SumRequest{A: a, B: b})
58 if err != nil {
59 return 0, err
60 }
61 response := resp.(SumResponse)
62 return response.V, response.Err
63 }
64
65 // Concat implements the service interface, so Set may be used as a
66 // service. This is primarily useful in the context of a client library.
67 func (s Set) Concat(ctx context.Context, a, b string) (string, error) {
68 resp, err := s.ConcatEndpoint(ctx, ConcatRequest{A: a, B: b})
69 if err != nil {
70 return "", err
71 }
72 response := resp.(ConcatResponse)
73 return response.V, response.Err
74 }
75
76 // MakeSumEndpoint constructs a Sum endpoint wrapping the service.
77 func MakeSumEndpoint(s service.Service) endpoint.Endpoint {
78 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
79 req := request.(SumRequest)
80 v, err := s.Sum(ctx, req.A, req.B)
81 return SumResponse{V: v, Err: err}, nil
82 }
83 }
84
85 // MakeConcatEndpoint constructs a Concat endpoint wrapping the service.
86 func MakeConcatEndpoint(s service.Service) endpoint.Endpoint {
87 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
88 req := request.(ConcatRequest)
89 v, err := s.Concat(ctx, req.A, req.B)
90 return ConcatResponse{V: v, Err: err}, nil
91 }
92 }
93
94 // Failer is an interface that should be implemented by response types.
95 // Response encoders can check if responses are Failer, and if so if they've
96 // failed, and if so encode them using a separate write path based on the error.
97 type Failer interface {
98 Failed() error
99 }
100
101 // SumRequest collects the request parameters for the Sum method.
102 type SumRequest struct {
103 A, B int
104 }
105
106 // SumResponse collects the response values for the Sum method.
107 type SumResponse struct {
108 V int `json:"v"`
109 Err error `json:"-"` // should be intercepted by Failed/errorEncoder
110 }
111
112 // Failed implements Failer.
113 func (r SumResponse) Failed() error { return r.Err }
114
115 // ConcatRequest collects the request parameters for the Concat method.
116 type ConcatRequest struct {
117 A, B string
118 }
119
120 // ConcatResponse collects the response values for the Concat method.
121 type ConcatResponse struct {
122 V string `json:"v"`
123 Err error `json:"-"`
124 }
125
126 // Failed implements Failer.
127 func (r ConcatResponse) Failed() error { return r.Err }
+0
-69
examples/addsvc2/pkg/service/middleware.go less more
0 package service
1
2 import (
3 "context"
4
5 "github.com/go-kit/kit/log"
6 "github.com/go-kit/kit/metrics"
7 )
8
9 // Middleware describes a service (as opposed to endpoint) middleware.
10 type Middleware func(Service) Service
11
12 // LoggingMiddleware takes a logger as a dependency
13 // and returns a ServiceMiddleware.
14 func LoggingMiddleware(logger log.Logger) Middleware {
15 return func(next Service) Service {
16 return loggingMiddleware{logger, next}
17 }
18 }
19
20 type loggingMiddleware struct {
21 logger log.Logger
22 next Service
23 }
24
25 func (mw loggingMiddleware) Sum(ctx context.Context, a, b int) (v int, err error) {
26 defer func() {
27 mw.logger.Log("method", "Sum", "a", a, "b", b, "v", v, "err", err)
28 }()
29 return mw.next.Sum(ctx, a, b)
30 }
31
32 func (mw loggingMiddleware) Concat(ctx context.Context, a, b string) (v string, err error) {
33 defer func() {
34 mw.logger.Log("method", "Concat", "a", a, "b", b, "v", v, "err", err)
35 }()
36 return mw.next.Concat(ctx, a, b)
37 }
38
39 // InstrumentingMiddleware returns a service middleware that instruments
40 // the number of integers summed and characters concatenated over the lifetime of
41 // the service.
42 func InstrumentingMiddleware(ints, chars metrics.Counter) Middleware {
43 return func(next Service) Service {
44 return instrumentingMiddleware{
45 ints: ints,
46 chars: chars,
47 next: next,
48 }
49 }
50 }
51
52 type instrumentingMiddleware struct {
53 ints metrics.Counter
54 chars metrics.Counter
55 next Service
56 }
57
58 func (mw instrumentingMiddleware) Sum(ctx context.Context, a, b int) (int, error) {
59 v, err := mw.next.Sum(ctx, a, b)
60 mw.ints.Add(float64(v))
61 return v, err
62 }
63
64 func (mw instrumentingMiddleware) Concat(ctx context.Context, a, b string) (string, error) {
65 v, err := mw.next.Concat(ctx, a, b)
66 mw.chars.Add(float64(len(v)))
67 return v, err
68 }
+0
-71
examples/addsvc2/pkg/service/service.go less more
0 package service
1
2 import (
3 "context"
4 "errors"
5
6 "github.com/go-kit/kit/log"
7 "github.com/go-kit/kit/metrics"
8 )
9
10 // Service describes a service that adds things together.
11 type Service interface {
12 Sum(ctx context.Context, a, b int) (int, error)
13 Concat(ctx context.Context, a, b string) (string, error)
14 }
15
16 // New returns a basic Service with all of the expected middlewares wired in.
17 func New(logger log.Logger, ints, chars metrics.Counter) Service {
18 var svc Service
19 {
20 svc = NewBasicService()
21 svc = LoggingMiddleware(logger)(svc)
22 svc = InstrumentingMiddleware(ints, chars)(svc)
23 }
24 return svc
25 }
26
27 var (
28 // ErrTwoZeroes is an arbitrary business rule for the Add method.
29 ErrTwoZeroes = errors.New("can't sum two zeroes")
30
31 // ErrIntOverflow protects the Add method. We've decided that this error
32 // indicates a misbehaving service and should count against e.g. circuit
33 // breakers. So, we return it directly in endpoints, to illustrate the
34 // difference. In a real service, this probably wouldn't be the case.
35 ErrIntOverflow = errors.New("integer overflow")
36
37 // ErrMaxSizeExceeded protects the Concat method.
38 ErrMaxSizeExceeded = errors.New("result exceeds maximum size")
39 )
40
41 // NewBasicService returns a naïve, stateless implementation of Service.
42 func NewBasicService() Service {
43 return basicService{}
44 }
45
46 type basicService struct{}
47
48 const (
49 intMax = 1<<31 - 1
50 intMin = -(intMax + 1)
51 maxLen = 10
52 )
53
54 func (s basicService) Sum(_ context.Context, a, b int) (int, error) {
55 if a == 0 && b == 0 {
56 return 0, ErrTwoZeroes
57 }
58 if (b > 0 && a > (intMax-b)) || (b < 0 && a < (intMin-b)) {
59 return 0, ErrIntOverflow
60 }
61 return a + b, nil
62 }
63
64 // Concat implements Service.
65 func (s basicService) Concat(_ context.Context, a, b string) (string, error) {
66 if len(a)+len(b) > maxLen {
67 return "", ErrMaxSizeExceeded
68 }
69 return a + b, nil
70 }
+0
-209
examples/addsvc2/pkg/transport/grpc.go less more
0 package transport
1
2 import (
3 "context"
4 "errors"
5 "time"
6
7 "google.golang.org/grpc"
8
9 jujuratelimit "github.com/juju/ratelimit"
10 stdopentracing "github.com/opentracing/opentracing-go"
11 "github.com/sony/gobreaker"
12 oldcontext "golang.org/x/net/context"
13
14 "github.com/go-kit/kit/circuitbreaker"
15 "github.com/go-kit/kit/endpoint"
16 "github.com/go-kit/kit/log"
17 "github.com/go-kit/kit/ratelimit"
18 "github.com/go-kit/kit/tracing/opentracing"
19 grpctransport "github.com/go-kit/kit/transport/grpc"
20
21 "github.com/go-kit/kit/examples/addsvc2/pb"
22 addendpoint "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint"
23 addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service"
24 )
25
26 type grpcServer struct {
27 sum grpctransport.Handler
28 concat grpctransport.Handler
29 }
30
31 // NewGRPCServer makes a set of endpoints available as a gRPC AddServer.
32 func NewGRPCServer(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer {
33 options := []grpctransport.ServerOption{
34 grpctransport.ServerErrorLogger(logger),
35 }
36 return &grpcServer{
37 sum: grpctransport.NewServer(
38 endpoints.SumEndpoint,
39 decodeGRPCSumRequest,
40 encodeGRPCSumResponse,
41 append(options, grpctransport.ServerBefore(opentracing.FromGRPCRequest(tracer, "Sum", logger)))...,
42 ),
43 concat: grpctransport.NewServer(
44 endpoints.ConcatEndpoint,
45 decodeGRPCConcatRequest,
46 encodeGRPCConcatResponse,
47 append(options, grpctransport.ServerBefore(opentracing.FromGRPCRequest(tracer, "Concat", logger)))...,
48 ),
49 }
50 }
51
52 func (s *grpcServer) Sum(ctx oldcontext.Context, req *pb.SumRequest) (*pb.SumReply, error) {
53 _, rep, err := s.sum.ServeGRPC(ctx, req)
54 if err != nil {
55 return nil, err
56 }
57 return rep.(*pb.SumReply), nil
58 }
59
60 func (s *grpcServer) Concat(ctx oldcontext.Context, req *pb.ConcatRequest) (*pb.ConcatReply, error) {
61 _, rep, err := s.concat.ServeGRPC(ctx, req)
62 if err != nil {
63 return nil, err
64 }
65 return rep.(*pb.ConcatReply), nil
66 }
67
68 // NewGRPCClient returns an AddService backed by a gRPC server at the other end
69 // of the conn. The caller is responsible for constructing the conn, and
70 // eventually closing the underlying transport.
71 func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addservice.Service {
72 // We construct a single ratelimiter middleware, to limit the total outgoing
73 // QPS from this client to all methods on the remote instance. We also
74 // construct per-endpoint circuitbreaker middlewares to demonstrate how
75 // that's done, although they could easily be combined into a single breaker
76 // for the entire remote instance, too.
77 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
78
79 // Each individual endpoint is an http/transport.Client (which implements
80 // endpoint.Endpoint) that gets wrapped with various middlewares. If you
81 // made your own client library, you'd do this work there, so your server
82 // could rely on a consistent set of client behavior.
83 var sumEndpoint endpoint.Endpoint
84 {
85 sumEndpoint = grpctransport.NewClient(
86 conn,
87 "pb.Add",
88 "Sum",
89 encodeGRPCSumRequest,
90 decodeGRPCSumResponse,
91 pb.SumReply{},
92 grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)),
93 ).Endpoint()
94 sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
95 sumEndpoint = limiter(sumEndpoint)
96 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
97 Name: "Sum",
98 Timeout: 30 * time.Second,
99 }))(sumEndpoint)
100 }
101
102 // The Concat endpoint is the same thing, with slightly different
103 // middlewares to demonstrate how to specialize per-endpoint.
104 var concatEndpoint endpoint.Endpoint
105 {
106 concatEndpoint = grpctransport.NewClient(
107 conn,
108 "pb.Add",
109 "Concat",
110 encodeGRPCConcatRequest,
111 decodeGRPCConcatResponse,
112 pb.ConcatReply{},
113 grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)),
114 ).Endpoint()
115 concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
116 concatEndpoint = limiter(concatEndpoint)
117 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
118 Name: "Concat",
119 Timeout: 10 * time.Second,
120 }))(concatEndpoint)
121 }
122
123 // Returning the endpoint.Set as a service.Service relies on the
124 // endpoint.Set implementing the Service methods. That's just a simple bit
125 // of glue code.
126 return addendpoint.Set{
127 SumEndpoint: sumEndpoint,
128 ConcatEndpoint: concatEndpoint,
129 }
130 }
131
132 // decodeGRPCSumRequest is a transport/grpc.DecodeRequestFunc that converts a
133 // gRPC sum request to a user-domain sum request. Primarily useful in a server.
134 func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
135 req := grpcReq.(*pb.SumRequest)
136 return addendpoint.SumRequest{A: int(req.A), B: int(req.B)}, nil
137 }
138
139 // decodeGRPCConcatRequest is a transport/grpc.DecodeRequestFunc that converts a
140 // gRPC concat request to a user-domain concat request. Primarily useful in a
141 // server.
142 func decodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
143 req := grpcReq.(*pb.ConcatRequest)
144 return addendpoint.ConcatRequest{A: req.A, B: req.B}, nil
145 }
146
147 // decodeGRPCSumResponse is a transport/grpc.DecodeResponseFunc that converts a
148 // gRPC sum reply to a user-domain sum response. Primarily useful in a client.
149 func decodeGRPCSumResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
150 reply := grpcReply.(*pb.SumReply)
151 return addendpoint.SumResponse{V: int(reply.V), Err: str2err(reply.Err)}, nil
152 }
153
154 // decodeGRPCConcatResponse is a transport/grpc.DecodeResponseFunc that converts
155 // a gRPC concat reply to a user-domain concat response. Primarily useful in a
156 // client.
157 func decodeGRPCConcatResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
158 reply := grpcReply.(*pb.ConcatReply)
159 return addendpoint.ConcatResponse{V: reply.V, Err: str2err(reply.Err)}, nil
160 }
161
162 // encodeGRPCSumResponse is a transport/grpc.EncodeResponseFunc that converts a
163 // user-domain sum response to a gRPC sum reply. Primarily useful in a server.
164 func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
165 resp := response.(addendpoint.SumResponse)
166 return &pb.SumReply{V: int64(resp.V), Err: err2str(resp.Err)}, nil
167 }
168
169 // encodeGRPCConcatResponse is a transport/grpc.EncodeResponseFunc that converts
170 // a user-domain concat response to a gRPC concat reply. Primarily useful in a
171 // server.
172 func encodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
173 resp := response.(addendpoint.ConcatResponse)
174 return &pb.ConcatReply{V: resp.V, Err: err2str(resp.Err)}, nil
175 }
176
177 // encodeGRPCSumRequest is a transport/grpc.EncodeRequestFunc that converts a
178 // user-domain sum request to a gRPC sum request. Primarily useful in a client.
179 func encodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) {
180 req := request.(addendpoint.SumRequest)
181 return &pb.SumRequest{A: int64(req.A), B: int64(req.B)}, nil
182 }
183
184 // encodeGRPCConcatRequest is a transport/grpc.EncodeRequestFunc that converts a
185 // user-domain concat request to a gRPC concat request. Primarily useful in a
186 // client.
187 func encodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) {
188 req := request.(addendpoint.ConcatRequest)
189 return &pb.ConcatRequest{A: req.A, B: req.B}, nil
190 }
191
192 // These annoying helper functions are required to translate Go error types to
193 // and from strings, which is the type we use in our IDLs to represent errors.
194 // There is special casing to treat empty strings as nil errors.
195
196 func str2err(s string) error {
197 if s == "" {
198 return nil
199 }
200 return errors.New(s)
201 }
202
203 func err2str(err error) string {
204 if err == nil {
205 return ""
206 }
207 return err.Error()
208 }
+0
-218
examples/addsvc2/pkg/transport/http.go less more
0 package transport
1
2 import (
3 "bytes"
4 "context"
5 "encoding/json"
6 "errors"
7 "io/ioutil"
8 "net/http"
9 "net/url"
10 "strings"
11 "time"
12
13 jujuratelimit "github.com/juju/ratelimit"
14 stdopentracing "github.com/opentracing/opentracing-go"
15 "github.com/sony/gobreaker"
16
17 "github.com/go-kit/kit/circuitbreaker"
18 "github.com/go-kit/kit/endpoint"
19 "github.com/go-kit/kit/log"
20 "github.com/go-kit/kit/ratelimit"
21 "github.com/go-kit/kit/tracing/opentracing"
22 httptransport "github.com/go-kit/kit/transport/http"
23
24 addendpoint "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint"
25 addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service"
26 )
27
28 // NewHTTPHandler returns an HTTP handler that makes a set of endpoints
29 // available on predefined paths.
30 func NewHTTPHandler(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) http.Handler {
31 options := []httptransport.ServerOption{
32 httptransport.ServerErrorEncoder(errorEncoder),
33 httptransport.ServerErrorLogger(logger),
34 }
35 m := http.NewServeMux()
36 m.Handle("/sum", httptransport.NewServer(
37 endpoints.SumEndpoint,
38 decodeHTTPSumRequest,
39 encodeHTTPGenericResponse,
40 append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Sum", logger)))...,
41 ))
42 m.Handle("/concat", httptransport.NewServer(
43 endpoints.ConcatEndpoint,
44 decodeHTTPConcatRequest,
45 encodeHTTPGenericResponse,
46 append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Concat", logger)))...,
47 ))
48 return m
49 }
50
51 // NewHTTPClient returns an AddService backed by an HTTP server living at the
52 // remote instance. We expect instance to come from a service discovery system,
53 // so likely of the form "host:port".
54 func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addservice.Service, error) {
55 // Quickly sanitize the instance string.
56 if !strings.HasPrefix(instance, "http") {
57 instance = "http://" + instance
58 }
59 u, err := url.Parse(instance)
60 if err != nil {
61 return nil, err
62 }
63
64 // We construct a single ratelimiter middleware, to limit the total outgoing
65 // QPS from this client to all methods on the remote instance. We also
66 // construct per-endpoint circuitbreaker middlewares to demonstrate how
67 // that's done, although they could easily be combined into a single breaker
68 // for the entire remote instance, too.
69 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
70
71 // Each individual endpoint is an http/transport.Client (which implements
72 // endpoint.Endpoint) that gets wrapped with various middlewares. If you
73 // made your own client library, you'd do this work there, so your server
74 // could rely on a consistent set of client behavior.
75 var sumEndpoint endpoint.Endpoint
76 {
77 sumEndpoint = httptransport.NewClient(
78 "POST",
79 copyURL(u, "/sum"),
80 encodeHTTPGenericRequest,
81 decodeHTTPSumResponse,
82 httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)),
83 ).Endpoint()
84 sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
85 sumEndpoint = limiter(sumEndpoint)
86 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
87 Name: "Sum",
88 Timeout: 30 * time.Second,
89 }))(sumEndpoint)
90 }
91
92 // The Concat endpoint is the same thing, with slightly different
93 // middlewares to demonstrate how to specialize per-endpoint.
94 var concatEndpoint endpoint.Endpoint
95 {
96 concatEndpoint = httptransport.NewClient(
97 "POST",
98 copyURL(u, "/concat"),
99 encodeHTTPGenericRequest,
100 decodeHTTPConcatResponse,
101 httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)),
102 ).Endpoint()
103 concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
104 concatEndpoint = limiter(concatEndpoint)
105 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
106 Name: "Concat",
107 Timeout: 10 * time.Second,
108 }))(concatEndpoint)
109 }
110
111 // Returning the endpoint.Set as a service.Service relies on the
112 // endpoint.Set implementing the Service methods. That's just a simple bit
113 // of glue code.
114 return addendpoint.Set{
115 SumEndpoint: sumEndpoint,
116 ConcatEndpoint: concatEndpoint,
117 }, nil
118 }
119
120 func copyURL(base *url.URL, path string) *url.URL {
121 next := *base
122 next.Path = path
123 return &next
124 }
125
126 func errorEncoder(_ context.Context, err error, w http.ResponseWriter) {
127 w.WriteHeader(err2code(err))
128 json.NewEncoder(w).Encode(errorWrapper{Error: err.Error()})
129 }
130
131 func err2code(err error) int {
132 switch err {
133 case addservice.ErrTwoZeroes, addservice.ErrMaxSizeExceeded, addservice.ErrIntOverflow:
134 return http.StatusBadRequest
135 }
136 return http.StatusInternalServerError
137 }
138
139 func errorDecoder(r *http.Response) error {
140 var w errorWrapper
141 if err := json.NewDecoder(r.Body).Decode(&w); err != nil {
142 return err
143 }
144 return errors.New(w.Error)
145 }
146
147 type errorWrapper struct {
148 Error string `json:"error"`
149 }
150
151 // decodeHTTPSumRequest is a transport/http.DecodeRequestFunc that decodes a
152 // JSON-encoded sum request from the HTTP request body. Primarily useful in a
153 // server.
154 func decodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) {
155 var req addendpoint.SumRequest
156 err := json.NewDecoder(r.Body).Decode(&req)
157 return req, err
158 }
159
160 // decodeHTTPConcatRequest is a transport/http.DecodeRequestFunc that decodes a
161 // JSON-encoded concat request from the HTTP request body. Primarily useful in a
162 // server.
163 func decodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) {
164 var req addendpoint.ConcatRequest
165 err := json.NewDecoder(r.Body).Decode(&req)
166 return req, err
167 }
168
169 // decodeHTTPSumResponse is a transport/http.DecodeResponseFunc that decodes a
170 // JSON-encoded sum response from the HTTP response body. If the response has a
171 // non-200 status code, we will interpret that as an error and attempt to decode
172 // the specific error message from the response body. Primarily useful in a
173 // client.
174 func decodeHTTPSumResponse(_ context.Context, r *http.Response) (interface{}, error) {
175 if r.StatusCode != http.StatusOK {
176 return nil, errors.New(r.Status)
177 }
178 var resp addendpoint.SumResponse
179 err := json.NewDecoder(r.Body).Decode(&resp)
180 return resp, err
181 }
182
183 // decodeHTTPConcatResponse is a transport/http.DecodeResponseFunc that decodes
184 // a JSON-encoded concat response from the HTTP response body. If the response
185 // has a non-200 status code, we will interpret that as an error and attempt to
186 // decode the specific error message from the response body. Primarily useful in
187 // a client.
188 func decodeHTTPConcatResponse(_ context.Context, r *http.Response) (interface{}, error) {
189 if r.StatusCode != http.StatusOK {
190 return nil, errors.New(r.Status)
191 }
192 var resp addendpoint.ConcatResponse
193 err := json.NewDecoder(r.Body).Decode(&resp)
194 return resp, err
195 }
196
197 // encodeHTTPGenericRequest is a transport/http.EncodeRequestFunc that
198 // JSON-encodes any request to the request body. Primarily useful in a client.
199 func encodeHTTPGenericRequest(_ context.Context, r *http.Request, request interface{}) error {
200 var buf bytes.Buffer
201 if err := json.NewEncoder(&buf).Encode(request); err != nil {
202 return err
203 }
204 r.Body = ioutil.NopCloser(&buf)
205 return nil
206 }
207
208 // encodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that encodes
209 // the response as JSON to the response writer. Primarily useful in a server.
210 func encodeHTTPGenericResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
211 if f, ok := response.(addendpoint.Failer); ok && f.Failed() != nil {
212 errorEncoder(ctx, f.Failed(), w)
213 return nil
214 }
215 w.Header().Set("Content-Type", "application/json; charset=utf-8")
216 return json.NewEncoder(w).Encode(response)
217 }
+0
-119
examples/addsvc2/pkg/transport/thrift.go less more
0 package transport
1
2 import (
3 "context"
4 "time"
5
6 jujuratelimit "github.com/juju/ratelimit"
7 "github.com/sony/gobreaker"
8
9 "github.com/go-kit/kit/circuitbreaker"
10 "github.com/go-kit/kit/endpoint"
11 "github.com/go-kit/kit/ratelimit"
12
13 addendpoint "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint"
14 addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service"
15 thriftadd "github.com/go-kit/kit/examples/addsvc2/thrift/gen-go/addsvc"
16 )
17
18 type thriftServer struct {
19 ctx context.Context
20 endpoints addendpoint.Set
21 }
22
23 // NewThriftServer makes a set of endpoints available as a Thrift service.
24 func NewThriftServer(ctx context.Context, endpoints addendpoint.Set) thriftadd.AddService {
25 return &thriftServer{
26 ctx: ctx,
27 endpoints: endpoints,
28 }
29 }
30
31 func (s *thriftServer) Sum(a int64, b int64) (*thriftadd.SumReply, error) {
32 request := addendpoint.SumRequest{A: int(a), B: int(b)}
33 response, err := s.endpoints.SumEndpoint(s.ctx, request)
34 if err != nil {
35 return nil, err
36 }
37 resp := response.(addendpoint.SumResponse)
38 return &thriftadd.SumReply{Value: int64(resp.V), Err: err2str(resp.Err)}, nil
39 }
40
41 func (s *thriftServer) Concat(a string, b string) (*thriftadd.ConcatReply, error) {
42 request := addendpoint.ConcatRequest{A: a, B: b}
43 response, err := s.endpoints.ConcatEndpoint(s.ctx, request)
44 if err != nil {
45 return nil, err
46 }
47 resp := response.(addendpoint.ConcatResponse)
48 return &thriftadd.ConcatReply{Value: resp.V, Err: err2str(resp.Err)}, nil
49 }
50
51 // NewThriftClient returns an AddService backed by a Thrift server described by
52 // the provided client. The caller is responsible for constructing the client,
53 // and eventually closing the underlying transport.
54 func NewThriftClient(client *thriftadd.AddServiceClient) addservice.Service {
55 // We construct a single ratelimiter middleware, to limit the total outgoing
56 // QPS from this client to all methods on the remote instance. We also
57 // construct per-endpoint circuitbreaker middlewares to demonstrate how
58 // that's done, although they could easily be combined into a single breaker
59 // for the entire remote instance, too.
60 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
61
62 // Each individual endpoint is an http/transport.Client (which implements
63 // endpoint.Endpoint) that gets wrapped with various middlewares. If you
64 // could rely on a consistent set of client behavior.
65 var sumEndpoint endpoint.Endpoint
66 {
67 sumEndpoint = MakeThriftSumEndpoint(client)
68 sumEndpoint = limiter(sumEndpoint)
69 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
70 Name: "Sum",
71 Timeout: 30 * time.Second,
72 }))(sumEndpoint)
73 }
74
75 // The Concat endpoint is the same thing, with slightly different
76 // middlewares to demonstrate how to specialize per-endpoint.
77 var concatEndpoint endpoint.Endpoint
78 {
79 concatEndpoint = MakeThriftConcatEndpoint(client)
80 concatEndpoint = limiter(concatEndpoint)
81 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
82 Name: "Concat",
83 Timeout: 10 * time.Second,
84 }))(concatEndpoint)
85 }
86
87 // Returning the endpoint.Set as a service.Service relies on the
88 // endpoint.Set implementing the Service methods. That's just a simple bit
89 // of glue code.
90 return addendpoint.Set{
91 SumEndpoint: sumEndpoint,
92 ConcatEndpoint: concatEndpoint,
93 }
94 }
95
96 // MakeThriftSumEndpoint returns an endpoint that invokes the passed Thrift client.
97 // Useful only in clients, and only until a proper transport/thrift.Client exists.
98 func MakeThriftSumEndpoint(client *thriftadd.AddServiceClient) endpoint.Endpoint {
99 return func(ctx context.Context, request interface{}) (interface{}, error) {
100 req := request.(addendpoint.SumRequest)
101 reply, err := client.Sum(int64(req.A), int64(req.B))
102 if err == addservice.ErrIntOverflow {
103 return nil, err // special case; see comment on ErrIntOverflow
104 }
105 return addendpoint.SumResponse{V: int(reply.Value), Err: err}, nil
106 }
107 }
108
109 // MakeThriftConcatEndpoint returns an endpoint that invokes the passed Thrift
110 // client. Useful only in clients, and only until a proper
111 // transport/thrift.Client exists.
112 func MakeThriftConcatEndpoint(client *thriftadd.AddServiceClient) endpoint.Endpoint {
113 return func(ctx context.Context, request interface{}) (interface{}, error) {
114 req := request.(addendpoint.ConcatRequest)
115 reply, err := client.Concat(req.A, req.B)
116 return addendpoint.ConcatResponse{V: reply.Value, Err: err}, nil
117 }
118 }