diff --git a/examples/addsvc2/cmd/addcli/addcli.go b/examples/addsvc2/cmd/addcli/addcli.go index 01ee31f..7af3ce0 100644 --- a/examples/addsvc2/cmd/addcli/addcli.go +++ b/examples/addsvc2/cmd/addcli/addcli.go @@ -20,8 +20,8 @@ "github.com/go-kit/kit/log" - addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service" - addtransport "github.com/go-kit/kit/examples/addsvc2/pkg/transport" + "github.com/go-kit/kit/examples/addsvc2/pkg/addservice" + "github.com/go-kit/kit/examples/addsvc2/pkg/addtransport" addthrift "github.com/go-kit/kit/examples/addsvc2/thrift/gen-go/addsvc" ) diff --git a/examples/addsvc2/cmd/addsvc/addsvc.go b/examples/addsvc2/cmd/addsvc/addsvc.go index d7e3500..f5dc077 100644 --- a/examples/addsvc2/cmd/addsvc/addsvc.go +++ b/examples/addsvc2/cmd/addsvc/addsvc.go @@ -27,9 +27,9 @@ "github.com/go-kit/kit/metrics/prometheus" addpb "github.com/go-kit/kit/examples/addsvc2/pb" - addendpoint "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint" - addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service" - addtransport "github.com/go-kit/kit/examples/addsvc2/pkg/transport" + "github.com/go-kit/kit/examples/addsvc2/pkg/addendpoint" + "github.com/go-kit/kit/examples/addsvc2/pkg/addservice" + "github.com/go-kit/kit/examples/addsvc2/pkg/addtransport" addthrift "github.com/go-kit/kit/examples/addsvc2/thrift/gen-go/addsvc" ) diff --git a/examples/addsvc2/cmd/addsvc/wiring_test.go b/examples/addsvc2/cmd/addsvc/wiring_test.go index e2133f4..fb3dbe0 100644 --- a/examples/addsvc2/cmd/addsvc/wiring_test.go +++ b/examples/addsvc2/cmd/addsvc/wiring_test.go @@ -1,26 +1,26 @@ package main import ( - "context" "io/ioutil" "net/http" "net/http/httptest" "strings" "testing" + "github.com/opentracing/opentracing-go" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics/discard" - "github.com/opentracing/opentracing-go" - addendpoint "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint" - addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service" - addtransport "github.com/go-kit/kit/examples/addsvc2/pkg/transport" + "github.com/go-kit/kit/examples/addsvc2/pkg/addendpoint" + "github.com/go-kit/kit/examples/addsvc2/pkg/addservice" + "github.com/go-kit/kit/examples/addsvc2/pkg/addtransport" ) func TestHTTP(t *testing.T) { svc := addservice.New(log.NewNopLogger(), discard.NewCounter(), discard.NewCounter()) eps := addendpoint.New(svc, log.NewNopLogger(), discard.NewHistogram(), opentracing.GlobalTracer()) - mux := addtransport.NewHTTPHandler(context.Background(), eps, log.NewNopLogger(), opentracing.GlobalTracer()) + mux := addtransport.NewHTTPHandler(eps, opentracing.GlobalTracer(), log.NewNopLogger()) srv := httptest.NewServer(mux) defer srv.Close() diff --git a/examples/addsvc2/pkg/addendpoint/middleware.go b/examples/addsvc2/pkg/addendpoint/middleware.go new file mode 100644 index 0000000..c83047b --- /dev/null +++ b/examples/addsvc2/pkg/addendpoint/middleware.go @@ -0,0 +1,43 @@ +package addendpoint + +import ( + "context" + "fmt" + "time" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" +) + +// InstrumentingMiddleware returns an endpoint middleware that records +// the duration of each invocation to the passed histogram. The middleware adds +// a single field: "success", which is "true" if no error is returned, and +// "false" otherwise. +func InstrumentingMiddleware(duration metrics.Histogram) endpoint.Middleware { + return func(next endpoint.Endpoint) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (response interface{}, err error) { + + defer func(begin time.Time) { + duration.With("success", fmt.Sprint(err == nil)).Observe(time.Since(begin).Seconds()) + }(time.Now()) + return next(ctx, request) + + } + } +} + +// LoggingMiddleware returns an endpoint middleware that logs the +// duration of each invocation, and the resulting error, if any. +func LoggingMiddleware(logger log.Logger) endpoint.Middleware { + return func(next endpoint.Endpoint) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (response interface{}, err error) { + + defer func(begin time.Time) { + logger.Log("transport_error", err, "took", time.Since(begin)) + }(time.Now()) + return next(ctx, request) + + } + } +} diff --git a/examples/addsvc2/pkg/addendpoint/set.go b/examples/addsvc2/pkg/addendpoint/set.go new file mode 100644 index 0000000..c8c1c6f --- /dev/null +++ b/examples/addsvc2/pkg/addendpoint/set.go @@ -0,0 +1,128 @@ +package addendpoint + +import ( + "context" + + rl "github.com/juju/ratelimit" + stdopentracing "github.com/opentracing/opentracing-go" + "github.com/sony/gobreaker" + + "github.com/go-kit/kit/circuitbreaker" + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/ratelimit" + "github.com/go-kit/kit/tracing/opentracing" + + "github.com/go-kit/kit/examples/addsvc2/pkg/addservice" +) + +// Set collects all of the endpoints that compose an add service. It's meant to +// be used as a helper struct, to collect all of the endpoints into a single +// parameter. +type Set struct { + SumEndpoint endpoint.Endpoint + ConcatEndpoint endpoint.Endpoint +} + +// New returns a Set that wraps the provided server, and wires in all of the +// expected endpoint middlewares via the various parameters. +func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, trace stdopentracing.Tracer) Set { + var sumEndpoint endpoint.Endpoint + { + sumEndpoint = MakeSumEndpoint(svc) + sumEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(1, 1))(sumEndpoint) + sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(sumEndpoint) + sumEndpoint = opentracing.TraceServer(trace, "Sum")(sumEndpoint) + sumEndpoint = LoggingMiddleware(log.With(logger, "method", "Sum"))(sumEndpoint) + sumEndpoint = InstrumentingMiddleware(duration.With("method", "Sum"))(sumEndpoint) + } + var concatEndpoint endpoint.Endpoint + { + concatEndpoint = MakeConcatEndpoint(svc) + concatEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(100, 100))(concatEndpoint) + concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(concatEndpoint) + concatEndpoint = opentracing.TraceServer(trace, "Concat")(concatEndpoint) + concatEndpoint = LoggingMiddleware(log.With(logger, "method", "Concat"))(concatEndpoint) + concatEndpoint = InstrumentingMiddleware(duration.With("method", "Concat"))(concatEndpoint) + } + return Set{ + SumEndpoint: sumEndpoint, + ConcatEndpoint: concatEndpoint, + } +} + +// Sum implements the service interface, so Set may be used as a service. +// This is primarily useful in the context of a client library. +func (s Set) Sum(ctx context.Context, a, b int) (int, error) { + resp, err := s.SumEndpoint(ctx, SumRequest{A: a, B: b}) + if err != nil { + return 0, err + } + response := resp.(SumResponse) + return response.V, response.Err +} + +// Concat implements the service interface, so Set may be used as a +// service. This is primarily useful in the context of a client library. +func (s Set) Concat(ctx context.Context, a, b string) (string, error) { + resp, err := s.ConcatEndpoint(ctx, ConcatRequest{A: a, B: b}) + if err != nil { + return "", err + } + response := resp.(ConcatResponse) + return response.V, response.Err +} + +// MakeSumEndpoint constructs a Sum endpoint wrapping the service. +func MakeSumEndpoint(s addservice.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (response interface{}, err error) { + req := request.(SumRequest) + v, err := s.Sum(ctx, req.A, req.B) + return SumResponse{V: v, Err: err}, nil + } +} + +// MakeConcatEndpoint constructs a Concat endpoint wrapping the service. +func MakeConcatEndpoint(s addservice.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (response interface{}, err error) { + req := request.(ConcatRequest) + v, err := s.Concat(ctx, req.A, req.B) + return ConcatResponse{V: v, Err: err}, nil + } +} + +// Failer is an interface that should be implemented by response types. +// Response encoders can check if responses are Failer, and if so if they've +// failed, and if so encode them using a separate write path based on the error. +type Failer interface { + Failed() error +} + +// SumRequest collects the request parameters for the Sum method. +type SumRequest struct { + A, B int +} + +// SumResponse collects the response values for the Sum method. +type SumResponse struct { + V int `json:"v"` + Err error `json:"-"` // should be intercepted by Failed/errorEncoder +} + +// Failed implements Failer. +func (r SumResponse) Failed() error { return r.Err } + +// ConcatRequest collects the request parameters for the Concat method. +type ConcatRequest struct { + A, B string +} + +// ConcatResponse collects the response values for the Concat method. +type ConcatResponse struct { + V string `json:"v"` + Err error `json:"-"` +} + +// Failed implements Failer. +func (r ConcatResponse) Failed() error { return r.Err } diff --git a/examples/addsvc2/pkg/addservice/middleware.go b/examples/addsvc2/pkg/addservice/middleware.go new file mode 100644 index 0000000..5a1d6ee --- /dev/null +++ b/examples/addsvc2/pkg/addservice/middleware.go @@ -0,0 +1,69 @@ +package addservice + +import ( + "context" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" +) + +// Middleware describes a service (as opposed to endpoint) middleware. +type Middleware func(Service) Service + +// LoggingMiddleware takes a logger as a dependency +// and returns a ServiceMiddleware. +func LoggingMiddleware(logger log.Logger) Middleware { + return func(next Service) Service { + return loggingMiddleware{logger, next} + } +} + +type loggingMiddleware struct { + logger log.Logger + next Service +} + +func (mw loggingMiddleware) Sum(ctx context.Context, a, b int) (v int, err error) { + defer func() { + mw.logger.Log("method", "Sum", "a", a, "b", b, "v", v, "err", err) + }() + return mw.next.Sum(ctx, a, b) +} + +func (mw loggingMiddleware) Concat(ctx context.Context, a, b string) (v string, err error) { + defer func() { + mw.logger.Log("method", "Concat", "a", a, "b", b, "v", v, "err", err) + }() + return mw.next.Concat(ctx, a, b) +} + +// InstrumentingMiddleware returns a service middleware that instruments +// the number of integers summed and characters concatenated over the lifetime of +// the service. +func InstrumentingMiddleware(ints, chars metrics.Counter) Middleware { + return func(next Service) Service { + return instrumentingMiddleware{ + ints: ints, + chars: chars, + next: next, + } + } +} + +type instrumentingMiddleware struct { + ints metrics.Counter + chars metrics.Counter + next Service +} + +func (mw instrumentingMiddleware) Sum(ctx context.Context, a, b int) (int, error) { + v, err := mw.next.Sum(ctx, a, b) + mw.ints.Add(float64(v)) + return v, err +} + +func (mw instrumentingMiddleware) Concat(ctx context.Context, a, b string) (string, error) { + v, err := mw.next.Concat(ctx, a, b) + mw.chars.Add(float64(len(v))) + return v, err +} diff --git a/examples/addsvc2/pkg/addservice/service.go b/examples/addsvc2/pkg/addservice/service.go new file mode 100644 index 0000000..d884373 --- /dev/null +++ b/examples/addsvc2/pkg/addservice/service.go @@ -0,0 +1,71 @@ +package addservice + +import ( + "context" + "errors" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" +) + +// Service describes a service that adds things together. +type Service interface { + Sum(ctx context.Context, a, b int) (int, error) + Concat(ctx context.Context, a, b string) (string, error) +} + +// New returns a basic Service with all of the expected middlewares wired in. +func New(logger log.Logger, ints, chars metrics.Counter) Service { + var svc Service + { + svc = NewBasicService() + svc = LoggingMiddleware(logger)(svc) + svc = InstrumentingMiddleware(ints, chars)(svc) + } + return svc +} + +var ( + // ErrTwoZeroes is an arbitrary business rule for the Add method. + ErrTwoZeroes = errors.New("can't sum two zeroes") + + // ErrIntOverflow protects the Add method. We've decided that this error + // indicates a misbehaving service and should count against e.g. circuit + // breakers. So, we return it directly in endpoints, to illustrate the + // difference. In a real service, this probably wouldn't be the case. + ErrIntOverflow = errors.New("integer overflow") + + // ErrMaxSizeExceeded protects the Concat method. + ErrMaxSizeExceeded = errors.New("result exceeds maximum size") +) + +// NewBasicService returns a naïve, stateless implementation of Service. +func NewBasicService() Service { + return basicService{} +} + +type basicService struct{} + +const ( + intMax = 1<<31 - 1 + intMin = -(intMax + 1) + maxLen = 10 +) + +func (s basicService) Sum(_ context.Context, a, b int) (int, error) { + if a == 0 && b == 0 { + return 0, ErrTwoZeroes + } + if (b > 0 && a > (intMax-b)) || (b < 0 && a < (intMin-b)) { + return 0, ErrIntOverflow + } + return a + b, nil +} + +// Concat implements Service. +func (s basicService) Concat(_ context.Context, a, b string) (string, error) { + if len(a)+len(b) > maxLen { + return "", ErrMaxSizeExceeded + } + return a + b, nil +} diff --git a/examples/addsvc2/pkg/addtransport/grpc.go b/examples/addsvc2/pkg/addtransport/grpc.go new file mode 100644 index 0000000..f8e4c2b --- /dev/null +++ b/examples/addsvc2/pkg/addtransport/grpc.go @@ -0,0 +1,209 @@ +package addtransport + +import ( + "context" + "errors" + "time" + + "google.golang.org/grpc" + + jujuratelimit "github.com/juju/ratelimit" + stdopentracing "github.com/opentracing/opentracing-go" + "github.com/sony/gobreaker" + oldcontext "golang.org/x/net/context" + + "github.com/go-kit/kit/circuitbreaker" + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/ratelimit" + "github.com/go-kit/kit/tracing/opentracing" + grpctransport "github.com/go-kit/kit/transport/grpc" + + "github.com/go-kit/kit/examples/addsvc2/pb" + "github.com/go-kit/kit/examples/addsvc2/pkg/addendpoint" + "github.com/go-kit/kit/examples/addsvc2/pkg/addservice" +) + +type grpcServer struct { + sum grpctransport.Handler + concat grpctransport.Handler +} + +// NewGRPCServer makes a set of endpoints available as a gRPC AddServer. +func NewGRPCServer(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer { + options := []grpctransport.ServerOption{ + grpctransport.ServerErrorLogger(logger), + } + return &grpcServer{ + sum: grpctransport.NewServer( + endpoints.SumEndpoint, + decodeGRPCSumRequest, + encodeGRPCSumResponse, + append(options, grpctransport.ServerBefore(opentracing.FromGRPCRequest(tracer, "Sum", logger)))..., + ), + concat: grpctransport.NewServer( + endpoints.ConcatEndpoint, + decodeGRPCConcatRequest, + encodeGRPCConcatResponse, + append(options, grpctransport.ServerBefore(opentracing.FromGRPCRequest(tracer, "Concat", logger)))..., + ), + } +} + +func (s *grpcServer) Sum(ctx oldcontext.Context, req *pb.SumRequest) (*pb.SumReply, error) { + _, rep, err := s.sum.ServeGRPC(ctx, req) + if err != nil { + return nil, err + } + return rep.(*pb.SumReply), nil +} + +func (s *grpcServer) Concat(ctx oldcontext.Context, req *pb.ConcatRequest) (*pb.ConcatReply, error) { + _, rep, err := s.concat.ServeGRPC(ctx, req) + if err != nil { + return nil, err + } + return rep.(*pb.ConcatReply), nil +} + +// NewGRPCClient returns an AddService backed by a gRPC server at the other end +// of the conn. The caller is responsible for constructing the conn, and +// eventually closing the underlying transport. +func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addservice.Service { + // We construct a single ratelimiter middleware, to limit the total outgoing + // QPS from this client to all methods on the remote instance. We also + // construct per-endpoint circuitbreaker middlewares to demonstrate how + // that's done, although they could easily be combined into a single breaker + // for the entire remote instance, too. + limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) + + // Each individual endpoint is an http/transport.Client (which implements + // endpoint.Endpoint) that gets wrapped with various middlewares. If you + // made your own client library, you'd do this work there, so your server + // could rely on a consistent set of client behavior. + var sumEndpoint endpoint.Endpoint + { + sumEndpoint = grpctransport.NewClient( + conn, + "pb.Add", + "Sum", + encodeGRPCSumRequest, + decodeGRPCSumResponse, + pb.SumReply{}, + grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)), + ).Endpoint() + sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) + sumEndpoint = limiter(sumEndpoint) + sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Sum", + Timeout: 30 * time.Second, + }))(sumEndpoint) + } + + // The Concat endpoint is the same thing, with slightly different + // middlewares to demonstrate how to specialize per-endpoint. + var concatEndpoint endpoint.Endpoint + { + concatEndpoint = grpctransport.NewClient( + conn, + "pb.Add", + "Concat", + encodeGRPCConcatRequest, + decodeGRPCConcatResponse, + pb.ConcatReply{}, + grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)), + ).Endpoint() + concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) + concatEndpoint = limiter(concatEndpoint) + concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Concat", + Timeout: 10 * time.Second, + }))(concatEndpoint) + } + + // Returning the endpoint.Set as a service.Service relies on the + // endpoint.Set implementing the Service methods. That's just a simple bit + // of glue code. + return addendpoint.Set{ + SumEndpoint: sumEndpoint, + ConcatEndpoint: concatEndpoint, + } +} + +// decodeGRPCSumRequest is a transport/grpc.DecodeRequestFunc that converts a +// gRPC sum request to a user-domain sum request. Primarily useful in a server. +func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) { + req := grpcReq.(*pb.SumRequest) + return addendpoint.SumRequest{A: int(req.A), B: int(req.B)}, nil +} + +// decodeGRPCConcatRequest is a transport/grpc.DecodeRequestFunc that converts a +// gRPC concat request to a user-domain concat request. Primarily useful in a +// server. +func decodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) { + req := grpcReq.(*pb.ConcatRequest) + return addendpoint.ConcatRequest{A: req.A, B: req.B}, nil +} + +// decodeGRPCSumResponse is a transport/grpc.DecodeResponseFunc that converts a +// gRPC sum reply to a user-domain sum response. Primarily useful in a client. +func decodeGRPCSumResponse(_ context.Context, grpcReply interface{}) (interface{}, error) { + reply := grpcReply.(*pb.SumReply) + return addendpoint.SumResponse{V: int(reply.V), Err: str2err(reply.Err)}, nil +} + +// decodeGRPCConcatResponse is a transport/grpc.DecodeResponseFunc that converts +// a gRPC concat reply to a user-domain concat response. Primarily useful in a +// client. +func decodeGRPCConcatResponse(_ context.Context, grpcReply interface{}) (interface{}, error) { + reply := grpcReply.(*pb.ConcatReply) + return addendpoint.ConcatResponse{V: reply.V, Err: str2err(reply.Err)}, nil +} + +// encodeGRPCSumResponse is a transport/grpc.EncodeResponseFunc that converts a +// user-domain sum response to a gRPC sum reply. Primarily useful in a server. +func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) { + resp := response.(addendpoint.SumResponse) + return &pb.SumReply{V: int64(resp.V), Err: err2str(resp.Err)}, nil +} + +// encodeGRPCConcatResponse is a transport/grpc.EncodeResponseFunc that converts +// a user-domain concat response to a gRPC concat reply. Primarily useful in a +// server. +func encodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) { + resp := response.(addendpoint.ConcatResponse) + return &pb.ConcatReply{V: resp.V, Err: err2str(resp.Err)}, nil +} + +// encodeGRPCSumRequest is a transport/grpc.EncodeRequestFunc that converts a +// user-domain sum request to a gRPC sum request. Primarily useful in a client. +func encodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) { + req := request.(addendpoint.SumRequest) + return &pb.SumRequest{A: int64(req.A), B: int64(req.B)}, nil +} + +// encodeGRPCConcatRequest is a transport/grpc.EncodeRequestFunc that converts a +// user-domain concat request to a gRPC concat request. Primarily useful in a +// client. +func encodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) { + req := request.(addendpoint.ConcatRequest) + return &pb.ConcatRequest{A: req.A, B: req.B}, nil +} + +// These annoying helper functions are required to translate Go error types to +// and from strings, which is the type we use in our IDLs to represent errors. +// There is special casing to treat empty strings as nil errors. + +func str2err(s string) error { + if s == "" { + return nil + } + return errors.New(s) +} + +func err2str(err error) string { + if err == nil { + return "" + } + return err.Error() +} diff --git a/examples/addsvc2/pkg/addtransport/http.go b/examples/addsvc2/pkg/addtransport/http.go new file mode 100644 index 0000000..5ab8e67 --- /dev/null +++ b/examples/addsvc2/pkg/addtransport/http.go @@ -0,0 +1,218 @@ +package addtransport + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "io/ioutil" + "net/http" + "net/url" + "strings" + "time" + + jujuratelimit "github.com/juju/ratelimit" + stdopentracing "github.com/opentracing/opentracing-go" + "github.com/sony/gobreaker" + + "github.com/go-kit/kit/circuitbreaker" + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/ratelimit" + "github.com/go-kit/kit/tracing/opentracing" + httptransport "github.com/go-kit/kit/transport/http" + + "github.com/go-kit/kit/examples/addsvc2/pkg/addendpoint" + "github.com/go-kit/kit/examples/addsvc2/pkg/addservice" +) + +// NewHTTPHandler returns an HTTP handler that makes a set of endpoints +// available on predefined paths. +func NewHTTPHandler(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) http.Handler { + options := []httptransport.ServerOption{ + httptransport.ServerErrorEncoder(errorEncoder), + httptransport.ServerErrorLogger(logger), + } + m := http.NewServeMux() + m.Handle("/sum", httptransport.NewServer( + endpoints.SumEndpoint, + decodeHTTPSumRequest, + encodeHTTPGenericResponse, + append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Sum", logger)))..., + )) + m.Handle("/concat", httptransport.NewServer( + endpoints.ConcatEndpoint, + decodeHTTPConcatRequest, + encodeHTTPGenericResponse, + append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Concat", logger)))..., + )) + return m +} + +// NewHTTPClient returns an AddService backed by an HTTP server living at the +// remote instance. We expect instance to come from a service discovery system, +// so likely of the form "host:port". +func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addservice.Service, error) { + // Quickly sanitize the instance string. + if !strings.HasPrefix(instance, "http") { + instance = "http://" + instance + } + u, err := url.Parse(instance) + if err != nil { + return nil, err + } + + // We construct a single ratelimiter middleware, to limit the total outgoing + // QPS from this client to all methods on the remote instance. We also + // construct per-endpoint circuitbreaker middlewares to demonstrate how + // that's done, although they could easily be combined into a single breaker + // for the entire remote instance, too. + limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) + + // Each individual endpoint is an http/transport.Client (which implements + // endpoint.Endpoint) that gets wrapped with various middlewares. If you + // made your own client library, you'd do this work there, so your server + // could rely on a consistent set of client behavior. + var sumEndpoint endpoint.Endpoint + { + sumEndpoint = httptransport.NewClient( + "POST", + copyURL(u, "/sum"), + encodeHTTPGenericRequest, + decodeHTTPSumResponse, + httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)), + ).Endpoint() + sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) + sumEndpoint = limiter(sumEndpoint) + sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Sum", + Timeout: 30 * time.Second, + }))(sumEndpoint) + } + + // The Concat endpoint is the same thing, with slightly different + // middlewares to demonstrate how to specialize per-endpoint. + var concatEndpoint endpoint.Endpoint + { + concatEndpoint = httptransport.NewClient( + "POST", + copyURL(u, "/concat"), + encodeHTTPGenericRequest, + decodeHTTPConcatResponse, + httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)), + ).Endpoint() + concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) + concatEndpoint = limiter(concatEndpoint) + concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Concat", + Timeout: 10 * time.Second, + }))(concatEndpoint) + } + + // Returning the endpoint.Set as a service.Service relies on the + // endpoint.Set implementing the Service methods. That's just a simple bit + // of glue code. + return addendpoint.Set{ + SumEndpoint: sumEndpoint, + ConcatEndpoint: concatEndpoint, + }, nil +} + +func copyURL(base *url.URL, path string) *url.URL { + next := *base + next.Path = path + return &next +} + +func errorEncoder(_ context.Context, err error, w http.ResponseWriter) { + w.WriteHeader(err2code(err)) + json.NewEncoder(w).Encode(errorWrapper{Error: err.Error()}) +} + +func err2code(err error) int { + switch err { + case addservice.ErrTwoZeroes, addservice.ErrMaxSizeExceeded, addservice.ErrIntOverflow: + return http.StatusBadRequest + } + return http.StatusInternalServerError +} + +func errorDecoder(r *http.Response) error { + var w errorWrapper + if err := json.NewDecoder(r.Body).Decode(&w); err != nil { + return err + } + return errors.New(w.Error) +} + +type errorWrapper struct { + Error string `json:"error"` +} + +// decodeHTTPSumRequest is a transport/http.DecodeRequestFunc that decodes a +// JSON-encoded sum request from the HTTP request body. Primarily useful in a +// server. +func decodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) { + var req addendpoint.SumRequest + err := json.NewDecoder(r.Body).Decode(&req) + return req, err +} + +// decodeHTTPConcatRequest is a transport/http.DecodeRequestFunc that decodes a +// JSON-encoded concat request from the HTTP request body. Primarily useful in a +// server. +func decodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) { + var req addendpoint.ConcatRequest + err := json.NewDecoder(r.Body).Decode(&req) + return req, err +} + +// decodeHTTPSumResponse is a transport/http.DecodeResponseFunc that decodes a +// JSON-encoded sum response from the HTTP response body. If the response has a +// non-200 status code, we will interpret that as an error and attempt to decode +// the specific error message from the response body. Primarily useful in a +// client. +func decodeHTTPSumResponse(_ context.Context, r *http.Response) (interface{}, error) { + if r.StatusCode != http.StatusOK { + return nil, errors.New(r.Status) + } + var resp addendpoint.SumResponse + err := json.NewDecoder(r.Body).Decode(&resp) + return resp, err +} + +// decodeHTTPConcatResponse is a transport/http.DecodeResponseFunc that decodes +// a JSON-encoded concat response from the HTTP response body. If the response +// has a non-200 status code, we will interpret that as an error and attempt to +// decode the specific error message from the response body. Primarily useful in +// a client. +func decodeHTTPConcatResponse(_ context.Context, r *http.Response) (interface{}, error) { + if r.StatusCode != http.StatusOK { + return nil, errors.New(r.Status) + } + var resp addendpoint.ConcatResponse + err := json.NewDecoder(r.Body).Decode(&resp) + return resp, err +} + +// encodeHTTPGenericRequest is a transport/http.EncodeRequestFunc that +// JSON-encodes any request to the request body. Primarily useful in a client. +func encodeHTTPGenericRequest(_ context.Context, r *http.Request, request interface{}) error { + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(request); err != nil { + return err + } + r.Body = ioutil.NopCloser(&buf) + return nil +} + +// encodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that encodes +// the response as JSON to the response writer. Primarily useful in a server. +func encodeHTTPGenericResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error { + if f, ok := response.(addendpoint.Failer); ok && f.Failed() != nil { + errorEncoder(ctx, f.Failed(), w) + return nil + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + return json.NewEncoder(w).Encode(response) +} diff --git a/examples/addsvc2/pkg/addtransport/thrift.go b/examples/addsvc2/pkg/addtransport/thrift.go new file mode 100644 index 0000000..ebbfc36 --- /dev/null +++ b/examples/addsvc2/pkg/addtransport/thrift.go @@ -0,0 +1,119 @@ +package addtransport + +import ( + "context" + "time" + + jujuratelimit "github.com/juju/ratelimit" + "github.com/sony/gobreaker" + + "github.com/go-kit/kit/circuitbreaker" + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/ratelimit" + + "github.com/go-kit/kit/examples/addsvc2/pkg/addendpoint" + "github.com/go-kit/kit/examples/addsvc2/pkg/addservice" + thriftadd "github.com/go-kit/kit/examples/addsvc2/thrift/gen-go/addsvc" +) + +type thriftServer struct { + ctx context.Context + endpoints addendpoint.Set +} + +// NewThriftServer makes a set of endpoints available as a Thrift service. +func NewThriftServer(ctx context.Context, endpoints addendpoint.Set) thriftadd.AddService { + return &thriftServer{ + ctx: ctx, + endpoints: endpoints, + } +} + +func (s *thriftServer) Sum(a int64, b int64) (*thriftadd.SumReply, error) { + request := addendpoint.SumRequest{A: int(a), B: int(b)} + response, err := s.endpoints.SumEndpoint(s.ctx, request) + if err != nil { + return nil, err + } + resp := response.(addendpoint.SumResponse) + return &thriftadd.SumReply{Value: int64(resp.V), Err: err2str(resp.Err)}, nil +} + +func (s *thriftServer) Concat(a string, b string) (*thriftadd.ConcatReply, error) { + request := addendpoint.ConcatRequest{A: a, B: b} + response, err := s.endpoints.ConcatEndpoint(s.ctx, request) + if err != nil { + return nil, err + } + resp := response.(addendpoint.ConcatResponse) + return &thriftadd.ConcatReply{Value: resp.V, Err: err2str(resp.Err)}, nil +} + +// NewThriftClient returns an AddService backed by a Thrift server described by +// the provided client. The caller is responsible for constructing the client, +// and eventually closing the underlying transport. +func NewThriftClient(client *thriftadd.AddServiceClient) addservice.Service { + // We construct a single ratelimiter middleware, to limit the total outgoing + // QPS from this client to all methods on the remote instance. We also + // construct per-endpoint circuitbreaker middlewares to demonstrate how + // that's done, although they could easily be combined into a single breaker + // for the entire remote instance, too. + limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) + + // Each individual endpoint is an http/transport.Client (which implements + // endpoint.Endpoint) that gets wrapped with various middlewares. If you + // could rely on a consistent set of client behavior. + var sumEndpoint endpoint.Endpoint + { + sumEndpoint = MakeThriftSumEndpoint(client) + sumEndpoint = limiter(sumEndpoint) + sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Sum", + Timeout: 30 * time.Second, + }))(sumEndpoint) + } + + // The Concat endpoint is the same thing, with slightly different + // middlewares to demonstrate how to specialize per-endpoint. + var concatEndpoint endpoint.Endpoint + { + concatEndpoint = MakeThriftConcatEndpoint(client) + concatEndpoint = limiter(concatEndpoint) + concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Concat", + Timeout: 10 * time.Second, + }))(concatEndpoint) + } + + // Returning the endpoint.Set as a service.Service relies on the + // endpoint.Set implementing the Service methods. That's just a simple bit + // of glue code. + return addendpoint.Set{ + SumEndpoint: sumEndpoint, + ConcatEndpoint: concatEndpoint, + } +} + +// MakeThriftSumEndpoint returns an endpoint that invokes the passed Thrift client. +// Useful only in clients, and only until a proper transport/thrift.Client exists. +func MakeThriftSumEndpoint(client *thriftadd.AddServiceClient) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(addendpoint.SumRequest) + reply, err := client.Sum(int64(req.A), int64(req.B)) + if err == addservice.ErrIntOverflow { + return nil, err // special case; see comment on ErrIntOverflow + } + return addendpoint.SumResponse{V: int(reply.Value), Err: err}, nil + } +} + +// MakeThriftConcatEndpoint returns an endpoint that invokes the passed Thrift +// client. Useful only in clients, and only until a proper +// transport/thrift.Client exists. +func MakeThriftConcatEndpoint(client *thriftadd.AddServiceClient) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(addendpoint.ConcatRequest) + reply, err := client.Concat(req.A, req.B) + return addendpoint.ConcatResponse{V: reply.Value, Err: err}, nil + } +} diff --git a/examples/addsvc2/pkg/endpoint/middleware.go b/examples/addsvc2/pkg/endpoint/middleware.go deleted file mode 100644 index d02bfbe..0000000 --- a/examples/addsvc2/pkg/endpoint/middleware.go +++ /dev/null @@ -1,43 +0,0 @@ -package endpoint - -import ( - "context" - "fmt" - "time" - - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/metrics" -) - -// InstrumentingMiddleware returns an endpoint middleware that records -// the duration of each invocation to the passed histogram. The middleware adds -// a single field: "success", which is "true" if no error is returned, and -// "false" otherwise. -func InstrumentingMiddleware(duration metrics.Histogram) endpoint.Middleware { - return func(next endpoint.Endpoint) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (response interface{}, err error) { - - defer func(begin time.Time) { - duration.With("success", fmt.Sprint(err == nil)).Observe(time.Since(begin).Seconds()) - }(time.Now()) - return next(ctx, request) - - } - } -} - -// LoggingMiddleware returns an endpoint middleware that logs the -// duration of each invocation, and the resulting error, if any. -func LoggingMiddleware(logger log.Logger) endpoint.Middleware { - return func(next endpoint.Endpoint) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (response interface{}, err error) { - - defer func(begin time.Time) { - logger.Log("transport_error", err, "took", time.Since(begin)) - }(time.Now()) - return next(ctx, request) - - } - } -} diff --git a/examples/addsvc2/pkg/endpoint/set.go b/examples/addsvc2/pkg/endpoint/set.go deleted file mode 100644 index 1515aaf..0000000 --- a/examples/addsvc2/pkg/endpoint/set.go +++ /dev/null @@ -1,128 +0,0 @@ -package endpoint - -import ( - "context" - - rl "github.com/juju/ratelimit" - stdopentracing "github.com/opentracing/opentracing-go" - "github.com/sony/gobreaker" - - "github.com/go-kit/kit/circuitbreaker" - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/metrics" - "github.com/go-kit/kit/ratelimit" - "github.com/go-kit/kit/tracing/opentracing" - - "github.com/go-kit/kit/examples/addsvc2/pkg/service" -) - -// Set collects all of the endpoints that compose an add service. It's meant to -// be used as a helper struct, to collect all of the endpoints into a single -// parameter. -type Set struct { - SumEndpoint endpoint.Endpoint - ConcatEndpoint endpoint.Endpoint -} - -// New returns a Set that wraps the provided server, and wires in all of the -// expected endpoint middlewares via the various parameters. -func New(svc service.Service, logger log.Logger, duration metrics.Histogram, trace stdopentracing.Tracer) Set { - var sumEndpoint endpoint.Endpoint - { - sumEndpoint = MakeSumEndpoint(svc) - sumEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(1, 1))(sumEndpoint) - sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(sumEndpoint) - sumEndpoint = opentracing.TraceServer(trace, "Sum")(sumEndpoint) - sumEndpoint = LoggingMiddleware(log.With(logger, "method", "Sum"))(sumEndpoint) - sumEndpoint = InstrumentingMiddleware(duration.With("method", "Sum"))(sumEndpoint) - } - var concatEndpoint endpoint.Endpoint - { - concatEndpoint = MakeConcatEndpoint(svc) - concatEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(100, 100))(concatEndpoint) - concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(concatEndpoint) - concatEndpoint = opentracing.TraceServer(trace, "Concat")(concatEndpoint) - concatEndpoint = LoggingMiddleware(log.With(logger, "method", "Concat"))(concatEndpoint) - concatEndpoint = InstrumentingMiddleware(duration.With("method", "Concat"))(concatEndpoint) - } - return Set{ - SumEndpoint: sumEndpoint, - ConcatEndpoint: concatEndpoint, - } -} - -// Sum implements the service interface, so Set may be used as a service. -// This is primarily useful in the context of a client library. -func (s Set) Sum(ctx context.Context, a, b int) (int, error) { - resp, err := s.SumEndpoint(ctx, SumRequest{A: a, B: b}) - if err != nil { - return 0, err - } - response := resp.(SumResponse) - return response.V, response.Err -} - -// Concat implements the service interface, so Set may be used as a -// service. This is primarily useful in the context of a client library. -func (s Set) Concat(ctx context.Context, a, b string) (string, error) { - resp, err := s.ConcatEndpoint(ctx, ConcatRequest{A: a, B: b}) - if err != nil { - return "", err - } - response := resp.(ConcatResponse) - return response.V, response.Err -} - -// MakeSumEndpoint constructs a Sum endpoint wrapping the service. -func MakeSumEndpoint(s service.Service) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (response interface{}, err error) { - req := request.(SumRequest) - v, err := s.Sum(ctx, req.A, req.B) - return SumResponse{V: v, Err: err}, nil - } -} - -// MakeConcatEndpoint constructs a Concat endpoint wrapping the service. -func MakeConcatEndpoint(s service.Service) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (response interface{}, err error) { - req := request.(ConcatRequest) - v, err := s.Concat(ctx, req.A, req.B) - return ConcatResponse{V: v, Err: err}, nil - } -} - -// Failer is an interface that should be implemented by response types. -// Response encoders can check if responses are Failer, and if so if they've -// failed, and if so encode them using a separate write path based on the error. -type Failer interface { - Failed() error -} - -// SumRequest collects the request parameters for the Sum method. -type SumRequest struct { - A, B int -} - -// SumResponse collects the response values for the Sum method. -type SumResponse struct { - V int `json:"v"` - Err error `json:"-"` // should be intercepted by Failed/errorEncoder -} - -// Failed implements Failer. -func (r SumResponse) Failed() error { return r.Err } - -// ConcatRequest collects the request parameters for the Concat method. -type ConcatRequest struct { - A, B string -} - -// ConcatResponse collects the response values for the Concat method. -type ConcatResponse struct { - V string `json:"v"` - Err error `json:"-"` -} - -// Failed implements Failer. -func (r ConcatResponse) Failed() error { return r.Err } diff --git a/examples/addsvc2/pkg/service/middleware.go b/examples/addsvc2/pkg/service/middleware.go deleted file mode 100644 index ba028e7..0000000 --- a/examples/addsvc2/pkg/service/middleware.go +++ /dev/null @@ -1,69 +0,0 @@ -package service - -import ( - "context" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/metrics" -) - -// Middleware describes a service (as opposed to endpoint) middleware. -type Middleware func(Service) Service - -// LoggingMiddleware takes a logger as a dependency -// and returns a ServiceMiddleware. -func LoggingMiddleware(logger log.Logger) Middleware { - return func(next Service) Service { - return loggingMiddleware{logger, next} - } -} - -type loggingMiddleware struct { - logger log.Logger - next Service -} - -func (mw loggingMiddleware) Sum(ctx context.Context, a, b int) (v int, err error) { - defer func() { - mw.logger.Log("method", "Sum", "a", a, "b", b, "v", v, "err", err) - }() - return mw.next.Sum(ctx, a, b) -} - -func (mw loggingMiddleware) Concat(ctx context.Context, a, b string) (v string, err error) { - defer func() { - mw.logger.Log("method", "Concat", "a", a, "b", b, "v", v, "err", err) - }() - return mw.next.Concat(ctx, a, b) -} - -// InstrumentingMiddleware returns a service middleware that instruments -// the number of integers summed and characters concatenated over the lifetime of -// the service. -func InstrumentingMiddleware(ints, chars metrics.Counter) Middleware { - return func(next Service) Service { - return instrumentingMiddleware{ - ints: ints, - chars: chars, - next: next, - } - } -} - -type instrumentingMiddleware struct { - ints metrics.Counter - chars metrics.Counter - next Service -} - -func (mw instrumentingMiddleware) Sum(ctx context.Context, a, b int) (int, error) { - v, err := mw.next.Sum(ctx, a, b) - mw.ints.Add(float64(v)) - return v, err -} - -func (mw instrumentingMiddleware) Concat(ctx context.Context, a, b string) (string, error) { - v, err := mw.next.Concat(ctx, a, b) - mw.chars.Add(float64(len(v))) - return v, err -} diff --git a/examples/addsvc2/pkg/service/service.go b/examples/addsvc2/pkg/service/service.go deleted file mode 100644 index 7612cf2..0000000 --- a/examples/addsvc2/pkg/service/service.go +++ /dev/null @@ -1,71 +0,0 @@ -package service - -import ( - "context" - "errors" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/metrics" -) - -// Service describes a service that adds things together. -type Service interface { - Sum(ctx context.Context, a, b int) (int, error) - Concat(ctx context.Context, a, b string) (string, error) -} - -// New returns a basic Service with all of the expected middlewares wired in. -func New(logger log.Logger, ints, chars metrics.Counter) Service { - var svc Service - { - svc = NewBasicService() - svc = LoggingMiddleware(logger)(svc) - svc = InstrumentingMiddleware(ints, chars)(svc) - } - return svc -} - -var ( - // ErrTwoZeroes is an arbitrary business rule for the Add method. - ErrTwoZeroes = errors.New("can't sum two zeroes") - - // ErrIntOverflow protects the Add method. We've decided that this error - // indicates a misbehaving service and should count against e.g. circuit - // breakers. So, we return it directly in endpoints, to illustrate the - // difference. In a real service, this probably wouldn't be the case. - ErrIntOverflow = errors.New("integer overflow") - - // ErrMaxSizeExceeded protects the Concat method. - ErrMaxSizeExceeded = errors.New("result exceeds maximum size") -) - -// NewBasicService returns a naïve, stateless implementation of Service. -func NewBasicService() Service { - return basicService{} -} - -type basicService struct{} - -const ( - intMax = 1<<31 - 1 - intMin = -(intMax + 1) - maxLen = 10 -) - -func (s basicService) Sum(_ context.Context, a, b int) (int, error) { - if a == 0 && b == 0 { - return 0, ErrTwoZeroes - } - if (b > 0 && a > (intMax-b)) || (b < 0 && a < (intMin-b)) { - return 0, ErrIntOverflow - } - return a + b, nil -} - -// Concat implements Service. -func (s basicService) Concat(_ context.Context, a, b string) (string, error) { - if len(a)+len(b) > maxLen { - return "", ErrMaxSizeExceeded - } - return a + b, nil -} diff --git a/examples/addsvc2/pkg/transport/grpc.go b/examples/addsvc2/pkg/transport/grpc.go deleted file mode 100644 index 24a4864..0000000 --- a/examples/addsvc2/pkg/transport/grpc.go +++ /dev/null @@ -1,209 +0,0 @@ -package transport - -import ( - "context" - "errors" - "time" - - "google.golang.org/grpc" - - jujuratelimit "github.com/juju/ratelimit" - stdopentracing "github.com/opentracing/opentracing-go" - "github.com/sony/gobreaker" - oldcontext "golang.org/x/net/context" - - "github.com/go-kit/kit/circuitbreaker" - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/ratelimit" - "github.com/go-kit/kit/tracing/opentracing" - grpctransport "github.com/go-kit/kit/transport/grpc" - - "github.com/go-kit/kit/examples/addsvc2/pb" - addendpoint "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint" - addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service" -) - -type grpcServer struct { - sum grpctransport.Handler - concat grpctransport.Handler -} - -// NewGRPCServer makes a set of endpoints available as a gRPC AddServer. -func NewGRPCServer(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer { - options := []grpctransport.ServerOption{ - grpctransport.ServerErrorLogger(logger), - } - return &grpcServer{ - sum: grpctransport.NewServer( - endpoints.SumEndpoint, - decodeGRPCSumRequest, - encodeGRPCSumResponse, - append(options, grpctransport.ServerBefore(opentracing.FromGRPCRequest(tracer, "Sum", logger)))..., - ), - concat: grpctransport.NewServer( - endpoints.ConcatEndpoint, - decodeGRPCConcatRequest, - encodeGRPCConcatResponse, - append(options, grpctransport.ServerBefore(opentracing.FromGRPCRequest(tracer, "Concat", logger)))..., - ), - } -} - -func (s *grpcServer) Sum(ctx oldcontext.Context, req *pb.SumRequest) (*pb.SumReply, error) { - _, rep, err := s.sum.ServeGRPC(ctx, req) - if err != nil { - return nil, err - } - return rep.(*pb.SumReply), nil -} - -func (s *grpcServer) Concat(ctx oldcontext.Context, req *pb.ConcatRequest) (*pb.ConcatReply, error) { - _, rep, err := s.concat.ServeGRPC(ctx, req) - if err != nil { - return nil, err - } - return rep.(*pb.ConcatReply), nil -} - -// NewGRPCClient returns an AddService backed by a gRPC server at the other end -// of the conn. The caller is responsible for constructing the conn, and -// eventually closing the underlying transport. -func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addservice.Service { - // We construct a single ratelimiter middleware, to limit the total outgoing - // QPS from this client to all methods on the remote instance. We also - // construct per-endpoint circuitbreaker middlewares to demonstrate how - // that's done, although they could easily be combined into a single breaker - // for the entire remote instance, too. - limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) - - // Each individual endpoint is an http/transport.Client (which implements - // endpoint.Endpoint) that gets wrapped with various middlewares. If you - // made your own client library, you'd do this work there, so your server - // could rely on a consistent set of client behavior. - var sumEndpoint endpoint.Endpoint - { - sumEndpoint = grpctransport.NewClient( - conn, - "pb.Add", - "Sum", - encodeGRPCSumRequest, - decodeGRPCSumResponse, - pb.SumReply{}, - grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)), - ).Endpoint() - sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) - sumEndpoint = limiter(sumEndpoint) - sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ - Name: "Sum", - Timeout: 30 * time.Second, - }))(sumEndpoint) - } - - // The Concat endpoint is the same thing, with slightly different - // middlewares to demonstrate how to specialize per-endpoint. - var concatEndpoint endpoint.Endpoint - { - concatEndpoint = grpctransport.NewClient( - conn, - "pb.Add", - "Concat", - encodeGRPCConcatRequest, - decodeGRPCConcatResponse, - pb.ConcatReply{}, - grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)), - ).Endpoint() - concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) - concatEndpoint = limiter(concatEndpoint) - concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ - Name: "Concat", - Timeout: 10 * time.Second, - }))(concatEndpoint) - } - - // Returning the endpoint.Set as a service.Service relies on the - // endpoint.Set implementing the Service methods. That's just a simple bit - // of glue code. - return addendpoint.Set{ - SumEndpoint: sumEndpoint, - ConcatEndpoint: concatEndpoint, - } -} - -// decodeGRPCSumRequest is a transport/grpc.DecodeRequestFunc that converts a -// gRPC sum request to a user-domain sum request. Primarily useful in a server. -func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) { - req := grpcReq.(*pb.SumRequest) - return addendpoint.SumRequest{A: int(req.A), B: int(req.B)}, nil -} - -// decodeGRPCConcatRequest is a transport/grpc.DecodeRequestFunc that converts a -// gRPC concat request to a user-domain concat request. Primarily useful in a -// server. -func decodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) { - req := grpcReq.(*pb.ConcatRequest) - return addendpoint.ConcatRequest{A: req.A, B: req.B}, nil -} - -// decodeGRPCSumResponse is a transport/grpc.DecodeResponseFunc that converts a -// gRPC sum reply to a user-domain sum response. Primarily useful in a client. -func decodeGRPCSumResponse(_ context.Context, grpcReply interface{}) (interface{}, error) { - reply := grpcReply.(*pb.SumReply) - return addendpoint.SumResponse{V: int(reply.V), Err: str2err(reply.Err)}, nil -} - -// decodeGRPCConcatResponse is a transport/grpc.DecodeResponseFunc that converts -// a gRPC concat reply to a user-domain concat response. Primarily useful in a -// client. -func decodeGRPCConcatResponse(_ context.Context, grpcReply interface{}) (interface{}, error) { - reply := grpcReply.(*pb.ConcatReply) - return addendpoint.ConcatResponse{V: reply.V, Err: str2err(reply.Err)}, nil -} - -// encodeGRPCSumResponse is a transport/grpc.EncodeResponseFunc that converts a -// user-domain sum response to a gRPC sum reply. Primarily useful in a server. -func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) { - resp := response.(addendpoint.SumResponse) - return &pb.SumReply{V: int64(resp.V), Err: err2str(resp.Err)}, nil -} - -// encodeGRPCConcatResponse is a transport/grpc.EncodeResponseFunc that converts -// a user-domain concat response to a gRPC concat reply. Primarily useful in a -// server. -func encodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) { - resp := response.(addendpoint.ConcatResponse) - return &pb.ConcatReply{V: resp.V, Err: err2str(resp.Err)}, nil -} - -// encodeGRPCSumRequest is a transport/grpc.EncodeRequestFunc that converts a -// user-domain sum request to a gRPC sum request. Primarily useful in a client. -func encodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) { - req := request.(addendpoint.SumRequest) - return &pb.SumRequest{A: int64(req.A), B: int64(req.B)}, nil -} - -// encodeGRPCConcatRequest is a transport/grpc.EncodeRequestFunc that converts a -// user-domain concat request to a gRPC concat request. Primarily useful in a -// client. -func encodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) { - req := request.(addendpoint.ConcatRequest) - return &pb.ConcatRequest{A: req.A, B: req.B}, nil -} - -// These annoying helper functions are required to translate Go error types to -// and from strings, which is the type we use in our IDLs to represent errors. -// There is special casing to treat empty strings as nil errors. - -func str2err(s string) error { - if s == "" { - return nil - } - return errors.New(s) -} - -func err2str(err error) string { - if err == nil { - return "" - } - return err.Error() -} diff --git a/examples/addsvc2/pkg/transport/http.go b/examples/addsvc2/pkg/transport/http.go deleted file mode 100644 index e011fc3..0000000 --- a/examples/addsvc2/pkg/transport/http.go +++ /dev/null @@ -1,218 +0,0 @@ -package transport - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "io/ioutil" - "net/http" - "net/url" - "strings" - "time" - - jujuratelimit "github.com/juju/ratelimit" - stdopentracing "github.com/opentracing/opentracing-go" - "github.com/sony/gobreaker" - - "github.com/go-kit/kit/circuitbreaker" - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/ratelimit" - "github.com/go-kit/kit/tracing/opentracing" - httptransport "github.com/go-kit/kit/transport/http" - - addendpoint "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint" - addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service" -) - -// NewHTTPHandler returns an HTTP handler that makes a set of endpoints -// available on predefined paths. -func NewHTTPHandler(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) http.Handler { - options := []httptransport.ServerOption{ - httptransport.ServerErrorEncoder(errorEncoder), - httptransport.ServerErrorLogger(logger), - } - m := http.NewServeMux() - m.Handle("/sum", httptransport.NewServer( - endpoints.SumEndpoint, - decodeHTTPSumRequest, - encodeHTTPGenericResponse, - append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Sum", logger)))..., - )) - m.Handle("/concat", httptransport.NewServer( - endpoints.ConcatEndpoint, - decodeHTTPConcatRequest, - encodeHTTPGenericResponse, - append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Concat", logger)))..., - )) - return m -} - -// NewHTTPClient returns an AddService backed by an HTTP server living at the -// remote instance. We expect instance to come from a service discovery system, -// so likely of the form "host:port". -func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addservice.Service, error) { - // Quickly sanitize the instance string. - if !strings.HasPrefix(instance, "http") { - instance = "http://" + instance - } - u, err := url.Parse(instance) - if err != nil { - return nil, err - } - - // We construct a single ratelimiter middleware, to limit the total outgoing - // QPS from this client to all methods on the remote instance. We also - // construct per-endpoint circuitbreaker middlewares to demonstrate how - // that's done, although they could easily be combined into a single breaker - // for the entire remote instance, too. - limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) - - // Each individual endpoint is an http/transport.Client (which implements - // endpoint.Endpoint) that gets wrapped with various middlewares. If you - // made your own client library, you'd do this work there, so your server - // could rely on a consistent set of client behavior. - var sumEndpoint endpoint.Endpoint - { - sumEndpoint = httptransport.NewClient( - "POST", - copyURL(u, "/sum"), - encodeHTTPGenericRequest, - decodeHTTPSumResponse, - httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)), - ).Endpoint() - sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) - sumEndpoint = limiter(sumEndpoint) - sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ - Name: "Sum", - Timeout: 30 * time.Second, - }))(sumEndpoint) - } - - // The Concat endpoint is the same thing, with slightly different - // middlewares to demonstrate how to specialize per-endpoint. - var concatEndpoint endpoint.Endpoint - { - concatEndpoint = httptransport.NewClient( - "POST", - copyURL(u, "/concat"), - encodeHTTPGenericRequest, - decodeHTTPConcatResponse, - httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)), - ).Endpoint() - concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) - concatEndpoint = limiter(concatEndpoint) - concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ - Name: "Concat", - Timeout: 10 * time.Second, - }))(concatEndpoint) - } - - // Returning the endpoint.Set as a service.Service relies on the - // endpoint.Set implementing the Service methods. That's just a simple bit - // of glue code. - return addendpoint.Set{ - SumEndpoint: sumEndpoint, - ConcatEndpoint: concatEndpoint, - }, nil -} - -func copyURL(base *url.URL, path string) *url.URL { - next := *base - next.Path = path - return &next -} - -func errorEncoder(_ context.Context, err error, w http.ResponseWriter) { - w.WriteHeader(err2code(err)) - json.NewEncoder(w).Encode(errorWrapper{Error: err.Error()}) -} - -func err2code(err error) int { - switch err { - case addservice.ErrTwoZeroes, addservice.ErrMaxSizeExceeded, addservice.ErrIntOverflow: - return http.StatusBadRequest - } - return http.StatusInternalServerError -} - -func errorDecoder(r *http.Response) error { - var w errorWrapper - if err := json.NewDecoder(r.Body).Decode(&w); err != nil { - return err - } - return errors.New(w.Error) -} - -type errorWrapper struct { - Error string `json:"error"` -} - -// decodeHTTPSumRequest is a transport/http.DecodeRequestFunc that decodes a -// JSON-encoded sum request from the HTTP request body. Primarily useful in a -// server. -func decodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) { - var req addendpoint.SumRequest - err := json.NewDecoder(r.Body).Decode(&req) - return req, err -} - -// decodeHTTPConcatRequest is a transport/http.DecodeRequestFunc that decodes a -// JSON-encoded concat request from the HTTP request body. Primarily useful in a -// server. -func decodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) { - var req addendpoint.ConcatRequest - err := json.NewDecoder(r.Body).Decode(&req) - return req, err -} - -// decodeHTTPSumResponse is a transport/http.DecodeResponseFunc that decodes a -// JSON-encoded sum response from the HTTP response body. If the response has a -// non-200 status code, we will interpret that as an error and attempt to decode -// the specific error message from the response body. Primarily useful in a -// client. -func decodeHTTPSumResponse(_ context.Context, r *http.Response) (interface{}, error) { - if r.StatusCode != http.StatusOK { - return nil, errors.New(r.Status) - } - var resp addendpoint.SumResponse - err := json.NewDecoder(r.Body).Decode(&resp) - return resp, err -} - -// decodeHTTPConcatResponse is a transport/http.DecodeResponseFunc that decodes -// a JSON-encoded concat response from the HTTP response body. If the response -// has a non-200 status code, we will interpret that as an error and attempt to -// decode the specific error message from the response body. Primarily useful in -// a client. -func decodeHTTPConcatResponse(_ context.Context, r *http.Response) (interface{}, error) { - if r.StatusCode != http.StatusOK { - return nil, errors.New(r.Status) - } - var resp addendpoint.ConcatResponse - err := json.NewDecoder(r.Body).Decode(&resp) - return resp, err -} - -// encodeHTTPGenericRequest is a transport/http.EncodeRequestFunc that -// JSON-encodes any request to the request body. Primarily useful in a client. -func encodeHTTPGenericRequest(_ context.Context, r *http.Request, request interface{}) error { - var buf bytes.Buffer - if err := json.NewEncoder(&buf).Encode(request); err != nil { - return err - } - r.Body = ioutil.NopCloser(&buf) - return nil -} - -// encodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that encodes -// the response as JSON to the response writer. Primarily useful in a server. -func encodeHTTPGenericResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error { - if f, ok := response.(addendpoint.Failer); ok && f.Failed() != nil { - errorEncoder(ctx, f.Failed(), w) - return nil - } - w.Header().Set("Content-Type", "application/json; charset=utf-8") - return json.NewEncoder(w).Encode(response) -} diff --git a/examples/addsvc2/pkg/transport/thrift.go b/examples/addsvc2/pkg/transport/thrift.go deleted file mode 100644 index 4ec99c6..0000000 --- a/examples/addsvc2/pkg/transport/thrift.go +++ /dev/null @@ -1,119 +0,0 @@ -package transport - -import ( - "context" - "time" - - jujuratelimit "github.com/juju/ratelimit" - "github.com/sony/gobreaker" - - "github.com/go-kit/kit/circuitbreaker" - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/ratelimit" - - addendpoint "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint" - addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service" - thriftadd "github.com/go-kit/kit/examples/addsvc2/thrift/gen-go/addsvc" -) - -type thriftServer struct { - ctx context.Context - endpoints addendpoint.Set -} - -// NewThriftServer makes a set of endpoints available as a Thrift service. -func NewThriftServer(ctx context.Context, endpoints addendpoint.Set) thriftadd.AddService { - return &thriftServer{ - ctx: ctx, - endpoints: endpoints, - } -} - -func (s *thriftServer) Sum(a int64, b int64) (*thriftadd.SumReply, error) { - request := addendpoint.SumRequest{A: int(a), B: int(b)} - response, err := s.endpoints.SumEndpoint(s.ctx, request) - if err != nil { - return nil, err - } - resp := response.(addendpoint.SumResponse) - return &thriftadd.SumReply{Value: int64(resp.V), Err: err2str(resp.Err)}, nil -} - -func (s *thriftServer) Concat(a string, b string) (*thriftadd.ConcatReply, error) { - request := addendpoint.ConcatRequest{A: a, B: b} - response, err := s.endpoints.ConcatEndpoint(s.ctx, request) - if err != nil { - return nil, err - } - resp := response.(addendpoint.ConcatResponse) - return &thriftadd.ConcatReply{Value: resp.V, Err: err2str(resp.Err)}, nil -} - -// NewThriftClient returns an AddService backed by a Thrift server described by -// the provided client. The caller is responsible for constructing the client, -// and eventually closing the underlying transport. -func NewThriftClient(client *thriftadd.AddServiceClient) addservice.Service { - // We construct a single ratelimiter middleware, to limit the total outgoing - // QPS from this client to all methods on the remote instance. We also - // construct per-endpoint circuitbreaker middlewares to demonstrate how - // that's done, although they could easily be combined into a single breaker - // for the entire remote instance, too. - limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) - - // Each individual endpoint is an http/transport.Client (which implements - // endpoint.Endpoint) that gets wrapped with various middlewares. If you - // could rely on a consistent set of client behavior. - var sumEndpoint endpoint.Endpoint - { - sumEndpoint = MakeThriftSumEndpoint(client) - sumEndpoint = limiter(sumEndpoint) - sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ - Name: "Sum", - Timeout: 30 * time.Second, - }))(sumEndpoint) - } - - // The Concat endpoint is the same thing, with slightly different - // middlewares to demonstrate how to specialize per-endpoint. - var concatEndpoint endpoint.Endpoint - { - concatEndpoint = MakeThriftConcatEndpoint(client) - concatEndpoint = limiter(concatEndpoint) - concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ - Name: "Concat", - Timeout: 10 * time.Second, - }))(concatEndpoint) - } - - // Returning the endpoint.Set as a service.Service relies on the - // endpoint.Set implementing the Service methods. That's just a simple bit - // of glue code. - return addendpoint.Set{ - SumEndpoint: sumEndpoint, - ConcatEndpoint: concatEndpoint, - } -} - -// MakeThriftSumEndpoint returns an endpoint that invokes the passed Thrift client. -// Useful only in clients, and only until a proper transport/thrift.Client exists. -func MakeThriftSumEndpoint(client *thriftadd.AddServiceClient) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { - req := request.(addendpoint.SumRequest) - reply, err := client.Sum(int64(req.A), int64(req.B)) - if err == addservice.ErrIntOverflow { - return nil, err // special case; see comment on ErrIntOverflow - } - return addendpoint.SumResponse{V: int(reply.Value), Err: err}, nil - } -} - -// MakeThriftConcatEndpoint returns an endpoint that invokes the passed Thrift -// client. Useful only in clients, and only until a proper -// transport/thrift.Client exists. -func MakeThriftConcatEndpoint(client *thriftadd.AddServiceClient) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { - req := request.(addendpoint.ConcatRequest) - reply, err := client.Concat(req.A, req.B) - return addendpoint.ConcatResponse{V: reply.Value, Err: err}, nil - } -}