Codebase list golang-github-go-kit-kit / 6cb68cf
examples/addsvc2: add addcli Peter Bourgon 6 years ago
6 changed file(s) with 467 addition(s) and 39 deletion(s). Raw diff Collapse all Expand all
0 package main
1
2 import (
3 "context"
4 "flag"
5 "fmt"
6 "os"
7 "strconv"
8 "text/tabwriter"
9 "time"
10
11 "google.golang.org/grpc"
12
13 "github.com/apache/thrift/lib/go/thrift"
14 lightstep "github.com/lightstep/lightstep-tracer-go"
15 stdopentracing "github.com/opentracing/opentracing-go"
16 zipkin "github.com/openzipkin/zipkin-go-opentracing"
17 "sourcegraph.com/sourcegraph/appdash"
18 appdashot "sourcegraph.com/sourcegraph/appdash/opentracing"
19
20 "github.com/go-kit/kit/log"
21
22 addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service"
23 addtransport "github.com/go-kit/kit/examples/addsvc2/pkg/transport"
24 addthrift "github.com/go-kit/kit/examples/addsvc2/thrift/gen-go/addsvc"
25 )
26
27 func main() {
28 // The addcli presumes no service discovery system, and expects users to
29 // provide the direct address of an addsvc. This presumption is reflected in
30 // the addcli binary and the the client packages: the -transport.addr flags
31 // and various client constructors both expect host:port strings. For an
32 // example service with a client built on top of a service discovery system,
33 // see profilesvc.
34 fs := flag.NewFlagSet("addcli", flag.ExitOnError)
35 var (
36 httpAddr = fs.String("http-addr", "", "HTTP address of addsvc")
37 grpcAddr = fs.String("grpc-addr", "", "gRPC address of addsvc")
38 thriftAddr = fs.String("thrift-addr", "", "Thrift address of addsvc")
39 thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson")
40 thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered")
41 thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing")
42 zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans")
43 lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token")
44 appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port")
45 method = fs.String("method", "sum", "sum, concat")
46 )
47 fs.Usage = usageFor(fs, os.Args[0]+" [flags] <a> <b>")
48 fs.Parse(os.Args[1:])
49 if len(fs.Args()) != 2 {
50 fs.Usage()
51 os.Exit(1)
52 }
53
54 // This is a demonstration client, which supports multiple tracers.
55 // Your clients will probably just use one tracer.
56 var tracer stdopentracing.Tracer
57 {
58 if *zipkinURL != "" {
59 collector, err := zipkin.NewHTTPCollector(*zipkinURL)
60 if err != nil {
61 fmt.Fprintln(os.Stderr, err.Error())
62 os.Exit(1)
63 }
64 defer collector.Close()
65 var (
66 debug = false
67 hostPort = "localhost:80"
68 serviceName = "addsvc"
69 )
70 recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName)
71 tracer, err = zipkin.NewTracer(recorder)
72 if err != nil {
73 fmt.Fprintln(os.Stderr, err.Error())
74 os.Exit(1)
75 }
76 } else if *lightstepToken != "" {
77 tracer = lightstep.NewTracer(lightstep.Options{
78 AccessToken: *lightstepToken,
79 })
80 defer lightstep.FlushLightStepTracer(tracer)
81 } else if *appdashAddr != "" {
82 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
83 } else {
84 tracer = stdopentracing.GlobalTracer() // no-op
85 }
86 }
87
88 // This is a demonstration client, which supports multiple transports.
89 // Your clients will probably just define and stick with 1 transport.
90 var (
91 svc addservice.Service
92 err error
93 )
94 if *httpAddr != "" {
95 svc, err = addtransport.NewHTTPClient(*httpAddr, tracer, log.NewNopLogger())
96 } else if *grpcAddr != "" {
97 conn, err := grpc.Dial(*grpcAddr, grpc.WithInsecure(), grpc.WithTimeout(time.Second))
98 if err != nil {
99 fmt.Fprintf(os.Stderr, "error: %v", err)
100 os.Exit(1)
101 }
102 defer conn.Close()
103 svc = addtransport.NewGRPCClient(conn, tracer, log.NewNopLogger())
104 } else if *thriftAddr != "" {
105 // It's necessary to do all of this construction in the func main,
106 // because (among other reasons) we need to control the lifecycle of the
107 // Thrift transport, i.e. close it eventually.
108 var protocolFactory thrift.TProtocolFactory
109 switch *thriftProtocol {
110 case "compact":
111 protocolFactory = thrift.NewTCompactProtocolFactory()
112 case "simplejson":
113 protocolFactory = thrift.NewTSimpleJSONProtocolFactory()
114 case "json":
115 protocolFactory = thrift.NewTJSONProtocolFactory()
116 case "binary", "":
117 protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
118 default:
119 fmt.Fprintf(os.Stderr, "error: invalid protocol %q\n", *thriftProtocol)
120 os.Exit(1)
121 }
122 var transportFactory thrift.TTransportFactory
123 if *thriftBuffer > 0 {
124 transportFactory = thrift.NewTBufferedTransportFactory(*thriftBuffer)
125 } else {
126 transportFactory = thrift.NewTTransportFactory()
127 }
128 if *thriftFramed {
129 transportFactory = thrift.NewTFramedTransportFactory(transportFactory)
130 }
131 transportSocket, err := thrift.NewTSocket(*thriftAddr)
132 if err != nil {
133 fmt.Fprintf(os.Stderr, "error: %v\n", err)
134 os.Exit(1)
135 }
136 transport, err := transportFactory.GetTransport(transportSocket)
137 if err != nil {
138 fmt.Fprintf(os.Stderr, "error: %v\n", err)
139 os.Exit(1)
140 }
141 if err := transport.Open(); err != nil {
142 fmt.Fprintf(os.Stderr, "error: %v\n", err)
143 os.Exit(1)
144 }
145 defer transport.Close()
146 client := addthrift.NewAddServiceClientFactory(transport, protocolFactory)
147 svc = addtransport.NewThriftClient(client)
148 } else {
149 fmt.Fprintf(os.Stderr, "error: no remote address specified\n")
150 os.Exit(1)
151 }
152 if err != nil {
153 fmt.Fprintf(os.Stderr, "error: %v\n", err)
154 os.Exit(1)
155 }
156
157 switch *method {
158 case "sum":
159 a, _ := strconv.ParseInt(fs.Args()[0], 10, 64)
160 b, _ := strconv.ParseInt(fs.Args()[1], 10, 64)
161 v, err := svc.Sum(context.Background(), int(a), int(b))
162 if err != nil {
163 fmt.Fprintf(os.Stderr, "error: %v\n", err)
164 os.Exit(1)
165 }
166 fmt.Fprintf(os.Stdout, "%d + %d = %d\n", a, b, v)
167
168 case "concat":
169 a := fs.Args()[0]
170 b := fs.Args()[1]
171 v, err := svc.Concat(context.Background(), a, b)
172 if err != nil {
173 fmt.Fprintf(os.Stderr, "error: %v\n", err)
174 os.Exit(1)
175 }
176 fmt.Fprintf(os.Stdout, "%q + %q = %q\n", a, b, v)
177
178 default:
179 fmt.Fprintf(os.Stderr, "error: invalid method %q\n", method)
180 os.Exit(1)
181 }
182 }
183
184 func usageFor(fs *flag.FlagSet, short string) func() {
185 return func() {
186 fmt.Fprintf(os.Stderr, "USAGE\n")
187 fmt.Fprintf(os.Stderr, " %s\n", short)
188 fmt.Fprintf(os.Stderr, "\n")
189 fmt.Fprintf(os.Stderr, "FLAGS\n")
190 w := tabwriter.NewWriter(os.Stderr, 0, 2, 2, ' ', 0)
191 fs.VisitAll(func(f *flag.Flag) {
192 fmt.Fprintf(w, "\t-%s %s\t%s\n", f.Name, f.DefValue, f.Usage)
193 })
194 w.Flush()
195 fmt.Fprintf(os.Stderr, "\n")
196 }
197 }
1616 stdopentracing "github.com/opentracing/opentracing-go"
1717 zipkin "github.com/openzipkin/zipkin-go-opentracing"
1818 stdprometheus "github.com/prometheus/client_golang/prometheus"
19 "github.com/prometheus/client_golang/prometheus/promhttp"
1920 "google.golang.org/grpc"
2021 "sourcegraph.com/sourcegraph/appdash"
2122 appdashot "sourcegraph.com/sourcegraph/appdash/opentracing"
125126 Help: "Request duration in seconds.",
126127 }, []string{"method", "success"})
127128 }
129 http.DefaultServeMux.Handle("/metrics", promhttp.Handler())
128130
129131 // Build the layers of the service "onion" from the inside out. First, the
130132 // business logic service; then, the set of endpoints that wrap the service;
135137 var (
136138 service = addservice.New(logger, ints, chars)
137139 endpoints = addendpoint.New(service, logger, duration, tracer)
138 httpHandler = addtransport.NewHTTPHandler(context.Background(), endpoints, logger, tracer)
140 httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, logger)
139141 grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger)
140142 thriftServer = addtransport.NewThriftServer(context.Background(), endpoints)
141143 )
5151 }
5252 }
5353
54 // Sum implements the service interface, so Set may be used as a service.
55 // This is primarily useful in the context of a client library.
56 func (s Set) Sum(ctx context.Context, a, b int) (int, error) {
57 resp, err := s.SumEndpoint(ctx, SumRequest{A: a, B: b})
58 if err != nil {
59 return 0, err
60 }
61 response := resp.(SumResponse)
62 return response.V, response.Err
63 }
64
65 // Concat implements the service interface, so Set may be used as a
66 // service. This is primarily useful in the context of a client library.
67 func (s Set) Concat(ctx context.Context, a, b string) (string, error) {
68 resp, err := s.ConcatEndpoint(ctx, ConcatRequest{A: a, B: b})
69 if err != nil {
70 return "", err
71 }
72 response := resp.(ConcatResponse)
73 return response.V, response.Err
74 }
75
5476 // MakeSumEndpoint constructs a Sum endpoint wrapping the service.
5577 func MakeSumEndpoint(s service.Service) endpoint.Endpoint {
5678 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
22 import (
33 "context"
44 "errors"
5
5 "time"
6
7 "google.golang.org/grpc"
8
9 jujuratelimit "github.com/juju/ratelimit"
610 stdopentracing "github.com/opentracing/opentracing-go"
11 "github.com/sony/gobreaker"
712 oldcontext "golang.org/x/net/context"
813
14 "github.com/go-kit/kit/circuitbreaker"
15 "github.com/go-kit/kit/endpoint"
916 "github.com/go-kit/kit/log"
17 "github.com/go-kit/kit/ratelimit"
1018 "github.com/go-kit/kit/tracing/opentracing"
1119 grpctransport "github.com/go-kit/kit/transport/grpc"
1220
1321 "github.com/go-kit/kit/examples/addsvc2/pb"
14 "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint"
22 addendpoint "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint"
23 addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service"
1524 )
1625
26 type grpcServer struct {
27 sum grpctransport.Handler
28 concat grpctransport.Handler
29 }
30
1731 // NewGRPCServer makes a set of endpoints available as a gRPC AddServer.
18 func NewGRPCServer(endpoints endpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer {
32 func NewGRPCServer(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer {
1933 options := []grpctransport.ServerOption{
2034 grpctransport.ServerErrorLogger(logger),
2135 }
3549 }
3650 }
3751
38 type grpcServer struct {
39 sum grpctransport.Handler
40 concat grpctransport.Handler
41 }
42
4352 func (s *grpcServer) Sum(ctx oldcontext.Context, req *pb.SumRequest) (*pb.SumReply, error) {
4453 _, rep, err := s.sum.ServeGRPC(ctx, req)
4554 if err != nil {
5665 return rep.(*pb.ConcatReply), nil
5766 }
5867
68 // NewGRPCClient returns an AddService backed by a gRPC server at the other end
69 // of the conn. The caller is responsible for constructing the conn, and
70 // eventually closing the underlying transport.
71 func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addservice.Service {
72 // We construct a single ratelimiter middleware, to limit the total outgoing
73 // QPS from this client to all methods on the remote instance. We also
74 // construct per-endpoint circuitbreaker middlewares to demonstrate how
75 // that's done, although they could easily be combined into a single breaker
76 // for the entire remote instance, too.
77 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
78
79 // Each individual endpoint is an http/transport.Client (which implements
80 // endpoint.Endpoint) that gets wrapped with various middlewares. If you
81 // made your own client library, you'd do this work there, so your server
82 // could rely on a consistent set of client behavior.
83 var sumEndpoint endpoint.Endpoint
84 {
85 sumEndpoint = grpctransport.NewClient(
86 conn,
87 "pb.Add",
88 "Sum",
89 encodeGRPCSumRequest,
90 decodeGRPCSumResponse,
91 pb.SumReply{},
92 grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)),
93 ).Endpoint()
94 sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
95 sumEndpoint = limiter(sumEndpoint)
96 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
97 Name: "Sum",
98 Timeout: 30 * time.Second,
99 }))(sumEndpoint)
100 }
101
102 // The Concat endpoint is the same thing, with slightly different
103 // middlewares to demonstrate how to specialize per-endpoint.
104 var concatEndpoint endpoint.Endpoint
105 {
106 concatEndpoint = grpctransport.NewClient(
107 conn,
108 "pb.Add",
109 "Concat",
110 encodeGRPCConcatRequest,
111 decodeGRPCConcatResponse,
112 pb.ConcatReply{},
113 grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)),
114 ).Endpoint()
115 concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
116 concatEndpoint = limiter(concatEndpoint)
117 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
118 Name: "Concat",
119 Timeout: 10 * time.Second,
120 }))(concatEndpoint)
121 }
122
123 // Returning the endpoint.Set as a service.Service relies on the
124 // endpoint.Set implementing the Service methods. That's just a simple bit
125 // of glue code.
126 return addendpoint.Set{
127 SumEndpoint: sumEndpoint,
128 ConcatEndpoint: concatEndpoint,
129 }
130 }
131
59132 // decodeGRPCSumRequest is a transport/grpc.DecodeRequestFunc that converts a
60133 // gRPC sum request to a user-domain sum request. Primarily useful in a server.
61134 func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
62135 req := grpcReq.(*pb.SumRequest)
63 return endpoint.SumRequest{A: int(req.A), B: int(req.B)}, nil
136 return addendpoint.SumRequest{A: int(req.A), B: int(req.B)}, nil
64137 }
65138
66139 // decodeGRPCConcatRequest is a transport/grpc.DecodeRequestFunc that converts a
68141 // server.
69142 func decodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
70143 req := grpcReq.(*pb.ConcatRequest)
71 return endpoint.ConcatRequest{A: req.A, B: req.B}, nil
144 return addendpoint.ConcatRequest{A: req.A, B: req.B}, nil
72145 }
73146
74147 // decodeGRPCSumResponse is a transport/grpc.DecodeResponseFunc that converts a
75148 // gRPC sum reply to a user-domain sum response. Primarily useful in a client.
76149 func decodeGRPCSumResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
77150 reply := grpcReply.(*pb.SumReply)
78 return endpoint.SumResponse{V: int(reply.V), Err: str2err(reply.Err)}, nil
151 return addendpoint.SumResponse{V: int(reply.V), Err: str2err(reply.Err)}, nil
79152 }
80153
81154 // decodeGRPCConcatResponse is a transport/grpc.DecodeResponseFunc that converts
83156 // client.
84157 func decodeGRPCConcatResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
85158 reply := grpcReply.(*pb.ConcatReply)
86 return endpoint.ConcatResponse{V: reply.V, Err: str2err(reply.Err)}, nil
159 return addendpoint.ConcatResponse{V: reply.V, Err: str2err(reply.Err)}, nil
87160 }
88161
89162 // encodeGRPCSumResponse is a transport/grpc.EncodeResponseFunc that converts a
90163 // user-domain sum response to a gRPC sum reply. Primarily useful in a server.
91164 func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
92 resp := response.(endpoint.SumResponse)
165 resp := response.(addendpoint.SumResponse)
93166 return &pb.SumReply{V: int64(resp.V), Err: err2str(resp.Err)}, nil
94167 }
95168
97170 // a user-domain concat response to a gRPC concat reply. Primarily useful in a
98171 // server.
99172 func encodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
100 resp := response.(endpoint.ConcatResponse)
173 resp := response.(addendpoint.ConcatResponse)
101174 return &pb.ConcatReply{V: resp.V, Err: err2str(resp.Err)}, nil
102175 }
103176
104177 // encodeGRPCSumRequest is a transport/grpc.EncodeRequestFunc that converts a
105178 // user-domain sum request to a gRPC sum request. Primarily useful in a client.
106179 func encodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) {
107 req := request.(endpoint.SumRequest)
180 req := request.(addendpoint.SumRequest)
108181 return &pb.SumRequest{A: int64(req.A), B: int64(req.B)}, nil
109182 }
110183
112185 // user-domain concat request to a gRPC concat request. Primarily useful in a
113186 // client.
114187 func encodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) {
115 req := request.(endpoint.ConcatRequest)
188 req := request.(addendpoint.ConcatRequest)
116189 return &pb.ConcatRequest{A: req.A, B: req.B}, nil
117190 }
118191
66 "errors"
77 "io/ioutil"
88 "net/http"
9
9 "net/url"
10 "strings"
11 "time"
12
13 jujuratelimit "github.com/juju/ratelimit"
1014 stdopentracing "github.com/opentracing/opentracing-go"
11 "github.com/prometheus/client_golang/prometheus/promhttp"
12
15 "github.com/sony/gobreaker"
16
17 "github.com/go-kit/kit/circuitbreaker"
18 "github.com/go-kit/kit/endpoint"
1319 "github.com/go-kit/kit/log"
20 "github.com/go-kit/kit/ratelimit"
1421 "github.com/go-kit/kit/tracing/opentracing"
1522 httptransport "github.com/go-kit/kit/transport/http"
1623
17 "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint"
18 "github.com/go-kit/kit/examples/addsvc2/pkg/service"
24 addendpoint "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint"
25 addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service"
1926 )
2027
2128 // NewHTTPHandler returns an HTTP handler that makes a set of endpoints
2229 // available on predefined paths.
23 func NewHTTPHandler(ctx context.Context, endpoints endpoint.Set, logger log.Logger, trace stdopentracing.Tracer) http.Handler {
30 func NewHTTPHandler(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) http.Handler {
2431 options := []httptransport.ServerOption{
2532 httptransport.ServerErrorEncoder(errorEncoder),
2633 httptransport.ServerErrorLogger(logger),
3037 endpoints.SumEndpoint,
3138 decodeHTTPSumRequest,
3239 encodeHTTPGenericResponse,
33 append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(trace, "Sum", logger)))...,
40 append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Sum", logger)))...,
3441 ))
3542 m.Handle("/concat", httptransport.NewServer(
3643 endpoints.ConcatEndpoint,
3744 decodeHTTPConcatRequest,
3845 encodeHTTPGenericResponse,
39 append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(trace, "Concat", logger)))...,
46 append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Concat", logger)))...,
4047 ))
41 m.Handle("/metrics", promhttp.Handler())
4248 return m
49 }
50
51 // NewHTTPClient returns an AddService backed by an HTTP server living at the
52 // remote instance. We expect instance to come from a service discovery system,
53 // so likely of the form "host:port".
54 func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addservice.Service, error) {
55 // Quickly sanitize the instance string.
56 if !strings.HasPrefix(instance, "http") {
57 instance = "http://" + instance
58 }
59 u, err := url.Parse(instance)
60 if err != nil {
61 return nil, err
62 }
63
64 // We construct a single ratelimiter middleware, to limit the total outgoing
65 // QPS from this client to all methods on the remote instance. We also
66 // construct per-endpoint circuitbreaker middlewares to demonstrate how
67 // that's done, although they could easily be combined into a single breaker
68 // for the entire remote instance, too.
69 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
70
71 // Each individual endpoint is an http/transport.Client (which implements
72 // endpoint.Endpoint) that gets wrapped with various middlewares. If you
73 // made your own client library, you'd do this work there, so your server
74 // could rely on a consistent set of client behavior.
75 var sumEndpoint endpoint.Endpoint
76 {
77 sumEndpoint = httptransport.NewClient(
78 "POST",
79 copyURL(u, "/sum"),
80 encodeHTTPGenericRequest,
81 decodeHTTPSumResponse,
82 httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)),
83 ).Endpoint()
84 sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
85 sumEndpoint = limiter(sumEndpoint)
86 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
87 Name: "Sum",
88 Timeout: 30 * time.Second,
89 }))(sumEndpoint)
90 }
91
92 // The Concat endpoint is the same thing, with slightly different
93 // middlewares to demonstrate how to specialize per-endpoint.
94 var concatEndpoint endpoint.Endpoint
95 {
96 concatEndpoint = httptransport.NewClient(
97 "POST",
98 copyURL(u, "/concat"),
99 encodeHTTPGenericRequest,
100 decodeHTTPConcatResponse,
101 httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)),
102 ).Endpoint()
103 concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
104 concatEndpoint = limiter(concatEndpoint)
105 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
106 Name: "Concat",
107 Timeout: 10 * time.Second,
108 }))(concatEndpoint)
109 }
110
111 // Returning the endpoint.Set as a service.Service relies on the
112 // endpoint.Set implementing the Service methods. That's just a simple bit
113 // of glue code.
114 return addendpoint.Set{
115 SumEndpoint: sumEndpoint,
116 ConcatEndpoint: concatEndpoint,
117 }, nil
118 }
119
120 func copyURL(base *url.URL, path string) *url.URL {
121 next := *base
122 next.Path = path
123 return &next
43124 }
44125
45126 func errorEncoder(_ context.Context, err error, w http.ResponseWriter) {
49130
50131 func err2code(err error) int {
51132 switch err {
52 case service.ErrTwoZeroes, service.ErrMaxSizeExceeded, service.ErrIntOverflow:
133 case addservice.ErrTwoZeroes, addservice.ErrMaxSizeExceeded, addservice.ErrIntOverflow:
53134 return http.StatusBadRequest
54135 }
55136 return http.StatusInternalServerError
71152 // JSON-encoded sum request from the HTTP request body. Primarily useful in a
72153 // server.
73154 func decodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) {
74 var req endpoint.SumRequest
155 var req addendpoint.SumRequest
75156 err := json.NewDecoder(r.Body).Decode(&req)
76157 return req, err
77158 }
80161 // JSON-encoded concat request from the HTTP request body. Primarily useful in a
81162 // server.
82163 func decodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) {
83 var req endpoint.ConcatRequest
164 var req addendpoint.ConcatRequest
84165 err := json.NewDecoder(r.Body).Decode(&req)
85166 return req, err
86167 }
94175 if r.StatusCode != http.StatusOK {
95176 return nil, errors.New(r.Status)
96177 }
97 var resp endpoint.SumResponse
178 var resp addendpoint.SumResponse
98179 err := json.NewDecoder(r.Body).Decode(&resp)
99180 return resp, err
100181 }
108189 if r.StatusCode != http.StatusOK {
109190 return nil, errors.New(r.Status)
110191 }
111 var resp endpoint.ConcatResponse
192 var resp addendpoint.ConcatResponse
112193 err := json.NewDecoder(r.Body).Decode(&resp)
113194 return resp, err
114195 }
127208 // encodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that encodes
128209 // the response as JSON to the response writer. Primarily useful in a server.
129210 func encodeHTTPGenericResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
130 if f, ok := response.(endpoint.Failer); ok && f.Failed() != nil {
211 if f, ok := response.(addendpoint.Failer); ok && f.Failed() != nil {
131212 errorEncoder(ctx, f.Failed(), w)
132213 return nil
133214 }
11
22 import (
33 "context"
4 "time"
45
6 jujuratelimit "github.com/juju/ratelimit"
7 "github.com/sony/gobreaker"
8
9 "github.com/go-kit/kit/circuitbreaker"
510 "github.com/go-kit/kit/endpoint"
11 "github.com/go-kit/kit/ratelimit"
12
613 addendpoint "github.com/go-kit/kit/examples/addsvc2/pkg/endpoint"
7 "github.com/go-kit/kit/examples/addsvc2/pkg/service"
14 addservice "github.com/go-kit/kit/examples/addsvc2/pkg/service"
815 thriftadd "github.com/go-kit/kit/examples/addsvc2/thrift/gen-go/addsvc"
916 )
17
18 type thriftServer struct {
19 ctx context.Context
20 endpoints addendpoint.Set
21 }
1022
1123 // NewThriftServer makes a set of endpoints available as a Thrift service.
1224 func NewThriftServer(ctx context.Context, endpoints addendpoint.Set) thriftadd.AddService {
1426 ctx: ctx,
1527 endpoints: endpoints,
1628 }
17 }
18
19 type thriftServer struct {
20 ctx context.Context
21 endpoints addendpoint.Set
2229 }
2330
2431 func (s *thriftServer) Sum(a int64, b int64) (*thriftadd.SumReply, error) {
4148 return &thriftadd.ConcatReply{Value: resp.V, Err: err2str(resp.Err)}, nil
4249 }
4350
51 // NewThriftClient returns an AddService backed by a Thrift server described by
52 // the provided client. The caller is responsible for constructing the client,
53 // and eventually closing the underlying transport.
54 func NewThriftClient(client *thriftadd.AddServiceClient) addservice.Service {
55 // We construct a single ratelimiter middleware, to limit the total outgoing
56 // QPS from this client to all methods on the remote instance. We also
57 // construct per-endpoint circuitbreaker middlewares to demonstrate how
58 // that's done, although they could easily be combined into a single breaker
59 // for the entire remote instance, too.
60 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
61
62 // Each individual endpoint is an http/transport.Client (which implements
63 // endpoint.Endpoint) that gets wrapped with various middlewares. If you
64 // could rely on a consistent set of client behavior.
65 var sumEndpoint endpoint.Endpoint
66 {
67 sumEndpoint = MakeThriftSumEndpoint(client)
68 sumEndpoint = limiter(sumEndpoint)
69 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
70 Name: "Sum",
71 Timeout: 30 * time.Second,
72 }))(sumEndpoint)
73 }
74
75 // The Concat endpoint is the same thing, with slightly different
76 // middlewares to demonstrate how to specialize per-endpoint.
77 var concatEndpoint endpoint.Endpoint
78 {
79 concatEndpoint = MakeThriftConcatEndpoint(client)
80 concatEndpoint = limiter(concatEndpoint)
81 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
82 Name: "Concat",
83 Timeout: 10 * time.Second,
84 }))(concatEndpoint)
85 }
86
87 // Returning the endpoint.Set as a service.Service relies on the
88 // endpoint.Set implementing the Service methods. That's just a simple bit
89 // of glue code.
90 return addendpoint.Set{
91 SumEndpoint: sumEndpoint,
92 ConcatEndpoint: concatEndpoint,
93 }
94 }
95
4496 // MakeThriftSumEndpoint returns an endpoint that invokes the passed Thrift client.
4597 // Useful only in clients, and only until a proper transport/thrift.Client exists.
4698 func MakeThriftSumEndpoint(client *thriftadd.AddServiceClient) endpoint.Endpoint {
4799 return func(ctx context.Context, request interface{}) (interface{}, error) {
48100 req := request.(addendpoint.SumRequest)
49101 reply, err := client.Sum(int64(req.A), int64(req.B))
50 if err == service.ErrIntOverflow {
102 if err == addservice.ErrIntOverflow {
51103 return nil, err // special case; see comment on ErrIntOverflow
52104 }
53105 return addendpoint.SumResponse{V: int(reply.Value), Err: err}, nil