diff --git a/examples/addsvc2/cmd/addcli/addcli.go b/examples/addsvc2/cmd/addcli/addcli.go new file mode 100644 index 0000000..01ee31f --- /dev/null +++ b/examples/addsvc2/cmd/addcli/addcli.go @@ -0,0 +1,198 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "strconv" + "text/tabwriter" + "time" + + "google.golang.org/grpc" + + "github.com/apache/thrift/lib/go/thrift" + lightstep "github.com/lightstep/lightstep-tracer-go" + stdopentracing "github.com/opentracing/opentracing-go" + zipkin "github.com/openzipkin/zipkin-go-opentracing" + "sourcegraph.com/sourcegraph/appdash" + appdashot "sourcegraph.com/sourcegraph/appdash/opentracing" + + "github.com/go-kit/kit/log" + + addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service" + addtransport "github.com/go-kit/kit/examples/addsvc2/pkg/transport" + addthrift "github.com/go-kit/kit/examples/addsvc2/thrift/gen-go/addsvc" +) + +func main() { + // The addcli presumes no service discovery system, and expects users to + // provide the direct address of an addsvc. This presumption is reflected in + // the addcli binary and the the client packages: the -transport.addr flags + // and various client constructors both expect host:port strings. For an + // example service with a client built on top of a service discovery system, + // see profilesvc. + fs := flag.NewFlagSet("addcli", flag.ExitOnError) + var ( + httpAddr = fs.String("http-addr", "", "HTTP address of addsvc") + grpcAddr = fs.String("grpc-addr", "", "gRPC address of addsvc") + thriftAddr = fs.String("thrift-addr", "", "Thrift address of addsvc") + thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson") + thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered") + thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing") + zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans") + lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token") + appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port") + method = fs.String("method", "sum", "sum, concat") + ) + fs.Usage = usageFor(fs, os.Args[0]+" [flags] ") + fs.Parse(os.Args[1:]) + if len(fs.Args()) != 2 { + fs.Usage() + os.Exit(1) + } + + // This is a demonstration client, which supports multiple tracers. + // Your clients will probably just use one tracer. + var tracer stdopentracing.Tracer + { + if *zipkinURL != "" { + collector, err := zipkin.NewHTTPCollector(*zipkinURL) + if err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + os.Exit(1) + } + defer collector.Close() + var ( + debug = false + hostPort = "localhost:80" + serviceName = "addsvc" + ) + recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName) + tracer, err = zipkin.NewTracer(recorder) + if err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + os.Exit(1) + } + } else if *lightstepToken != "" { + tracer = lightstep.NewTracer(lightstep.Options{ + AccessToken: *lightstepToken, + }) + defer lightstep.FlushLightStepTracer(tracer) + } else if *appdashAddr != "" { + tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) + } else { + tracer = stdopentracing.GlobalTracer() // no-op + } + } + + // This is a demonstration client, which supports multiple transports. + // Your clients will probably just define and stick with 1 transport. + var ( + svc addservice.Service + err error + ) + if *httpAddr != "" { + svc, err = addtransport.NewHTTPClient(*httpAddr, tracer, log.NewNopLogger()) + } else if *grpcAddr != "" { + conn, err := grpc.Dial(*grpcAddr, grpc.WithInsecure(), grpc.WithTimeout(time.Second)) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v", err) + os.Exit(1) + } + defer conn.Close() + svc = addtransport.NewGRPCClient(conn, tracer, log.NewNopLogger()) + } else if *thriftAddr != "" { + // It's necessary to do all of this construction in the func main, + // because (among other reasons) we need to control the lifecycle of the + // Thrift transport, i.e. close it eventually. + var protocolFactory thrift.TProtocolFactory + switch *thriftProtocol { + case "compact": + protocolFactory = thrift.NewTCompactProtocolFactory() + case "simplejson": + protocolFactory = thrift.NewTSimpleJSONProtocolFactory() + case "json": + protocolFactory = thrift.NewTJSONProtocolFactory() + case "binary", "": + protocolFactory = thrift.NewTBinaryProtocolFactoryDefault() + default: + fmt.Fprintf(os.Stderr, "error: invalid protocol %q\n", *thriftProtocol) + os.Exit(1) + } + var transportFactory thrift.TTransportFactory + if *thriftBuffer > 0 { + transportFactory = thrift.NewTBufferedTransportFactory(*thriftBuffer) + } else { + transportFactory = thrift.NewTTransportFactory() + } + if *thriftFramed { + transportFactory = thrift.NewTFramedTransportFactory(transportFactory) + } + transportSocket, err := thrift.NewTSocket(*thriftAddr) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + transport, err := transportFactory.GetTransport(transportSocket) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + if err := transport.Open(); err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + defer transport.Close() + client := addthrift.NewAddServiceClientFactory(transport, protocolFactory) + svc = addtransport.NewThriftClient(client) + } else { + fmt.Fprintf(os.Stderr, "error: no remote address specified\n") + os.Exit(1) + } + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + + switch *method { + case "sum": + a, _ := strconv.ParseInt(fs.Args()[0], 10, 64) + b, _ := strconv.ParseInt(fs.Args()[1], 10, 64) + v, err := svc.Sum(context.Background(), int(a), int(b)) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + fmt.Fprintf(os.Stdout, "%d + %d = %d\n", a, b, v) + + case "concat": + a := fs.Args()[0] + b := fs.Args()[1] + v, err := svc.Concat(context.Background(), a, b) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + fmt.Fprintf(os.Stdout, "%q + %q = %q\n", a, b, v) + + default: + fmt.Fprintf(os.Stderr, "error: invalid method %q\n", method) + os.Exit(1) + } +} + +func usageFor(fs *flag.FlagSet, short string) func() { + return func() { + fmt.Fprintf(os.Stderr, "USAGE\n") + fmt.Fprintf(os.Stderr, " %s\n", short) + fmt.Fprintf(os.Stderr, "\n") + fmt.Fprintf(os.Stderr, "FLAGS\n") + w := tabwriter.NewWriter(os.Stderr, 0, 2, 2, ' ', 0) + fs.VisitAll(func(f *flag.Flag) { + fmt.Fprintf(w, "\t-%s %s\t%s\n", f.Name, f.DefValue, f.Usage) + }) + w.Flush() + fmt.Fprintf(os.Stderr, "\n") + } +} diff --git a/examples/addsvc2/cmd/addsvc/addsvc.go b/examples/addsvc2/cmd/addsvc/addsvc.go index 0d64e22..d7e3500 100644 --- a/examples/addsvc2/cmd/addsvc/addsvc.go +++ b/examples/addsvc2/cmd/addsvc/addsvc.go @@ -17,6 +17,7 @@ stdopentracing "github.com/opentracing/opentracing-go" zipkin "github.com/openzipkin/zipkin-go-opentracing" stdprometheus "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" "sourcegraph.com/sourcegraph/appdash" appdashot "sourcegraph.com/sourcegraph/appdash/opentracing" @@ -126,6 +127,7 @@ Help: "Request duration in seconds.", }, []string{"method", "success"}) } + http.DefaultServeMux.Handle("/metrics", promhttp.Handler()) // Build the layers of the service "onion" from the inside out. First, the // business logic service; then, the set of endpoints that wrap the service; @@ -136,7 +138,7 @@ var ( service = addservice.New(logger, ints, chars) endpoints = addendpoint.New(service, logger, duration, tracer) - httpHandler = addtransport.NewHTTPHandler(context.Background(), endpoints, logger, tracer) + httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, logger) grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger) thriftServer = addtransport.NewThriftServer(context.Background(), endpoints) ) diff --git a/examples/addsvc2/pkg/endpoint/set.go b/examples/addsvc2/pkg/endpoint/set.go index 6b0630e..1515aaf 100644 --- a/examples/addsvc2/pkg/endpoint/set.go +++ b/examples/addsvc2/pkg/endpoint/set.go @@ -52,6 +52,28 @@ } } +// Sum implements the service interface, so Set may be used as a service. +// This is primarily useful in the context of a client library. +func (s Set) Sum(ctx context.Context, a, b int) (int, error) { + resp, err := s.SumEndpoint(ctx, SumRequest{A: a, B: b}) + if err != nil { + return 0, err + } + response := resp.(SumResponse) + return response.V, response.Err +} + +// Concat implements the service interface, so Set may be used as a +// service. This is primarily useful in the context of a client library. +func (s Set) Concat(ctx context.Context, a, b string) (string, error) { + resp, err := s.ConcatEndpoint(ctx, ConcatRequest{A: a, B: b}) + if err != nil { + return "", err + } + response := resp.(ConcatResponse) + return response.V, response.Err +} + // MakeSumEndpoint constructs a Sum endpoint wrapping the service. func MakeSumEndpoint(s service.Service) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (response interface{}, err error) { diff --git a/examples/addsvc2/pkg/transport/grpc.go b/examples/addsvc2/pkg/transport/grpc.go index 59eac93..24a4864 100644 --- a/examples/addsvc2/pkg/transport/grpc.go +++ b/examples/addsvc2/pkg/transport/grpc.go @@ -3,20 +3,34 @@ import ( "context" "errors" - + "time" + + "google.golang.org/grpc" + + jujuratelimit "github.com/juju/ratelimit" stdopentracing "github.com/opentracing/opentracing-go" + "github.com/sony/gobreaker" oldcontext "golang.org/x/net/context" + "github.com/go-kit/kit/circuitbreaker" + "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/ratelimit" "github.com/go-kit/kit/tracing/opentracing" grpctransport "github.com/go-kit/kit/transport/grpc" "github.com/go-kit/kit/examples/addsvc2/pb" - "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint" + addendpoint "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint" + addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service" ) +type grpcServer struct { + sum grpctransport.Handler + concat grpctransport.Handler +} + // NewGRPCServer makes a set of endpoints available as a gRPC AddServer. -func NewGRPCServer(endpoints endpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer { +func NewGRPCServer(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer { options := []grpctransport.ServerOption{ grpctransport.ServerErrorLogger(logger), } @@ -36,11 +50,6 @@ } } -type grpcServer struct { - sum grpctransport.Handler - concat grpctransport.Handler -} - func (s *grpcServer) Sum(ctx oldcontext.Context, req *pb.SumRequest) (*pb.SumReply, error) { _, rep, err := s.sum.ServeGRPC(ctx, req) if err != nil { @@ -57,11 +66,75 @@ return rep.(*pb.ConcatReply), nil } +// NewGRPCClient returns an AddService backed by a gRPC server at the other end +// of the conn. The caller is responsible for constructing the conn, and +// eventually closing the underlying transport. +func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addservice.Service { + // We construct a single ratelimiter middleware, to limit the total outgoing + // QPS from this client to all methods on the remote instance. We also + // construct per-endpoint circuitbreaker middlewares to demonstrate how + // that's done, although they could easily be combined into a single breaker + // for the entire remote instance, too. + limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) + + // Each individual endpoint is an http/transport.Client (which implements + // endpoint.Endpoint) that gets wrapped with various middlewares. If you + // made your own client library, you'd do this work there, so your server + // could rely on a consistent set of client behavior. + var sumEndpoint endpoint.Endpoint + { + sumEndpoint = grpctransport.NewClient( + conn, + "pb.Add", + "Sum", + encodeGRPCSumRequest, + decodeGRPCSumResponse, + pb.SumReply{}, + grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)), + ).Endpoint() + sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) + sumEndpoint = limiter(sumEndpoint) + sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Sum", + Timeout: 30 * time.Second, + }))(sumEndpoint) + } + + // The Concat endpoint is the same thing, with slightly different + // middlewares to demonstrate how to specialize per-endpoint. + var concatEndpoint endpoint.Endpoint + { + concatEndpoint = grpctransport.NewClient( + conn, + "pb.Add", + "Concat", + encodeGRPCConcatRequest, + decodeGRPCConcatResponse, + pb.ConcatReply{}, + grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)), + ).Endpoint() + concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) + concatEndpoint = limiter(concatEndpoint) + concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Concat", + Timeout: 10 * time.Second, + }))(concatEndpoint) + } + + // Returning the endpoint.Set as a service.Service relies on the + // endpoint.Set implementing the Service methods. That's just a simple bit + // of glue code. + return addendpoint.Set{ + SumEndpoint: sumEndpoint, + ConcatEndpoint: concatEndpoint, + } +} + // decodeGRPCSumRequest is a transport/grpc.DecodeRequestFunc that converts a // gRPC sum request to a user-domain sum request. Primarily useful in a server. func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) { req := grpcReq.(*pb.SumRequest) - return endpoint.SumRequest{A: int(req.A), B: int(req.B)}, nil + return addendpoint.SumRequest{A: int(req.A), B: int(req.B)}, nil } // decodeGRPCConcatRequest is a transport/grpc.DecodeRequestFunc that converts a @@ -69,14 +142,14 @@ // server. func decodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) { req := grpcReq.(*pb.ConcatRequest) - return endpoint.ConcatRequest{A: req.A, B: req.B}, nil + return addendpoint.ConcatRequest{A: req.A, B: req.B}, nil } // decodeGRPCSumResponse is a transport/grpc.DecodeResponseFunc that converts a // gRPC sum reply to a user-domain sum response. Primarily useful in a client. func decodeGRPCSumResponse(_ context.Context, grpcReply interface{}) (interface{}, error) { reply := grpcReply.(*pb.SumReply) - return endpoint.SumResponse{V: int(reply.V), Err: str2err(reply.Err)}, nil + return addendpoint.SumResponse{V: int(reply.V), Err: str2err(reply.Err)}, nil } // decodeGRPCConcatResponse is a transport/grpc.DecodeResponseFunc that converts @@ -84,13 +157,13 @@ // client. func decodeGRPCConcatResponse(_ context.Context, grpcReply interface{}) (interface{}, error) { reply := grpcReply.(*pb.ConcatReply) - return endpoint.ConcatResponse{V: reply.V, Err: str2err(reply.Err)}, nil + return addendpoint.ConcatResponse{V: reply.V, Err: str2err(reply.Err)}, nil } // encodeGRPCSumResponse is a transport/grpc.EncodeResponseFunc that converts a // user-domain sum response to a gRPC sum reply. Primarily useful in a server. func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) { - resp := response.(endpoint.SumResponse) + resp := response.(addendpoint.SumResponse) return &pb.SumReply{V: int64(resp.V), Err: err2str(resp.Err)}, nil } @@ -98,14 +171,14 @@ // a user-domain concat response to a gRPC concat reply. Primarily useful in a // server. func encodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) { - resp := response.(endpoint.ConcatResponse) + resp := response.(addendpoint.ConcatResponse) return &pb.ConcatReply{V: resp.V, Err: err2str(resp.Err)}, nil } // encodeGRPCSumRequest is a transport/grpc.EncodeRequestFunc that converts a // user-domain sum request to a gRPC sum request. Primarily useful in a client. func encodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) { - req := request.(endpoint.SumRequest) + req := request.(addendpoint.SumRequest) return &pb.SumRequest{A: int64(req.A), B: int64(req.B)}, nil } @@ -113,7 +186,7 @@ // user-domain concat request to a gRPC concat request. Primarily useful in a // client. func encodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) { - req := request.(endpoint.ConcatRequest) + req := request.(addendpoint.ConcatRequest) return &pb.ConcatRequest{A: req.A, B: req.B}, nil } diff --git a/examples/addsvc2/pkg/transport/http.go b/examples/addsvc2/pkg/transport/http.go index 4146b31..e011fc3 100644 --- a/examples/addsvc2/pkg/transport/http.go +++ b/examples/addsvc2/pkg/transport/http.go @@ -7,21 +7,28 @@ "errors" "io/ioutil" "net/http" - + "net/url" + "strings" + "time" + + jujuratelimit "github.com/juju/ratelimit" stdopentracing "github.com/opentracing/opentracing-go" - "github.com/prometheus/client_golang/prometheus/promhttp" - + "github.com/sony/gobreaker" + + "github.com/go-kit/kit/circuitbreaker" + "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/ratelimit" "github.com/go-kit/kit/tracing/opentracing" httptransport "github.com/go-kit/kit/transport/http" - "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint" - "github.com/go-kit/kit/examples/addsvc2/pkg/service" + addendpoint "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint" + addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service" ) // NewHTTPHandler returns an HTTP handler that makes a set of endpoints // available on predefined paths. -func NewHTTPHandler(ctx context.Context, endpoints endpoint.Set, logger log.Logger, trace stdopentracing.Tracer) http.Handler { +func NewHTTPHandler(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) http.Handler { options := []httptransport.ServerOption{ httptransport.ServerErrorEncoder(errorEncoder), httptransport.ServerErrorLogger(logger), @@ -31,16 +38,90 @@ endpoints.SumEndpoint, decodeHTTPSumRequest, encodeHTTPGenericResponse, - append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(trace, "Sum", logger)))..., + append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Sum", logger)))..., )) m.Handle("/concat", httptransport.NewServer( endpoints.ConcatEndpoint, decodeHTTPConcatRequest, encodeHTTPGenericResponse, - append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(trace, "Concat", logger)))..., + append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Concat", logger)))..., )) - m.Handle("/metrics", promhttp.Handler()) return m +} + +// NewHTTPClient returns an AddService backed by an HTTP server living at the +// remote instance. We expect instance to come from a service discovery system, +// so likely of the form "host:port". +func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addservice.Service, error) { + // Quickly sanitize the instance string. + if !strings.HasPrefix(instance, "http") { + instance = "http://" + instance + } + u, err := url.Parse(instance) + if err != nil { + return nil, err + } + + // We construct a single ratelimiter middleware, to limit the total outgoing + // QPS from this client to all methods on the remote instance. We also + // construct per-endpoint circuitbreaker middlewares to demonstrate how + // that's done, although they could easily be combined into a single breaker + // for the entire remote instance, too. + limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) + + // Each individual endpoint is an http/transport.Client (which implements + // endpoint.Endpoint) that gets wrapped with various middlewares. If you + // made your own client library, you'd do this work there, so your server + // could rely on a consistent set of client behavior. + var sumEndpoint endpoint.Endpoint + { + sumEndpoint = httptransport.NewClient( + "POST", + copyURL(u, "/sum"), + encodeHTTPGenericRequest, + decodeHTTPSumResponse, + httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)), + ).Endpoint() + sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) + sumEndpoint = limiter(sumEndpoint) + sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Sum", + Timeout: 30 * time.Second, + }))(sumEndpoint) + } + + // The Concat endpoint is the same thing, with slightly different + // middlewares to demonstrate how to specialize per-endpoint. + var concatEndpoint endpoint.Endpoint + { + concatEndpoint = httptransport.NewClient( + "POST", + copyURL(u, "/concat"), + encodeHTTPGenericRequest, + decodeHTTPConcatResponse, + httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)), + ).Endpoint() + concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) + concatEndpoint = limiter(concatEndpoint) + concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Concat", + Timeout: 10 * time.Second, + }))(concatEndpoint) + } + + // Returning the endpoint.Set as a service.Service relies on the + // endpoint.Set implementing the Service methods. That's just a simple bit + // of glue code. + return addendpoint.Set{ + SumEndpoint: sumEndpoint, + ConcatEndpoint: concatEndpoint, + }, nil +} + +func copyURL(base *url.URL, path string) *url.URL { + next := *base + next.Path = path + return &next } func errorEncoder(_ context.Context, err error, w http.ResponseWriter) { @@ -50,7 +131,7 @@ func err2code(err error) int { switch err { - case service.ErrTwoZeroes, service.ErrMaxSizeExceeded, service.ErrIntOverflow: + case addservice.ErrTwoZeroes, addservice.ErrMaxSizeExceeded, addservice.ErrIntOverflow: return http.StatusBadRequest } return http.StatusInternalServerError @@ -72,7 +153,7 @@ // JSON-encoded sum request from the HTTP request body. Primarily useful in a // server. func decodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) { - var req endpoint.SumRequest + var req addendpoint.SumRequest err := json.NewDecoder(r.Body).Decode(&req) return req, err } @@ -81,7 +162,7 @@ // JSON-encoded concat request from the HTTP request body. Primarily useful in a // server. func decodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) { - var req endpoint.ConcatRequest + var req addendpoint.ConcatRequest err := json.NewDecoder(r.Body).Decode(&req) return req, err } @@ -95,7 +176,7 @@ if r.StatusCode != http.StatusOK { return nil, errors.New(r.Status) } - var resp endpoint.SumResponse + var resp addendpoint.SumResponse err := json.NewDecoder(r.Body).Decode(&resp) return resp, err } @@ -109,7 +190,7 @@ if r.StatusCode != http.StatusOK { return nil, errors.New(r.Status) } - var resp endpoint.ConcatResponse + var resp addendpoint.ConcatResponse err := json.NewDecoder(r.Body).Decode(&resp) return resp, err } @@ -128,7 +209,7 @@ // encodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that encodes // the response as JSON to the response writer. Primarily useful in a server. func encodeHTTPGenericResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error { - if f, ok := response.(endpoint.Failer); ok && f.Failed() != nil { + if f, ok := response.(addendpoint.Failer); ok && f.Failed() != nil { errorEncoder(ctx, f.Failed(), w) return nil } diff --git a/examples/addsvc2/pkg/transport/thrift.go b/examples/addsvc2/pkg/transport/thrift.go index 49d99ff..4ec99c6 100644 --- a/examples/addsvc2/pkg/transport/thrift.go +++ b/examples/addsvc2/pkg/transport/thrift.go @@ -2,12 +2,24 @@ import ( "context" + "time" + jujuratelimit "github.com/juju/ratelimit" + "github.com/sony/gobreaker" + + "github.com/go-kit/kit/circuitbreaker" "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/ratelimit" + addendpoint "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint" - "github.com/go-kit/kit/examples/addsvc2/pkg/service" + addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service" thriftadd "github.com/go-kit/kit/examples/addsvc2/thrift/gen-go/addsvc" ) + +type thriftServer struct { + ctx context.Context + endpoints addendpoint.Set +} // NewThriftServer makes a set of endpoints available as a Thrift service. func NewThriftServer(ctx context.Context, endpoints addendpoint.Set) thriftadd.AddService { @@ -15,11 +27,6 @@ ctx: ctx, endpoints: endpoints, } -} - -type thriftServer struct { - ctx context.Context - endpoints addendpoint.Set } func (s *thriftServer) Sum(a int64, b int64) (*thriftadd.SumReply, error) { @@ -42,13 +49,58 @@ return &thriftadd.ConcatReply{Value: resp.V, Err: err2str(resp.Err)}, nil } +// NewThriftClient returns an AddService backed by a Thrift server described by +// the provided client. The caller is responsible for constructing the client, +// and eventually closing the underlying transport. +func NewThriftClient(client *thriftadd.AddServiceClient) addservice.Service { + // We construct a single ratelimiter middleware, to limit the total outgoing + // QPS from this client to all methods on the remote instance. We also + // construct per-endpoint circuitbreaker middlewares to demonstrate how + // that's done, although they could easily be combined into a single breaker + // for the entire remote instance, too. + limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) + + // Each individual endpoint is an http/transport.Client (which implements + // endpoint.Endpoint) that gets wrapped with various middlewares. If you + // could rely on a consistent set of client behavior. + var sumEndpoint endpoint.Endpoint + { + sumEndpoint = MakeThriftSumEndpoint(client) + sumEndpoint = limiter(sumEndpoint) + sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Sum", + Timeout: 30 * time.Second, + }))(sumEndpoint) + } + + // The Concat endpoint is the same thing, with slightly different + // middlewares to demonstrate how to specialize per-endpoint. + var concatEndpoint endpoint.Endpoint + { + concatEndpoint = MakeThriftConcatEndpoint(client) + concatEndpoint = limiter(concatEndpoint) + concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Concat", + Timeout: 10 * time.Second, + }))(concatEndpoint) + } + + // Returning the endpoint.Set as a service.Service relies on the + // endpoint.Set implementing the Service methods. That's just a simple bit + // of glue code. + return addendpoint.Set{ + SumEndpoint: sumEndpoint, + ConcatEndpoint: concatEndpoint, + } +} + // MakeThriftSumEndpoint returns an endpoint that invokes the passed Thrift client. // Useful only in clients, and only until a proper transport/thrift.Client exists. func MakeThriftSumEndpoint(client *thriftadd.AddServiceClient) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { req := request.(addendpoint.SumRequest) reply, err := client.Sum(int64(req.A), int64(req.B)) - if err == service.ErrIntOverflow { + if err == addservice.ErrIntOverflow { return nil, err // special case; see comment on ErrIntOverflow } return addendpoint.SumResponse{V: int(reply.Value), Err: err}, nil