Package list golang-github-go-kit-kit / 51ccc66
examples/addsvc: refactor Peter Bourgon 5 years ago
42 changed file(s) with 2649 addition(s) and 2525 deletion(s). Raw diff Collapse all Expand all
+0
-71
examples/addsvc/client/client.go less more
0 package main
1
2 import (
3 "golang.org/x/net/context"
4
5 "github.com/go-kit/kit/endpoint"
6 "github.com/go-kit/kit/examples/addsvc/server"
7 "github.com/go-kit/kit/log"
8 )
9
10 // NewClient returns an AddService that's backed by the provided Endpoints
11 func newClient(ctx context.Context, sumEndpoint endpoint.Endpoint, concatEndpoint endpoint.Endpoint, logger log.Logger) server.AddService {
12 return client{
13 Context: ctx,
14 Logger: logger,
15 sum: sumEndpoint,
16 concat: concatEndpoint,
17 }
18 }
19
20 type client struct {
21 context.Context
22 log.Logger
23 sum endpoint.Endpoint
24 concat endpoint.Endpoint
25 }
26
27 // TODO(pb): If your service interface methods don't return an error, we have
28 // no way to signal problems with a service client. If they don't take a
29 // context, we have to provide a global context for any transport that
30 // requires one, effectively making your service a black box to any context-
31 // specific information. So, we should make some recommendations:
32 //
33 // - To get started, a simple service interface is probably fine.
34 //
35 // - To properly deal with transport errors, every method on your service
36 // should return an error. This is probably important.
37 //
38 // - To properly deal with context information, every method on your service
39 // can take a context as its first argument. This may or may not be
40 // important.
41
42 func (c client) Sum(a, b int) int {
43 request := server.SumRequest{
44 A: a,
45 B: b,
46 }
47 reply, err := c.sum(c.Context, request)
48 if err != nil {
49 c.Logger.Log("err", err) // Without an error return parameter, we can't do anything else...
50 return 0
51 }
52
53 r := reply.(server.SumResponse)
54 return r.V
55 }
56
57 func (c client) Concat(a, b string) string {
58 request := server.ConcatRequest{
59 A: a,
60 B: b,
61 }
62 reply, err := c.concat(c.Context, request)
63 if err != nil {
64 c.Logger.Log("err", err) // Without an error return parameter, we can't do anything else...
65 return ""
66 }
67
68 r := reply.(server.ConcatResponse)
69 return r.V
70 }
0 // Package grpc provides a gRPC client for the add service.
1 package grpc
2
3 import (
4 "time"
5
6 jujuratelimit "github.com/juju/ratelimit"
7 stdopentracing "github.com/opentracing/opentracing-go"
8 "github.com/sony/gobreaker"
9 "google.golang.org/grpc"
10
11 "github.com/go-kit/kit/circuitbreaker"
12 "github.com/go-kit/kit/endpoint"
13 "github.com/go-kit/kit/examples/addsvc"
14 "github.com/go-kit/kit/examples/addsvc/pb"
15 "github.com/go-kit/kit/log"
16 "github.com/go-kit/kit/ratelimit"
17 "github.com/go-kit/kit/tracing/opentracing"
18 grpctransport "github.com/go-kit/kit/transport/grpc"
19 )
20
21 // New returns an AddService backed by a gRPC client connection. It is the
22 // responsibility of the caller to dial, and later close, the connection.
23 func New(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addsvc.Service {
24 // We construct a single ratelimiter middleware, to limit the total outgoing
25 // QPS from this client to all methods on the remote instance. We also
26 // construct per-endpoint circuitbreaker middlewares to demonstrate how
27 // that's done, although they could easily be combined into a single breaker
28 // for the entire remote instance, too.
29
30 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
31
32 var sumEndpoint endpoint.Endpoint
33 {
34 sumEndpoint = grpctransport.NewClient(
35 conn,
36 "Add",
37 "Sum",
38 addsvc.EncodeGRPCSumRequest,
39 addsvc.DecodeGRPCSumResponse,
40 pb.SumReply{},
41 grpctransport.SetClientBefore(opentracing.FromGRPCRequest(tracer, "Sum", logger)),
42 ).Endpoint()
43 sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
44 sumEndpoint = limiter(sumEndpoint)
45 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
46 Name: "Sum",
47 Timeout: 30 * time.Second,
48 }))(sumEndpoint)
49 }
50
51 var concatEndpoint endpoint.Endpoint
52 {
53 concatEndpoint = grpctransport.NewClient(
54 conn,
55 "Add",
56 "Concat",
57 addsvc.EncodeGRPCConcatRequest,
58 addsvc.DecodeGRPCConcatResponse,
59 pb.ConcatReply{},
60 grpctransport.SetClientBefore(opentracing.FromGRPCRequest(tracer, "Concat", logger)),
61 ).Endpoint()
62 concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
63 concatEndpoint = limiter(concatEndpoint)
64 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
65 Name: "Concat",
66 Timeout: 30 * time.Second,
67 }))(sumEndpoint)
68 }
69
70 return addsvc.Endpoints{
71 SumEndpoint: sumEndpoint,
72 ConcatEndpoint: concatEndpoint,
73 }
74 }
+0
-38
examples/addsvc/client/grpc/encode_decode.go less more
0 package grpc
1
2 import (
3 "golang.org/x/net/context"
4
5 "github.com/go-kit/kit/examples/addsvc/pb"
6 "github.com/go-kit/kit/examples/addsvc/server"
7 )
8
9 func encodeSumRequest(ctx context.Context, request interface{}) (interface{}, error) {
10 req := request.(server.SumRequest)
11 return &pb.SumRequest{
12 A: int64(req.A),
13 B: int64(req.B),
14 }, nil
15 }
16
17 func encodeConcatRequest(ctx context.Context, request interface{}) (interface{}, error) {
18 req := request.(server.ConcatRequest)
19 return &pb.ConcatRequest{
20 A: req.A,
21 B: req.B,
22 }, nil
23 }
24
25 func decodeSumResponse(ctx context.Context, response interface{}) (interface{}, error) {
26 resp := response.(*pb.SumReply)
27 return server.SumResponse{
28 V: int(resp.V),
29 }, nil
30 }
31
32 func decodeConcatResponse(ctx context.Context, response interface{}) (interface{}, error) {
33 resp := response.(*pb.ConcatReply)
34 return server.ConcatResponse{
35 V: resp.V,
36 }, nil
37 }
+0
-51
examples/addsvc/client/grpc/factory.go less more
0 package grpc
1
2 import (
3 "io"
4
5 kitot "github.com/go-kit/kit/tracing/opentracing"
6 "github.com/opentracing/opentracing-go"
7 "google.golang.org/grpc"
8
9 "github.com/go-kit/kit/endpoint"
10 "github.com/go-kit/kit/examples/addsvc/pb"
11 "github.com/go-kit/kit/loadbalancer"
12 "github.com/go-kit/kit/log"
13 grpctransport "github.com/go-kit/kit/transport/grpc"
14 )
15
16 // MakeSumEndpointFactory returns a loadbalancer.Factory that transforms GRPC
17 // host:port strings into Endpoints that call the Sum method on a GRPC server
18 // at that address.
19 func MakeSumEndpointFactory(tracer opentracing.Tracer, tracingLogger log.Logger) loadbalancer.Factory {
20 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
21 cc, err := grpc.Dial(instance, grpc.WithInsecure())
22 return grpctransport.NewClient(
23 cc,
24 "Add",
25 "Sum",
26 encodeSumRequest,
27 decodeSumResponse,
28 pb.SumReply{},
29 grpctransport.SetClientBefore(kitot.ToGRPCRequest(tracer, tracingLogger)),
30 ).Endpoint(), cc, err
31 }
32 }
33
34 // MakeConcatEndpointFactory returns a loadbalancer.Factory that transforms
35 // GRPC host:port strings into Endpoints that call the Concat method on a GRPC
36 // server at that address.
37 func MakeConcatEndpointFactory(tracer opentracing.Tracer, tracingLogger log.Logger) loadbalancer.Factory {
38 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
39 cc, err := grpc.Dial(instance, grpc.WithInsecure())
40 return grpctransport.NewClient(
41 cc,
42 "Add",
43 "Concat",
44 encodeConcatRequest,
45 decodeConcatResponse,
46 pb.ConcatReply{},
47 grpctransport.SetClientBefore(kitot.ToGRPCRequest(tracer, tracingLogger)),
48 ).Endpoint(), cc, err
49 }
50 }
0 // Package http provides an HTTP client for the add service.
1 package http
2
3 import (
4 "net/url"
5 "strings"
6 "time"
7
8 jujuratelimit "github.com/juju/ratelimit"
9 stdopentracing "github.com/opentracing/opentracing-go"
10 "github.com/sony/gobreaker"
11
12 "github.com/go-kit/kit/circuitbreaker"
13 "github.com/go-kit/kit/endpoint"
14 "github.com/go-kit/kit/examples/addsvc"
15 "github.com/go-kit/kit/log"
16 "github.com/go-kit/kit/ratelimit"
17 "github.com/go-kit/kit/tracing/opentracing"
18 httptransport "github.com/go-kit/kit/transport/http"
19 )
20
21 // New returns an AddService backed by an HTTP server living at the remote
22 // instance. We expect instance to come from a service discovery system, so
23 // likely of the form "host:port".
24 func New(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addsvc.Service, error) {
25 if !strings.HasPrefix(instance, "http") {
26 instance = "http://" + instance
27 }
28 u, err := url.Parse(instance)
29 if err != nil {
30 return nil, err
31 }
32
33 // We construct a single ratelimiter middleware, to limit the total outgoing
34 // QPS from this client to all methods on the remote instance. We also
35 // construct per-endpoint circuitbreaker middlewares to demonstrate how
36 // that's done, although they could easily be combined into a single breaker
37 // for the entire remote instance, too.
38
39 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
40
41 var sumEndpoint endpoint.Endpoint
42 {
43 sumEndpoint = httptransport.NewClient(
44 "POST",
45 copyURL(u, "/sum"),
46 addsvc.EncodeHTTPGenericRequest,
47 addsvc.DecodeHTTPSumResponse,
48 httptransport.SetClientBefore(opentracing.FromHTTPRequest(tracer, "Sum", logger)),
49 ).Endpoint()
50 sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
51 sumEndpoint = limiter(sumEndpoint)
52 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
53 Name: "Sum",
54 Timeout: 30 * time.Second,
55 }))(sumEndpoint)
56 }
57
58 var concatEndpoint endpoint.Endpoint
59 {
60 concatEndpoint = httptransport.NewClient(
61 "POST",
62 copyURL(u, "/concat"),
63 addsvc.EncodeHTTPGenericRequest,
64 addsvc.DecodeHTTPConcatResponse,
65 httptransport.SetClientBefore(opentracing.FromHTTPRequest(tracer, "Concat", logger)),
66 ).Endpoint()
67 concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
68 concatEndpoint = limiter(concatEndpoint)
69 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
70 Name: "Concat",
71 Timeout: 30 * time.Second,
72 }))(sumEndpoint)
73 }
74
75 return addsvc.Endpoints{
76 SumEndpoint: sumEndpoint,
77 ConcatEndpoint: concatEndpoint,
78 }, nil
79 }
80
81 func copyURL(base *url.URL, path string) *url.URL {
82 next := *base
83 next.Path = path
84 return &next
85 }
+0
-65
examples/addsvc/client/httpjson/factory.go less more
0 package httpjson
1
2 import (
3 "io"
4 "net/url"
5
6 "github.com/opentracing/opentracing-go"
7
8 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/examples/addsvc/server"
10 "github.com/go-kit/kit/loadbalancer"
11 "github.com/go-kit/kit/log"
12 kitot "github.com/go-kit/kit/tracing/opentracing"
13 httptransport "github.com/go-kit/kit/transport/http"
14 )
15
16 // MakeSumEndpointFactory generates a Factory that transforms an http url into
17 // an Endpoint.
18 //
19 // The path of the url is reset to /sum.
20 func MakeSumEndpointFactory(tracer opentracing.Tracer, tracingLogger log.Logger) loadbalancer.Factory {
21 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
22 sumURL, err := url.Parse(instance)
23 if err != nil {
24 return nil, nil, err
25 }
26 sumURL.Path = "/sum"
27
28 client := httptransport.NewClient(
29 "GET",
30 sumURL,
31 server.EncodeSumRequest,
32 server.DecodeSumResponse,
33 httptransport.SetClient(nil),
34 httptransport.SetClientBefore(kitot.ToHTTPRequest(tracer, tracingLogger)),
35 )
36
37 return client.Endpoint(), nil, nil
38 }
39 }
40
41 // MakeConcatEndpointFactory generates a Factory that transforms an http url
42 // into an Endpoint.
43 //
44 // The path of the url is reset to /concat.
45 func MakeConcatEndpointFactory(tracer opentracing.Tracer, tracingLogger log.Logger) loadbalancer.Factory {
46 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
47 concatURL, err := url.Parse(instance)
48 if err != nil {
49 return nil, nil, err
50 }
51 concatURL.Path = "/concat"
52
53 client := httptransport.NewClient(
54 "GET",
55 concatURL,
56 server.EncodeConcatRequest,
57 server.DecodeConcatResponse,
58 httptransport.SetClient(nil),
59 httptransport.SetClientBefore(kitot.ToHTTPRequest(tracer, tracingLogger)),
60 )
61
62 return client.Endpoint(), nil, nil
63 }
64 }
+0
-167
examples/addsvc/client/main.go less more
0 package main
1
2 import (
3 "flag"
4 "fmt"
5 "os"
6 "path/filepath"
7 "strconv"
8 "strings"
9 "time"
10
11 "github.com/lightstep/lightstep-tracer-go"
12 "github.com/opentracing/opentracing-go"
13 zipkin "github.com/openzipkin/zipkin-go-opentracing"
14 appdashot "github.com/sourcegraph/appdash/opentracing"
15 "golang.org/x/net/context"
16 "sourcegraph.com/sourcegraph/appdash"
17
18 "github.com/go-kit/kit/endpoint"
19 grpcclient "github.com/go-kit/kit/examples/addsvc/client/grpc"
20 httpjsonclient "github.com/go-kit/kit/examples/addsvc/client/httpjson"
21 netrpcclient "github.com/go-kit/kit/examples/addsvc/client/netrpc"
22 thriftclient "github.com/go-kit/kit/examples/addsvc/client/thrift"
23 "github.com/go-kit/kit/loadbalancer"
24 "github.com/go-kit/kit/loadbalancer/static"
25 "github.com/go-kit/kit/log"
26 kitot "github.com/go-kit/kit/tracing/opentracing"
27 )
28
29 func main() {
30 var (
31 transport = flag.String("transport", "httpjson", "httpjson, grpc, netrpc, thrift")
32 httpAddrs = flag.String("http.addrs", "localhost:8001", "Comma-separated list of addresses for HTTP (JSON) servers")
33 grpcAddrs = flag.String("grpc.addrs", "localhost:8002", "Comma-separated list of addresses for gRPC servers")
34 netrpcAddrs = flag.String("netrpc.addrs", "localhost:8003", "Comma-separated list of addresses for net/rpc servers")
35 thriftAddrs = flag.String("thrift.addrs", "localhost:8004", "Comma-separated list of addresses for Thrift servers")
36 thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
37 thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered")
38 thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing")
39
40 // Three OpenTracing backends (to demonstrate how they can be interchanged):
41 zipkinAddr = flag.String("zipkin.kafka.addr", "", "Enable Zipkin tracing via a Kafka Collector host:port")
42 appdashAddr = flag.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port")
43 lightstepAccessToken = flag.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token")
44 )
45 flag.Parse()
46 if len(os.Args) < 4 {
47 fmt.Fprintf(os.Stderr, "\n%s [flags] method arg1 arg2\n\n", filepath.Base(os.Args[0]))
48 flag.Usage()
49 os.Exit(1)
50 }
51
52 randomSeed := time.Now().UnixNano()
53
54 root := context.Background()
55 method, s1, s2 := flag.Arg(0), flag.Arg(1), flag.Arg(2)
56
57 var logger log.Logger
58 logger = log.NewLogfmtLogger(os.Stdout)
59 logger = log.NewContext(logger).With("caller", log.DefaultCaller)
60 logger = log.NewContext(logger).With("transport", *transport)
61 tracingLogger := log.NewContext(logger).With("component", "tracing")
62
63 // Set up OpenTracing
64 var tracer opentracing.Tracer
65 {
66 switch {
67 case *appdashAddr != "" && *lightstepAccessToken == "" && *zipkinAddr == "":
68 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
69 case *appdashAddr == "" && *lightstepAccessToken != "" && *zipkinAddr == "":
70 tracer = lightstep.NewTracer(lightstep.Options{
71 AccessToken: *lightstepAccessToken,
72 })
73 defer lightstep.FlushLightStepTracer(tracer)
74 case *appdashAddr == "" && *lightstepAccessToken == "" && *zipkinAddr != "":
75 collector, err := zipkin.NewKafkaCollector(
76 strings.Split(*zipkinAddr, ","),
77 zipkin.KafkaLogger(tracingLogger),
78 )
79 if err != nil {
80 tracingLogger.Log("err", "unable to create kafka collector", "fatal", err)
81 os.Exit(1)
82 }
83 tracer, err = zipkin.NewTracer(
84 zipkin.NewRecorder(collector, false, "localhost:8000", "addsvc-client"),
85 )
86 if err != nil {
87 tracingLogger.Log("err", "unable to create zipkin tracer", "fatal", err)
88 os.Exit(1)
89 }
90 case *appdashAddr == "" && *lightstepAccessToken == "" && *zipkinAddr == "":
91 tracer = opentracing.GlobalTracer() // no-op
92 default:
93 tracingLogger.Log("fatal", "specify a single -appdash.addr, -lightstep.access.token or -zipkin.kafka.addr")
94 os.Exit(1)
95 }
96 }
97
98 var (
99 instances []string
100 sumFactory, concatFactory loadbalancer.Factory
101 )
102
103 switch *transport {
104 case "grpc":
105 instances = strings.Split(*grpcAddrs, ",")
106 sumFactory = grpcclient.MakeSumEndpointFactory(tracer, tracingLogger)
107 concatFactory = grpcclient.MakeConcatEndpointFactory(tracer, tracingLogger)
108
109 case "httpjson":
110 instances = strings.Split(*httpAddrs, ",")
111 for i, rawurl := range instances {
112 if !strings.HasPrefix("http", rawurl) {
113 instances[i] = "http://" + rawurl
114 }
115 }
116 sumFactory = httpjsonclient.MakeSumEndpointFactory(tracer, tracingLogger)
117 concatFactory = httpjsonclient.MakeConcatEndpointFactory(tracer, tracingLogger)
118
119 case "netrpc":
120 instances = strings.Split(*netrpcAddrs, ",")
121 sumFactory = netrpcclient.SumEndpointFactory
122 concatFactory = netrpcclient.ConcatEndpointFactory
123
124 case "thrift":
125 instances = strings.Split(*thriftAddrs, ",")
126 thriftClient := thriftclient.New(*thriftProtocol, *thriftBufferSize, *thriftFramed, logger)
127 sumFactory = thriftClient.SumEndpoint
128 concatFactory = thriftClient.ConcatEndpoint
129
130 default:
131 logger.Log("err", "invalid transport")
132 os.Exit(1)
133 }
134
135 sum := buildEndpoint(tracer, "sum", instances, sumFactory, randomSeed, logger)
136 concat := buildEndpoint(tracer, "concat", instances, concatFactory, randomSeed, logger)
137
138 svc := newClient(root, sum, concat, logger)
139
140 begin := time.Now()
141 switch method {
142 case "sum":
143 a, _ := strconv.Atoi(s1)
144 b, _ := strconv.Atoi(s2)
145 v := svc.Sum(a, b)
146 logger.Log("method", "sum", "a", a, "b", b, "v", v, "took", time.Since(begin))
147
148 case "concat":
149 a, b := s1, s2
150 v := svc.Concat(a, b)
151 logger.Log("method", "concat", "a", a, "b", b, "v", v, "took", time.Since(begin))
152
153 default:
154 logger.Log("err", "invalid method "+method)
155 os.Exit(1)
156 }
157 // wait for collector
158 time.Sleep(2 * time.Second)
159 }
160
161 func buildEndpoint(tracer opentracing.Tracer, operationName string, instances []string, factory loadbalancer.Factory, seed int64, logger log.Logger) endpoint.Endpoint {
162 publisher := static.NewPublisher(instances, factory, logger)
163 random := loadbalancer.NewRandom(publisher, seed)
164 endpoint := loadbalancer.Retry(10, 10*time.Second, random)
165 return kitot.TraceClient(tracer, operationName)(endpoint)
166 }
+0
-43
examples/addsvc/client/netrpc/factory.go less more
0 package netrpc
1
2 import (
3 "io"
4 "net/rpc"
5
6 "golang.org/x/net/context"
7
8 "github.com/go-kit/kit/endpoint"
9 "github.com/go-kit/kit/examples/addsvc/server"
10 )
11
12 // SumEndpointFactory transforms host:port strings into Endpoints.
13 func SumEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) {
14 client, err := rpc.DialHTTP("tcp", instance)
15 if err != nil {
16 return nil, nil, err
17 }
18
19 return func(ctx context.Context, request interface{}) (interface{}, error) {
20 var reply server.SumResponse
21 if err := client.Call("addsvc.Sum", request.(server.SumRequest), &reply); err != nil {
22 return server.SumResponse{}, err
23 }
24 return reply, nil
25 }, client, nil
26 }
27
28 // ConcatEndpointFactory transforms host:port strings into Endpoints.
29 func ConcatEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) {
30 client, err := rpc.DialHTTP("tcp", instance)
31 if err != nil {
32 return nil, nil, err
33 }
34
35 return func(ctx context.Context, request interface{}) (interface{}, error) {
36 var reply server.ConcatResponse
37 if err := client.Call("addsvc.Concat", request.(server.ConcatRequest), &reply); err != nil {
38 return server.ConcatResponse{}, err
39 }
40 return reply, nil
41 }, client, nil
42 }
0 // Package thrift provides a Thrift client for the add service.
01 package thrift
12
23 import (
3 "io"
4 "time"
45
5 "github.com/apache/thrift/lib/go/thrift"
6 jujuratelimit "github.com/juju/ratelimit"
7 "github.com/sony/gobreaker"
8
9 "github.com/go-kit/kit/circuitbreaker"
610 "github.com/go-kit/kit/endpoint"
7 "github.com/go-kit/kit/examples/addsvc/server"
8 thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/add"
9 "github.com/go-kit/kit/log"
10 "golang.org/x/net/context"
11 "github.com/go-kit/kit/examples/addsvc"
12 thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc"
13 "github.com/go-kit/kit/ratelimit"
1114 )
1215
13 // New returns a stateful factory for Sum and Concat Endpoints
14 func New(protocol string, bufferSize int, framed bool, logger log.Logger) client {
15 var protocolFactory thrift.TProtocolFactory
16 switch protocol {
17 case "compact":
18 protocolFactory = thrift.NewTCompactProtocolFactory()
19 case "simplejson":
20 protocolFactory = thrift.NewTSimpleJSONProtocolFactory()
21 case "json":
22 protocolFactory = thrift.NewTJSONProtocolFactory()
23 case "binary", "":
24 protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
25 default:
26 panic("invalid protocol")
16 // New returns an AddService backed by a Thrift server described by the provided
17 // client. The caller is responsible for constructing the client, and eventually
18 // closing the underlying transport.
19 func New(client *thriftadd.AddServiceClient) addsvc.Service {
20 // We construct a single ratelimiter middleware, to limit the total outgoing
21 // QPS from this client to all methods on the remote instance. We also
22 // construct per-endpoint circuitbreaker middlewares to demonstrate how
23 // that's done, although they could easily be combined into a single breaker
24 // for the entire remote instance, too.
25
26 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
27
28 // Thrift does not currently have tracer bindings, so we skip tracing.
29
30 var sumEndpoint endpoint.Endpoint
31 {
32 sumEndpoint = addsvc.MakeThriftSumEndpoint(client)
33 sumEndpoint = limiter(sumEndpoint)
34 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
35 Name: "Sum",
36 Timeout: 30 * time.Second,
37 }))(sumEndpoint)
2738 }
2839
29 var transportFactory thrift.TTransportFactory
30 if bufferSize > 0 {
31 transportFactory = thrift.NewTBufferedTransportFactory(bufferSize)
32 } else {
33 transportFactory = thrift.NewTTransportFactory()
34 }
35 if framed {
36 transportFactory = thrift.NewTFramedTransportFactory(transportFactory)
40 var concatEndpoint endpoint.Endpoint
41 {
42 concatEndpoint = addsvc.MakeThriftConcatEndpoint(client)
43 concatEndpoint = limiter(concatEndpoint)
44 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
45 Name: "Concat",
46 Timeout: 30 * time.Second,
47 }))(sumEndpoint)
3748 }
3849
39 return client{transportFactory, protocolFactory, logger}
50 return addsvc.Endpoints{
51 SumEndpoint: addsvc.MakeThriftSumEndpoint(client),
52 ConcatEndpoint: addsvc.MakeThriftConcatEndpoint(client),
53 }
4054 }
41
42 type client struct {
43 thrift.TTransportFactory
44 thrift.TProtocolFactory
45 log.Logger
46 }
47
48 // SumEndpointFactory transforms host:port strings into Endpoints.
49 func (c client) SumEndpoint(instance string) (endpoint.Endpoint, io.Closer, error) {
50 transportSocket, err := thrift.NewTSocket(instance)
51 if err != nil {
52 c.Logger.Log("during", "thrift.NewTSocket", "err", err)
53 return nil, nil, err
54 }
55 trans := c.TTransportFactory.GetTransport(transportSocket)
56
57 if err := trans.Open(); err != nil {
58 c.Logger.Log("during", "thrift transport.Open", "err", err)
59 return nil, nil, err
60 }
61 cli := thriftadd.NewAddServiceClientFactory(trans, c.TProtocolFactory)
62
63 return func(ctx context.Context, request interface{}) (interface{}, error) {
64 sumRequest := request.(server.SumRequest)
65 reply, err := cli.Sum(int64(sumRequest.A), int64(sumRequest.B))
66 if err != nil {
67 return server.SumResponse{}, err
68 }
69 return server.SumResponse{V: int(reply.Value)}, nil
70 }, trans, nil
71 }
72
73 // ConcatEndpointFactory transforms host:port strings into Endpoints.
74 func (c client) ConcatEndpoint(instance string) (endpoint.Endpoint, io.Closer, error) {
75 transportSocket, err := thrift.NewTSocket(instance)
76 if err != nil {
77 c.Logger.Log("during", "thrift.NewTSocket", "err", err)
78 return nil, nil, err
79 }
80 trans := c.TTransportFactory.GetTransport(transportSocket)
81
82 if err := trans.Open(); err != nil {
83 c.Logger.Log("during", "thrift transport.Open", "err", err)
84 return nil, nil, err
85 }
86 cli := thriftadd.NewAddServiceClientFactory(trans, c.TProtocolFactory)
87
88 return func(ctx context.Context, request interface{}) (interface{}, error) {
89 concatRequest := request.(server.ConcatRequest)
90 reply, err := cli.Concat(concatRequest.A, concatRequest.B)
91 if err != nil {
92 return server.ConcatResponse{}, err
93 }
94 return server.ConcatResponse{V: reply.Value}, nil
95 }, trans, nil
96 }
0 package main
1
2 import (
3 "flag"
4 "fmt"
5 "os"
6 "strconv"
7 "strings"
8 "time"
9
10 "github.com/apache/thrift/lib/go/thrift"
11 "github.com/lightstep/lightstep-tracer-go"
12 stdopentracing "github.com/opentracing/opentracing-go"
13 zipkin "github.com/openzipkin/zipkin-go-opentracing"
14 appdashot "github.com/sourcegraph/appdash/opentracing"
15 "golang.org/x/net/context"
16 "google.golang.org/grpc"
17 "sourcegraph.com/sourcegraph/appdash"
18
19 "github.com/go-kit/kit/examples/addsvc"
20 grpcclient "github.com/go-kit/kit/examples/addsvc/client/grpc"
21 httpclient "github.com/go-kit/kit/examples/addsvc/client/http"
22 thriftclient "github.com/go-kit/kit/examples/addsvc/client/thrift"
23 thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc"
24 "github.com/go-kit/kit/log"
25 )
26
27 func main() {
28 // The addcli presumes no service discovery system, and expects users to
29 // provide the direct address of an addsvc. This presumption is reflected in
30 // the addcli binary and the the client packages: the -transport.addr flags
31 // and various client constructors both expect host:port strings. For an
32 // example service with a client built on top of a service discovery system,
33 // see profilesvc.
34
35 var (
36 httpAddr = flag.String("http.addr", "", "HTTP address of addsvc")
37 grpcAddr = flag.String("grpc.addr", "", "gRPC (HTTP) address of addsvc")
38 thriftAddr = flag.String("thrift.addr", "", "Thrift address of addsvc")
39 thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
40 thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered")
41 thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing")
42 zipkinAddr = flag.String("zipkin.addr", "", "Enable Zipkin tracing via a Kafka Collector host:port")
43 appdashAddr = flag.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port")
44 lightstepToken = flag.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token")
45 method = flag.String("method", "sum", "sum, concat")
46 )
47 flag.Parse()
48
49 if len(flag.Args()) != 2 {
50 fmt.Fprintf(os.Stderr, "usage: addcli [flags] <a> <b>\n")
51 os.Exit(1)
52 }
53
54 // This is a demonstration client, which supports multiple tracers.
55 // Your clients will probably just use one tracer.
56 var tracer stdopentracing.Tracer
57 {
58 if *zipkinAddr != "" {
59 collector, err := zipkin.NewKafkaCollector(
60 strings.Split(*zipkinAddr, ","),
61 zipkin.KafkaLogger(log.NewNopLogger()),
62 )
63 if err != nil {
64 fmt.Fprintf(os.Stderr, "%v\n", err)
65 os.Exit(1)
66 }
67 tracer, err = zipkin.NewTracer(
68 zipkin.NewRecorder(collector, false, "localhost:8000", "addcli"),
69 )
70 if err != nil {
71 fmt.Fprintf(os.Stderr, "%v\n", err)
72 os.Exit(1)
73 }
74 } else if *appdashAddr != "" {
75 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
76 } else if *lightstepToken != "" {
77 tracer = lightstep.NewTracer(lightstep.Options{
78 AccessToken: *lightstepToken,
79 })
80 defer lightstep.FlushLightStepTracer(tracer)
81 } else {
82 tracer = stdopentracing.GlobalTracer() // no-op
83 }
84 }
85
86 // This is a demonstration client, which supports multiple transports.
87 // Your clients will probably just define and stick with 1 transport.
88
89 var (
90 service addsvc.Service
91 err error
92 )
93 if *httpAddr != "" {
94 service, err = httpclient.New(*httpAddr, tracer, log.NewNopLogger())
95 } else if *grpcAddr != "" {
96 conn, err := grpc.Dial(*grpcAddr, grpc.WithInsecure(), grpc.WithTimeout(time.Second))
97 if err != nil {
98 fmt.Fprintf(os.Stderr, "error: %v", err)
99 os.Exit(1)
100 }
101 defer conn.Close()
102 service = grpcclient.New(conn, tracer, log.NewNopLogger())
103 } else if *thriftAddr != "" {
104 // It's necessary to do all of this construction in the func main,
105 // because (among other reasons) we need to control the lifecycle of the
106 // Thrift transport, i.e. close it eventually.
107 var protocolFactory thrift.TProtocolFactory
108 switch *thriftProtocol {
109 case "compact":
110 protocolFactory = thrift.NewTCompactProtocolFactory()
111 case "simplejson":
112 protocolFactory = thrift.NewTSimpleJSONProtocolFactory()
113 case "json":
114 protocolFactory = thrift.NewTJSONProtocolFactory()
115 case "binary", "":
116 protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
117 default:
118 fmt.Fprintf(os.Stderr, "error: invalid protocol %q\n", *thriftProtocol)
119 os.Exit(1)
120 }
121 var transportFactory thrift.TTransportFactory
122 if *thriftBufferSize > 0 {
123 transportFactory = thrift.NewTBufferedTransportFactory(*thriftBufferSize)
124 } else {
125 transportFactory = thrift.NewTTransportFactory()
126 }
127 if *thriftFramed {
128 transportFactory = thrift.NewTFramedTransportFactory(transportFactory)
129 }
130 transportSocket, err := thrift.NewTSocket(*thriftAddr)
131 if err != nil {
132 fmt.Fprintf(os.Stderr, "error: %v\n", err)
133 os.Exit(1)
134 }
135 transport := transportFactory.GetTransport(transportSocket)
136 if err := transport.Open(); err != nil {
137 fmt.Fprintf(os.Stderr, "error: %v\n", err)
138 os.Exit(1)
139 }
140 defer transport.Close()
141 client := thriftadd.NewAddServiceClientFactory(transport, protocolFactory)
142 service = thriftclient.New(client)
143 } else {
144 fmt.Fprintf(os.Stderr, "error: no remote address specified\n")
145 os.Exit(1)
146 }
147 if err != nil {
148 fmt.Fprintf(os.Stderr, "error: %v\n", err)
149 os.Exit(1)
150 }
151
152 switch *method {
153 case "sum":
154 a, _ := strconv.ParseInt(flag.Args()[0], 10, 64)
155 b, _ := strconv.ParseInt(flag.Args()[1], 10, 64)
156 v, err := service.Sum(context.Background(), int(a), int(b))
157 if err != nil {
158 fmt.Fprintf(os.Stderr, "error: %v\n", err)
159 os.Exit(1)
160 }
161 fmt.Fprintf(os.Stdout, "%d + %d = %d\n", a, b, v)
162
163 case "concat":
164 a := flag.Args()[0]
165 b := flag.Args()[1]
166 v, err := service.Concat(context.Background(), a, b)
167 if err != nil {
168 fmt.Fprintf(os.Stderr, "error: %v\n", err)
169 os.Exit(1)
170 }
171 fmt.Fprintf(os.Stdout, "%q + %q = %q\n", a, b, v)
172
173 default:
174 fmt.Fprintf(os.Stderr, "error: invalid method %q\n", method)
175 os.Exit(1)
176 }
177 }
0 package main
1
2 import (
3 "flag"
4 "fmt"
5 "net"
6 "net/http"
7 "net/http/pprof"
8 "os"
9 "os/signal"
10 "strings"
11 "syscall"
12 "time"
13
14 "github.com/apache/thrift/lib/go/thrift"
15 lightstep "github.com/lightstep/lightstep-tracer-go"
16 stdopentracing "github.com/opentracing/opentracing-go"
17 zipkin "github.com/openzipkin/zipkin-go-opentracing"
18 stdprometheus "github.com/prometheus/client_golang/prometheus"
19 appdashot "github.com/sourcegraph/appdash/opentracing"
20 "golang.org/x/net/context"
21 "google.golang.org/grpc"
22 "sourcegraph.com/sourcegraph/appdash"
23
24 "github.com/go-kit/kit/endpoint"
25 "github.com/go-kit/kit/examples/addsvc"
26 "github.com/go-kit/kit/examples/addsvc/pb"
27 thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc"
28 "github.com/go-kit/kit/log"
29 "github.com/go-kit/kit/metrics"
30 "github.com/go-kit/kit/metrics/prometheus"
31 "github.com/go-kit/kit/tracing/opentracing"
32 )
33
34 func main() {
35 var (
36 debugAddr = flag.String("debug.addr", ":8080", "Debug and metrics listen address")
37 httpAddr = flag.String("http.addr", ":8081", "HTTP listen address")
38 grpcAddr = flag.String("grpc.addr", ":8082", "gRPC (HTTP) listen address")
39 thriftAddr = flag.String("thrift.addr", ":8083", "Thrift listen address")
40 thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
41 thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered")
42 thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing")
43 zipkinAddr = flag.String("zipkin.addr", "", "Enable Zipkin tracing via a Kafka server host:port")
44 appdashAddr = flag.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port")
45 lightstepToken = flag.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token")
46 )
47 flag.Parse()
48
49 // Logging domain
50 var logger log.Logger
51 {
52 logger = log.NewLogfmtLogger(os.Stdout)
53 logger = log.NewContext(logger).With("ts", log.DefaultTimestampUTC)
54 logger = log.NewContext(logger).With("caller", log.DefaultCaller)
55 }
56 logger.Log("msg", "hello")
57 defer logger.Log("msg", "goodbye")
58
59 // Metrics domain
60 var ints, chars metrics.Counter
61 {
62 // Business level metrics
63 ints = prometheus.NewCounter(stdprometheus.CounterOpts{
64 Namespace: "addsvc",
65 Name: "integers_summed",
66 Help: "Total count of integers summed via the Sum method.",
67 }, []string{})
68 chars = prometheus.NewCounter(stdprometheus.CounterOpts{
69 Namespace: "addsvc",
70 Name: "characters_concatenated",
71 Help: "Total count of characters concatenated via the Concat method.",
72 }, []string{})
73 }
74 var duration metrics.TimeHistogram
75 {
76 // Transport level metrics
77 duration = metrics.NewTimeHistogram(time.Nanosecond, prometheus.NewSummary(stdprometheus.SummaryOpts{
78 Namespace: "addsvc",
79 Name: "request_duration_ns",
80 Help: "Request duration in nanoseconds.",
81 }, []string{"method", "success"}))
82 }
83
84 // Tracing domain
85 var tracer stdopentracing.Tracer
86 {
87 if *zipkinAddr != "" {
88 logger := log.NewContext(logger).With("tracer", "Zipkin")
89 logger.Log("addr", *zipkinAddr)
90 collector, err := zipkin.NewKafkaCollector(
91 strings.Split(*zipkinAddr, ","),
92 zipkin.KafkaLogger(logger),
93 )
94 if err != nil {
95 logger.Log("err", err)
96 os.Exit(1)
97 }
98 tracer, err = zipkin.NewTracer(
99 zipkin.NewRecorder(collector, false, "localhost:80", "addsvc"),
100 )
101 if err != nil {
102 logger.Log("err", err)
103 os.Exit(1)
104 }
105 } else if *appdashAddr != "" {
106 logger := log.NewContext(logger).With("tracer", "Appdash")
107 logger.Log("addr", *appdashAddr)
108 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
109 } else if *lightstepToken != "" {
110 logger := log.NewContext(logger).With("tracer", "LightStep")
111 logger.Log() // probably don't want to print out the token :)
112 tracer = lightstep.NewTracer(lightstep.Options{
113 AccessToken: *lightstepToken,
114 })
115 defer lightstep.FlushLightStepTracer(tracer)
116 } else {
117 logger := log.NewContext(logger).With("tracer", "none")
118 logger.Log()
119 tracer = stdopentracing.GlobalTracer() // no-op
120 }
121 }
122
123 // Business domain
124 var service addsvc.Service
125 {
126 service = addsvc.NewBasicService()
127 service = addsvc.ServiceLoggingMiddleware(logger)(service)
128 service = addsvc.ServiceInstrumentingMiddleware(ints, chars)(service)
129 }
130
131 // Endpoint domain
132 var sumEndpoint endpoint.Endpoint
133 {
134 sumDuration := duration.With(metrics.Field{Key: "method", Value: "Sum"})
135 sumLogger := log.NewContext(logger).With("method", "Sum")
136
137 sumEndpoint = addsvc.MakeSumEndpoint(service)
138 sumEndpoint = opentracing.TraceServer(tracer, "Sum")(sumEndpoint)
139 sumEndpoint = addsvc.EndpointInstrumentingMiddleware(sumDuration)(sumEndpoint)
140 sumEndpoint = addsvc.EndpointLoggingMiddleware(sumLogger)(sumEndpoint)
141 }
142 var concatEndpoint endpoint.Endpoint
143 {
144 concatDuration := duration.With(metrics.Field{Key: "method", Value: "Concat"})
145 concatLogger := log.NewContext(logger).With("method", "Concat")
146
147 concatEndpoint = addsvc.MakeConcatEndpoint(service)
148 concatEndpoint = opentracing.TraceServer(tracer, "Concat")(concatEndpoint)
149 concatEndpoint = addsvc.EndpointInstrumentingMiddleware(concatDuration)(concatEndpoint)
150 concatEndpoint = addsvc.EndpointLoggingMiddleware(concatLogger)(concatEndpoint)
151 }
152 endpoints := addsvc.Endpoints{
153 SumEndpoint: sumEndpoint,
154 ConcatEndpoint: concatEndpoint,
155 }
156
157 // Mechanical domain
158 errc := make(chan error)
159 ctx := context.Background()
160
161 // Interrupt handler
162 go func() {
163 c := make(chan os.Signal, 1)
164 signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
165 errc <- fmt.Errorf("%s", <-c)
166 }()
167
168 // Debug listener
169 go func() {
170 logger := log.NewContext(logger).With("transport", "debug")
171
172 m := http.NewServeMux()
173 m.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
174 m.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
175 m.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
176 m.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
177 m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
178 m.Handle("/metrics", stdprometheus.Handler())
179
180 logger.Log("addr", *debugAddr)
181 errc <- http.ListenAndServe(*debugAddr, m)
182 }()
183
184 // HTTP transport
185 go func() {
186 logger := log.NewContext(logger).With("transport", "HTTP")
187 h := addsvc.MakeHTTPHandler(ctx, endpoints, tracer, logger)
188 logger.Log("addr", *httpAddr)
189 errc <- http.ListenAndServe(*httpAddr, h)
190 }()
191
192 // gRPC transport
193 go func() {
194 logger := log.NewContext(logger).With("transport", "gRPC")
195
196 ln, err := net.Listen("tcp", *grpcAddr)
197 if err != nil {
198 errc <- err
199 return
200 }
201
202 srv := addsvc.MakeGRPCServer(ctx, endpoints, tracer, logger)
203 s := grpc.NewServer()
204 pb.RegisterAddServer(s, srv)
205
206 logger.Log("addr", *grpcAddr)
207 errc <- s.Serve(ln)
208 }()
209
210 // Thrift transport
211 go func() {
212 logger := log.NewContext(logger).With("transport", "Thrift")
213
214 var protocolFactory thrift.TProtocolFactory
215 switch *thriftProtocol {
216 case "binary":
217 protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
218 case "compact":
219 protocolFactory = thrift.NewTCompactProtocolFactory()
220 case "json":
221 protocolFactory = thrift.NewTJSONProtocolFactory()
222 case "simplejson":
223 protocolFactory = thrift.NewTSimpleJSONProtocolFactory()
224 default:
225 errc <- fmt.Errorf("invalid Thrift protocol %q", *thriftProtocol)
226 return
227 }
228
229 var transportFactory thrift.TTransportFactory
230 if *thriftBufferSize > 0 {
231 transportFactory = thrift.NewTBufferedTransportFactory(*thriftBufferSize)
232 } else {
233 transportFactory = thrift.NewTTransportFactory()
234 }
235 if *thriftFramed {
236 transportFactory = thrift.NewTFramedTransportFactory(transportFactory)
237 }
238
239 transport, err := thrift.NewTServerSocket(*thriftAddr)
240 if err != nil {
241 errc <- err
242 return
243 }
244
245 logger.Log("addr", *thriftAddr)
246 errc <- thrift.NewTSimpleServer4(
247 thriftadd.NewAddServiceProcessor(addsvc.MakeThriftHandler(ctx, endpoints)),
248 transport,
249 transportFactory,
250 protocolFactory,
251 ).Serve()
252 }()
253
254 // Run
255 logger.Log("exit", <-errc)
256 }
0 // Package addsvc implements the business and transport logic for an example
1 // service that can sum integers and concatenate strings.
2 //
3 // A client library is available in the client subdirectory. A server binary is
4 // available in cmd/addsrv. An example client binary is available in cmd/addcli.
5 package addsvc
+0
-24
examples/addsvc/endpoint.go less more
0 package main
1
2 import (
3 "golang.org/x/net/context"
4
5 "github.com/go-kit/kit/endpoint"
6 "github.com/go-kit/kit/examples/addsvc/server"
7 )
8
9 func makeSumEndpoint(svc server.AddService) endpoint.Endpoint {
10 return func(ctx context.Context, request interface{}) (interface{}, error) {
11 req := request.(*server.SumRequest)
12 v := svc.Sum(req.A, req.B)
13 return server.SumResponse{V: v}, nil
14 }
15 }
16
17 func makeConcatEndpoint(svc server.AddService) endpoint.Endpoint {
18 return func(ctx context.Context, request interface{}) (interface{}, error) {
19 req := request.(*server.ConcatRequest)
20 v := svc.Concat(req.A, req.B)
21 return server.ConcatResponse{V: v}, nil
22 }
23 }
0 package addsvc
1
2 // This file contains methods to make individual endpoints from services,
3 // request and response types to serve those endpoints, as well as encoders and
4 // decoders for those types, for all of our supported transport serialization
5 // formats. It also includes endpoint middlewares.
6
7 import (
8 "fmt"
9 "time"
10
11 "golang.org/x/net/context"
12
13 "github.com/go-kit/kit/endpoint"
14 "github.com/go-kit/kit/log"
15 "github.com/go-kit/kit/metrics"
16 )
17
18 // Endpoints collects all of the endpoints that compose an add service. It's
19 // meant to be used as a helper struct, to collect all of the endpoints into a
20 // single parameter.
21 //
22 // In a server, it's useful for functions that need to operate on a per-endpoint
23 // basis. For example, you might pass an Endpoints to a function that produces
24 // an http.Handler, with each method (endpoint) wired up to a specific path. (It
25 // is probably a mistake in design to invoke the Service methods on the
26 // Endpoints struct in a server.)
27 //
28 // In a client, it's useful to collect individually constructed endpoints into a
29 // single type that implements the Service interface. For example, you might
30 // construct individual endpoints using transport/http.NewClient, combine them
31 // into an Endpoints, and return it to the caller as a Service.
32 type Endpoints struct {
33 SumEndpoint endpoint.Endpoint
34 ConcatEndpoint endpoint.Endpoint
35 }
36
37 // Sum implements Service. Primarily useful in a client.
38 func (e Endpoints) Sum(ctx context.Context, a, b int) (int, error) {
39 request := sumRequest{A: a, B: b}
40 response, err := e.SumEndpoint(ctx, request)
41 if err != nil {
42 return 0, err
43 }
44 return response.(sumResponse).V, nil
45 }
46
47 // Concat implements Service. Primarily useful in a client.
48 func (e Endpoints) Concat(ctx context.Context, a, b string) (string, error) {
49 request := concatRequest{A: a, B: b}
50 response, err := e.ConcatEndpoint(ctx, request)
51 if err != nil {
52 return "", err
53 }
54 return response.(concatResponse).V, err
55 }
56
57 // MakeSumEndpoint returns an endpoint that invokes Sum on the service.
58 // Primarily useful in a server.
59 func MakeSumEndpoint(s Service) endpoint.Endpoint {
60 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
61 sumReq := request.(sumRequest)
62 v, err := s.Sum(ctx, sumReq.A, sumReq.B)
63 if err != nil {
64 return nil, err
65 }
66 return sumResponse{
67 V: v,
68 }, nil
69 }
70 }
71
72 // MakeConcatEndpoint returns an endpoint that invokes Concat on the service.
73 // Primarily useful in a server.
74 func MakeConcatEndpoint(s Service) endpoint.Endpoint {
75 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
76 concatReq := request.(concatRequest)
77 v, err := s.Concat(ctx, concatReq.A, concatReq.B)
78 if err != nil {
79 return nil, err
80 }
81 return concatResponse{
82 V: v,
83 }, nil
84 }
85 }
86
87 // EndpointInstrumentingMiddleware returns an endpoint middleware that records
88 // the duration of each invocation to the passed histogram. The middleware adds
89 // a single field: "success", which is "true" if no error is returned, and
90 // "false" otherwise.
91 func EndpointInstrumentingMiddleware(duration metrics.TimeHistogram) endpoint.Middleware {
92 return func(next endpoint.Endpoint) endpoint.Endpoint {
93 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
94
95 defer func(begin time.Time) {
96 f := metrics.Field{Key: "success", Value: fmt.Sprint(err == nil)}
97 duration.With(f).Observe(time.Since(begin))
98 }(time.Now())
99 return next(ctx, request)
100
101 }
102 }
103 }
104
105 // EndpointLoggingMiddleware returns an endpoint middleware that logs the
106 // duration of each invocation, and the resulting error, if any.
107 func EndpointLoggingMiddleware(logger log.Logger) endpoint.Middleware {
108 return func(next endpoint.Endpoint) endpoint.Endpoint {
109 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
110
111 defer func(begin time.Time) {
112 logger.Log("error", err, "took", time.Since(begin))
113 }(time.Now())
114 return next(ctx, request)
115
116 }
117 }
118 }
119
120 // These types are unexported because they only exist to serve the endpoint
121 // domain, which is totally encapsulated in this package. They are otherwise
122 // opaque to all callers.
123
124 type sumRequest struct{ A, B int }
125
126 type sumResponse struct{ V int }
127
128 type concatRequest struct{ A, B string }
129
130 type concatResponse struct{ V string }
+0
-47
examples/addsvc/grpc_binding.go less more
0 package main
1
2 import (
3 "golang.org/x/net/context"
4
5 "github.com/opentracing/opentracing-go"
6
7 "github.com/go-kit/kit/examples/addsvc/pb"
8 "github.com/go-kit/kit/examples/addsvc/server"
9 servergrpc "github.com/go-kit/kit/examples/addsvc/server/grpc"
10 "github.com/go-kit/kit/log"
11 kitot "github.com/go-kit/kit/tracing/opentracing"
12 "github.com/go-kit/kit/transport/grpc"
13 )
14
15 type grpcBinding struct {
16 sum, concat grpc.Handler
17 }
18
19 func newGRPCBinding(ctx context.Context, tracer opentracing.Tracer, svc server.AddService, tracingLogger log.Logger) grpcBinding {
20 return grpcBinding{
21 sum: grpc.NewServer(
22 ctx,
23 kitot.TraceServer(tracer, "sum")(makeSumEndpoint(svc)),
24 servergrpc.DecodeSumRequest,
25 servergrpc.EncodeSumResponse,
26 grpc.ServerBefore(kitot.FromGRPCRequest(tracer, "", tracingLogger)),
27 ),
28 concat: grpc.NewServer(
29 ctx,
30 kitot.TraceServer(tracer, "concat")(makeConcatEndpoint(svc)),
31 servergrpc.DecodeConcatRequest,
32 servergrpc.EncodeConcatResponse,
33 grpc.ServerBefore(kitot.FromGRPCRequest(tracer, "", tracingLogger)),
34 ),
35 }
36 }
37
38 func (b grpcBinding) Sum(ctx context.Context, req *pb.SumRequest) (*pb.SumReply, error) {
39 _, resp, err := b.sum.ServeGRPC(ctx, req)
40 return resp.(*pb.SumReply), err
41 }
42
43 func (b grpcBinding) Concat(ctx context.Context, req *pb.ConcatRequest) (*pb.ConcatReply, error) {
44 _, resp, err := b.concat.ServeGRPC(ctx, req)
45 return resp.(*pb.ConcatReply), err
46 }
+0
-257
examples/addsvc/main.go less more
0 package main
1
2 import (
3 "flag"
4 "fmt"
5 stdlog "log"
6 "math/rand"
7 "net"
8 "net/http"
9 "net/rpc"
10 "os"
11 "os/signal"
12 "strings"
13 "syscall"
14 "time"
15
16 "github.com/apache/thrift/lib/go/thrift"
17 "github.com/lightstep/lightstep-tracer-go"
18 "github.com/opentracing/opentracing-go"
19 zipkin "github.com/openzipkin/zipkin-go-opentracing"
20 stdprometheus "github.com/prometheus/client_golang/prometheus"
21 appdashot "github.com/sourcegraph/appdash/opentracing"
22 "golang.org/x/net/context"
23 "google.golang.org/grpc"
24 "sourcegraph.com/sourcegraph/appdash"
25
26 "github.com/go-kit/kit/endpoint"
27 "github.com/go-kit/kit/examples/addsvc/pb"
28 "github.com/go-kit/kit/examples/addsvc/server"
29 thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/add"
30 "github.com/go-kit/kit/log"
31 "github.com/go-kit/kit/metrics"
32 "github.com/go-kit/kit/metrics/expvar"
33 "github.com/go-kit/kit/metrics/prometheus"
34 kitot "github.com/go-kit/kit/tracing/opentracing"
35 httptransport "github.com/go-kit/kit/transport/http"
36 )
37
38 func main() {
39 // Flag domain. Note that gRPC transitively registers flags via its import
40 // of glog. So, we define a new flag set, to keep those domains distinct.
41 fs := flag.NewFlagSet("", flag.ExitOnError)
42 var (
43 debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server")
44 httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server")
45 grpcAddr = fs.String("grpc.addr", ":8002", "Address for gRPC server")
46 netrpcAddr = fs.String("netrpc.addr", ":8003", "Address for net/rpc server")
47 thriftAddr = fs.String("thrift.addr", ":8004", "Address for Thrift server")
48 thriftProtocol = fs.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
49 thriftBufferSize = fs.Int("thrift.buffer.size", 0, "0 for unbuffered")
50 thriftFramed = fs.Bool("thrift.framed", false, "true to enable framing")
51
52 // Supported OpenTracing backends
53 zipkinAddr = fs.String("zipkin.kafka.addr", "", "Enable Zipkin tracing via a Kafka server host:port")
54 appdashAddr = fs.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port")
55 lightstepAccessToken = fs.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token")
56 )
57 flag.Usage = fs.Usage // only show our flags
58 if err := fs.Parse(os.Args[1:]); err != nil {
59 fmt.Fprintf(os.Stderr, "%v", err)
60 os.Exit(1)
61 }
62
63 // package log
64 var logger log.Logger
65 {
66 logger = log.NewLogfmtLogger(os.Stderr)
67 logger = log.NewContext(logger).With("ts", log.DefaultTimestampUTC).With("caller", log.DefaultCaller)
68 stdlog.SetFlags(0) // flags are handled by Go kit's logger
69 stdlog.SetOutput(log.NewStdlibAdapter(logger)) // redirect anything using stdlib log to us
70 }
71
72 // package metrics
73 var requestDuration metrics.TimeHistogram
74 {
75 requestDuration = metrics.NewTimeHistogram(time.Nanosecond, metrics.NewMultiHistogram(
76 "request_duration_ns",
77 expvar.NewHistogram("request_duration_ns", 0, 5e9, 1, 50, 95, 99),
78 prometheus.NewSummary(stdprometheus.SummaryOpts{
79 Namespace: "myorg",
80 Subsystem: "addsvc",
81 Name: "duration_ns",
82 Help: "Request duration in nanoseconds.",
83 }, []string{"method"}),
84 ))
85 }
86
87 // Set up OpenTracing
88 var tracer opentracing.Tracer
89 {
90 switch {
91 case *appdashAddr != "" && *lightstepAccessToken == "" && *zipkinAddr == "":
92 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
93 case *appdashAddr == "" && *lightstepAccessToken != "" && *zipkinAddr == "":
94 tracer = lightstep.NewTracer(lightstep.Options{
95 AccessToken: *lightstepAccessToken,
96 })
97 defer lightstep.FlushLightStepTracer(tracer)
98 case *appdashAddr == "" && *lightstepAccessToken == "" && *zipkinAddr != "":
99 collector, err := zipkin.NewKafkaCollector(
100 strings.Split(*zipkinAddr, ","),
101 zipkin.KafkaLogger(logger),
102 )
103 if err != nil {
104 logger.Log("err", "unable to create collector", "fatal", err)
105 os.Exit(1)
106 }
107 tracer, err = zipkin.NewTracer(
108 zipkin.NewRecorder(collector, false, "localhost:80", "addsvc"),
109 )
110 if err != nil {
111 logger.Log("err", "unable to create zipkin tracer", "fatal", err)
112 os.Exit(1)
113 }
114 case *appdashAddr == "" && *lightstepAccessToken == "" && *zipkinAddr == "":
115 tracer = opentracing.GlobalTracer() // no-op
116 default:
117 logger.Log("fatal", "specify a single -appdash.addr, -lightstep.access.token or -zipkin.kafka.addr")
118 os.Exit(1)
119 }
120 }
121
122 // Business domain
123 var svc server.AddService
124 {
125 svc = pureAddService{}
126 svc = loggingMiddleware{svc, logger}
127 svc = instrumentingMiddleware{svc, requestDuration}
128 }
129
130 // Mechanical stuff
131 rand.Seed(time.Now().UnixNano())
132 root := context.Background()
133 errc := make(chan error)
134
135 go func() {
136 errc <- interrupt()
137 }()
138
139 // Debug/instrumentation
140 go func() {
141 transportLogger := log.NewContext(logger).With("transport", "debug")
142 transportLogger.Log("addr", *debugAddr)
143 errc <- http.ListenAndServe(*debugAddr, nil) // DefaultServeMux
144 }()
145
146 // Transport: HTTP/JSON
147 go func() {
148 var (
149 transportLogger = log.NewContext(logger).With("transport", "HTTP/JSON")
150 tracingLogger = log.NewContext(transportLogger).With("component", "tracing")
151 mux = http.NewServeMux()
152 sum, concat endpoint.Endpoint
153 )
154
155 sum = makeSumEndpoint(svc)
156 sum = kitot.TraceServer(tracer, "sum")(sum)
157 mux.Handle("/sum", httptransport.NewServer(
158 root,
159 sum,
160 server.DecodeSumRequest,
161 server.EncodeSumResponse,
162 httptransport.ServerErrorLogger(transportLogger),
163 httptransport.ServerBefore(kitot.FromHTTPRequest(tracer, "sum", tracingLogger)),
164 ))
165
166 concat = makeConcatEndpoint(svc)
167 concat = kitot.TraceServer(tracer, "concat")(concat)
168 mux.Handle("/concat", httptransport.NewServer(
169 root,
170 concat,
171 server.DecodeConcatRequest,
172 server.EncodeConcatResponse,
173 httptransport.ServerErrorLogger(transportLogger),
174 httptransport.ServerBefore(kitot.FromHTTPRequest(tracer, "concat", tracingLogger)),
175 ))
176
177 transportLogger.Log("addr", *httpAddr)
178 errc <- http.ListenAndServe(*httpAddr, mux)
179 }()
180
181 // Transport: gRPC
182 go func() {
183 transportLogger := log.NewContext(logger).With("transport", "gRPC")
184 tracingLogger := log.NewContext(transportLogger).With("component", "tracing")
185 ln, err := net.Listen("tcp", *grpcAddr)
186 if err != nil {
187 errc <- err
188 return
189 }
190 s := grpc.NewServer() // uses its own, internal context
191 pb.RegisterAddServer(s, newGRPCBinding(root, tracer, svc, tracingLogger))
192 transportLogger.Log("addr", *grpcAddr)
193 errc <- s.Serve(ln)
194 }()
195
196 // Transport: net/rpc
197 go func() {
198 transportLogger := log.NewContext(logger).With("transport", "net/rpc")
199 s := rpc.NewServer()
200 if err := s.RegisterName("addsvc", netrpcBinding{svc}); err != nil {
201 errc <- err
202 return
203 }
204 s.HandleHTTP(rpc.DefaultRPCPath, rpc.DefaultDebugPath)
205 transportLogger.Log("addr", *netrpcAddr)
206 errc <- http.ListenAndServe(*netrpcAddr, s)
207 }()
208
209 // Transport: Thrift
210 go func() {
211 var protocolFactory thrift.TProtocolFactory
212 switch *thriftProtocol {
213 case "binary":
214 protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
215 case "compact":
216 protocolFactory = thrift.NewTCompactProtocolFactory()
217 case "json":
218 protocolFactory = thrift.NewTJSONProtocolFactory()
219 case "simplejson":
220 protocolFactory = thrift.NewTSimpleJSONProtocolFactory()
221 default:
222 errc <- fmt.Errorf("invalid Thrift protocol %q", *thriftProtocol)
223 return
224 }
225 var transportFactory thrift.TTransportFactory
226 if *thriftBufferSize > 0 {
227 transportFactory = thrift.NewTBufferedTransportFactory(*thriftBufferSize)
228 } else {
229 transportFactory = thrift.NewTTransportFactory()
230 }
231 if *thriftFramed {
232 transportFactory = thrift.NewTFramedTransportFactory(transportFactory)
233 }
234 transport, err := thrift.NewTServerSocket(*thriftAddr)
235 if err != nil {
236 errc <- err
237 return
238 }
239 transportLogger := log.NewContext(logger).With("transport", "thrift")
240 transportLogger.Log("addr", *thriftAddr)
241 errc <- thrift.NewTSimpleServer4(
242 thriftadd.NewAddServiceProcessor(thriftBinding{svc}),
243 transport,
244 transportFactory,
245 protocolFactory,
246 ).Serve()
247 }()
248
249 logger.Log("fatal", <-errc)
250 }
251
252 func interrupt() error {
253 c := make(chan os.Signal)
254 signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
255 return fmt.Errorf("%s", <-c)
256 }
+0
-21
examples/addsvc/netrpc_binding.go less more
0 package main
1
2 import (
3 "github.com/go-kit/kit/examples/addsvc/server"
4 )
5
6 type netrpcBinding struct {
7 server.AddService
8 }
9
10 func (b netrpcBinding) Sum(request server.SumRequest, response *server.SumResponse) error {
11 v := b.AddService.Sum(request.A, request.B)
12 (*response) = server.SumResponse{V: v}
13 return nil
14 }
15
16 func (b netrpcBinding) Concat(request server.ConcatRequest, response *server.ConcatResponse) error {
17 v := b.AddService.Concat(request.A, request.B)
18 (*response) = server.ConcatResponse{V: v}
19 return nil
20 }
+0
-207
examples/addsvc/pb/add.pb.go less more
0 // Code generated by protoc-gen-go.
1 // source: add.proto
2 // DO NOT EDIT!
3
4 /*
5 Package pb is a generated protocol buffer package.
6
7 It is generated from these files:
8 add.proto
9
10 It has these top-level messages:
11 SumRequest
12 SumReply
13 ConcatRequest
14 ConcatReply
15 */
16 package pb
17
18 import proto "github.com/golang/protobuf/proto"
19 import fmt "fmt"
20 import math "math"
21
22 import (
23 context "golang.org/x/net/context"
24 grpc "google.golang.org/grpc"
25 )
26
27 // Reference imports to suppress errors if they are not otherwise used.
28 var _ = proto.Marshal
29 var _ = fmt.Errorf
30 var _ = math.Inf
31
32 // This is a compile-time assertion to ensure that this generated file
33 // is compatible with the proto package it is being compiled against.
34 const _ = proto.ProtoPackageIsVersion1
35
36 // The sum request contains two parameters.
37 type SumRequest struct {
38 A int64 `protobuf:"varint,1,opt,name=a" json:"a,omitempty"`
39 B int64 `protobuf:"varint,2,opt,name=b" json:"b,omitempty"`
40 }
41
42 func (m *SumRequest) Reset() { *m = SumRequest{} }
43 func (m *SumRequest) String() string { return proto.CompactTextString(m) }
44 func (*SumRequest) ProtoMessage() {}
45 func (*SumRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
46
47 // The sum response contains the result of the calculation.
48 type SumReply struct {
49 V int64 `protobuf:"varint,1,opt,name=v" json:"v,omitempty"`
50 }
51
52 func (m *SumReply) Reset() { *m = SumReply{} }
53 func (m *SumReply) String() string { return proto.CompactTextString(m) }
54 func (*SumReply) ProtoMessage() {}
55 func (*SumReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
56
57 // The Concat request contains two parameters.
58 type ConcatRequest struct {
59 A string `protobuf:"bytes,1,opt,name=a" json:"a,omitempty"`
60 B string `protobuf:"bytes,2,opt,name=b" json:"b,omitempty"`
61 }
62
63 func (m *ConcatRequest) Reset() { *m = ConcatRequest{} }
64 func (m *ConcatRequest) String() string { return proto.CompactTextString(m) }
65 func (*ConcatRequest) ProtoMessage() {}
66 func (*ConcatRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
67
68 // The Concat response contains the result of the concatenation.
69 type ConcatReply struct {
70 V string `protobuf:"bytes,1,opt,name=v" json:"v,omitempty"`
71 }
72
73 func (m *ConcatReply) Reset() { *m = ConcatReply{} }
74 func (m *ConcatReply) String() string { return proto.CompactTextString(m) }
75 func (*ConcatReply) ProtoMessage() {}
76 func (*ConcatReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
77
78 func init() {
79 proto.RegisterType((*SumRequest)(nil), "pb.SumRequest")
80 proto.RegisterType((*SumReply)(nil), "pb.SumReply")
81 proto.RegisterType((*ConcatRequest)(nil), "pb.ConcatRequest")
82 proto.RegisterType((*ConcatReply)(nil), "pb.ConcatReply")
83 }
84
85 // Reference imports to suppress errors if they are not otherwise used.
86 var _ context.Context
87 var _ grpc.ClientConn
88
89 // This is a compile-time assertion to ensure that this generated file
90 // is compatible with the grpc package it is being compiled against.
91 const _ = grpc.SupportPackageIsVersion2
92
93 // Client API for Add service
94
95 type AddClient interface {
96 // Sums two integers.
97 Sum(ctx context.Context, in *SumRequest, opts ...grpc.CallOption) (*SumReply, error)
98 // Concatenates two strings
99 Concat(ctx context.Context, in *ConcatRequest, opts ...grpc.CallOption) (*ConcatReply, error)
100 }
101
102 type addClient struct {
103 cc *grpc.ClientConn
104 }
105
106 func NewAddClient(cc *grpc.ClientConn) AddClient {
107 return &addClient{cc}
108 }
109
110 func (c *addClient) Sum(ctx context.Context, in *SumRequest, opts ...grpc.CallOption) (*SumReply, error) {
111 out := new(SumReply)
112 err := grpc.Invoke(ctx, "/pb.Add/Sum", in, out, c.cc, opts...)
113 if err != nil {
114 return nil, err
115 }
116 return out, nil
117 }
118
119 func (c *addClient) Concat(ctx context.Context, in *ConcatRequest, opts ...grpc.CallOption) (*ConcatReply, error) {
120 out := new(ConcatReply)
121 err := grpc.Invoke(ctx, "/pb.Add/Concat", in, out, c.cc, opts...)
122 if err != nil {
123 return nil, err
124 }
125 return out, nil
126 }
127
128 // Server API for Add service
129
130 type AddServer interface {
131 // Sums two integers.
132 Sum(context.Context, *SumRequest) (*SumReply, error)
133 // Concatenates two strings
134 Concat(context.Context, *ConcatRequest) (*ConcatReply, error)
135 }
136
137 func RegisterAddServer(s *grpc.Server, srv AddServer) {
138 s.RegisterService(&_Add_serviceDesc, srv)
139 }
140
141 func _Add_Sum_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
142 in := new(SumRequest)
143 if err := dec(in); err != nil {
144 return nil, err
145 }
146 if interceptor == nil {
147 return srv.(AddServer).Sum(ctx, in)
148 }
149 info := &grpc.UnaryServerInfo{
150 Server: srv,
151 FullMethod: "/pb.Add/Sum",
152 }
153 handler := func(ctx context.Context, req interface{}) (interface{}, error) {
154 return srv.(AddServer).Sum(ctx, req.(*SumRequest))
155 }
156 return interceptor(ctx, in, info, handler)
157 }
158
159 func _Add_Concat_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
160 in := new(ConcatRequest)
161 if err := dec(in); err != nil {
162 return nil, err
163 }
164 if interceptor == nil {
165 return srv.(AddServer).Concat(ctx, in)
166 }
167 info := &grpc.UnaryServerInfo{
168 Server: srv,
169 FullMethod: "/pb.Add/Concat",
170 }
171 handler := func(ctx context.Context, req interface{}) (interface{}, error) {
172 return srv.(AddServer).Concat(ctx, req.(*ConcatRequest))
173 }
174 return interceptor(ctx, in, info, handler)
175 }
176
177 var _Add_serviceDesc = grpc.ServiceDesc{
178 ServiceName: "pb.Add",
179 HandlerType: (*AddServer)(nil),
180 Methods: []grpc.MethodDesc{
181 {
182 MethodName: "Sum",
183 Handler: _Add_Sum_Handler,
184 },
185 {
186 MethodName: "Concat",
187 Handler: _Add_Concat_Handler,
188 },
189 },
190 Streams: []grpc.StreamDesc{},
191 }
192
193 var fileDescriptor0 = []byte{
194 // 171 bytes of a gzipped FileDescriptorProto
195 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4c, 0x4c, 0x49, 0xd1,
196 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52, 0xd2, 0xe0, 0xe2, 0x0a, 0x2e, 0xcd,
197 0x0d, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0xe2, 0xe1, 0x62, 0x4c, 0x94, 0x60, 0x54, 0x60,
198 0xd4, 0x60, 0x0e, 0x62, 0x4c, 0x04, 0xf1, 0x92, 0x24, 0x98, 0x20, 0xbc, 0x24, 0x25, 0x09, 0x2e,
199 0x0e, 0xb0, 0xca, 0x82, 0x9c, 0x4a, 0x90, 0x4c, 0x19, 0x4c, 0x5d, 0x99, 0x92, 0x36, 0x17, 0xaf,
200 0x73, 0x7e, 0x5e, 0x72, 0x62, 0x09, 0x86, 0x31, 0x9c, 0x28, 0xc6, 0x70, 0x82, 0x8c, 0x91, 0xe6,
201 0xe2, 0x86, 0x29, 0x46, 0x31, 0x09, 0x28, 0x59, 0x66, 0x14, 0xc3, 0xc5, 0xec, 0x98, 0x92, 0x22,
202 0xa4, 0xca, 0xc5, 0x0c, 0xb4, 0x4a, 0x88, 0x4f, 0xaf, 0x20, 0x49, 0x0f, 0xe1, 0x3a, 0x29, 0x1e,
203 0x38, 0x1f, 0xa8, 0x53, 0x89, 0x41, 0x48, 0x8f, 0x8b, 0x0d, 0x62, 0x94, 0x90, 0x20, 0x48, 0x06,
204 0xc5, 0x0d, 0x52, 0xfc, 0xc8, 0x42, 0x60, 0xf5, 0x49, 0x6c, 0x60, 0x6f, 0x1b, 0x03, 0x02, 0x00,
205 0x00, 0xff, 0xff, 0xb4, 0xc9, 0xe7, 0x58, 0x03, 0x01, 0x00, 0x00,
206 }
+0
-34
examples/addsvc/pb/add.proto less more
0 syntax = "proto3";
1
2 package pb;
3
4 // The Add service definition.
5 service Add {
6 // Sums two integers.
7 rpc Sum (SumRequest) returns (SumReply) {}
8
9 // Concatenates two strings
10 rpc Concat (ConcatRequest) returns (ConcatReply) {}
11 }
12
13 // The sum request contains two parameters.
14 message SumRequest {
15 int64 a = 1;
16 int64 b = 2;
17 }
18
19 // The sum response contains the result of the calculation.
20 message SumReply {
21 int64 v = 1;
22 }
23
24 // The Concat request contains two parameters.
25 message ConcatRequest {
26 string a = 1;
27 string b = 2;
28 }
29
30 // The Concat response contains the result of the concatenation.
31 message ConcatReply {
32 string v = 1;
33 }
0 // Code generated by protoc-gen-go.
1 // source: addsvc.proto
2 // DO NOT EDIT!
3
4 /*
5 Package pb is a generated protocol buffer package.
6
7 It is generated from these files:
8 addsvc.proto
9
10 It has these top-level messages:
11 SumRequest
12 SumReply
13 ConcatRequest
14 ConcatReply
15 */
16 package pb
17
18 import proto "github.com/golang/protobuf/proto"
19 import fmt "fmt"
20 import math "math"
21
22 import (
23 context "golang.org/x/net/context"
24 grpc "google.golang.org/grpc"
25 )
26
27 // Reference imports to suppress errors if they are not otherwise used.
28 var _ = proto.Marshal
29 var _ = fmt.Errorf
30 var _ = math.Inf
31
32 // This is a compile-time assertion to ensure that this generated file
33 // is compatible with the proto package it is being compiled against.
34 const _ = proto.ProtoPackageIsVersion1
35
36 // The sum request contains two parameters.
37 type SumRequest struct {
38 A int64 `protobuf:"varint,1,opt,name=a" json:"a,omitempty"`
39 B int64 `protobuf:"varint,2,opt,name=b" json:"b,omitempty"`
40 }
41
42 func (m *SumRequest) Reset() { *m = SumRequest{} }
43 func (m *SumRequest) String() string { return proto.CompactTextString(m) }
44 func (*SumRequest) ProtoMessage() {}
45 func (*SumRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
46
47 // The sum response contains the result of the calculation.
48 type SumReply struct {
49 V int64 `protobuf:"varint,1,opt,name=v" json:"v,omitempty"`
50 }
51
52 func (m *SumReply) Reset() { *m = SumReply{} }
53 func (m *SumReply) String() string { return proto.CompactTextString(m) }
54 func (*SumReply) ProtoMessage() {}
55 func (*SumReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
56
57 // The Concat request contains two parameters.
58 type ConcatRequest struct {
59 A string `protobuf:"bytes,1,opt,name=a" json:"a,omitempty"`
60 B string `protobuf:"bytes,2,opt,name=b" json:"b,omitempty"`
61 }
62
63 func (m *ConcatRequest) Reset() { *m = ConcatRequest{} }
64 func (m *ConcatRequest) String() string { return proto.CompactTextString(m) }
65 func (*ConcatRequest) ProtoMessage() {}
66 func (*ConcatRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
67
68 // The Concat response contains the result of the concatenation.
69 type ConcatReply struct {
70 V string `protobuf:"bytes,1,opt,name=v" json:"v,omitempty"`
71 }
72
73 func (m *ConcatReply) Reset() { *m = ConcatReply{} }
74 func (m *ConcatReply) String() string { return proto.CompactTextString(m) }
75 func (*ConcatReply) ProtoMessage() {}
76 func (*ConcatReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
77
78 func init() {
79 proto.RegisterType((*SumRequest)(nil), "pb.SumRequest")
80 proto.RegisterType((*SumReply)(nil), "pb.SumReply")
81 proto.RegisterType((*ConcatRequest)(nil), "pb.ConcatRequest")
82 proto.RegisterType((*ConcatReply)(nil), "pb.ConcatReply")
83 }
84
85 // Reference imports to suppress errors if they are not otherwise used.
86 var _ context.Context
87 var _ grpc.ClientConn
88
89 // This is a compile-time assertion to ensure that this generated file
90 // is compatible with the grpc package it is being compiled against.
91 const _ = grpc.SupportPackageIsVersion2
92
93 // Client API for Add service
94
95 type AddClient interface {
96 // Sums two integers.
97 Sum(ctx context.Context, in *SumRequest, opts ...grpc.CallOption) (*SumReply, error)
98 // Concatenates two strings
99 Concat(ctx context.Context, in *ConcatRequest, opts ...grpc.CallOption) (*ConcatReply, error)
100 }
101
102 type addClient struct {
103 cc *grpc.ClientConn
104 }
105
106 func NewAddClient(cc *grpc.ClientConn) AddClient {
107 return &addClient{cc}
108 }
109
110 func (c *addClient) Sum(ctx context.Context, in *SumRequest, opts ...grpc.CallOption) (*SumReply, error) {
111 out := new(SumReply)
112 err := grpc.Invoke(ctx, "/pb.Add/Sum", in, out, c.cc, opts...)
113 if err != nil {
114 return nil, err
115 }
116 return out, nil
117 }
118
119 func (c *addClient) Concat(ctx context.Context, in *ConcatRequest, opts ...grpc.CallOption) (*ConcatReply, error) {
120 out := new(ConcatReply)
121 err := grpc.Invoke(ctx, "/pb.Add/Concat", in, out, c.cc, opts...)
122 if err != nil {
123 return nil, err
124 }
125 return out, nil
126 }
127
128 // Server API for Add service
129
130 type AddServer interface {
131 // Sums two integers.
132 Sum(context.Context, *SumRequest) (*SumReply, error)
133 // Concatenates two strings
134 Concat(context.Context, *ConcatRequest) (*ConcatReply, error)
135 }
136
137 func RegisterAddServer(s *grpc.Server, srv AddServer) {
138 s.RegisterService(&_Add_serviceDesc, srv)
139 }
140
141 func _Add_Sum_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
142 in := new(SumRequest)
143 if err := dec(in); err != nil {
144 return nil, err
145 }
146 if interceptor == nil {
147 return srv.(AddServer).Sum(ctx, in)
148 }
149 info := &grpc.UnaryServerInfo{
150 Server: srv,
151 FullMethod: "/pb.Add/Sum",
152 }
153 handler := func(ctx context.Context, req interface{}) (interface{}, error) {
154 return srv.(AddServer).Sum(ctx, req.(*SumRequest))
155 }
156 return interceptor(ctx, in, info, handler)
157 }
158
159 func _Add_Concat_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
160 in := new(ConcatRequest)
161 if err := dec(in); err != nil {
162 return nil, err
163 }
164 if interceptor == nil {
165 return srv.(AddServer).Concat(ctx, in)
166 }
167 info := &grpc.UnaryServerInfo{
168 Server: srv,
169 FullMethod: "/pb.Add/Concat",
170 }
171 handler := func(ctx context.Context, req interface{}) (interface{}, error) {
172 return srv.(AddServer).Concat(ctx, req.(*ConcatRequest))
173 }
174 return interceptor(ctx, in, info, handler)
175 }
176
177 var _Add_serviceDesc = grpc.ServiceDesc{
178 ServiceName: "pb.Add",
179 HandlerType: (*AddServer)(nil),
180 Methods: []grpc.MethodDesc{
181 {
182 MethodName: "Sum",
183 Handler: _Add_Sum_Handler,
184 },
185 {
186 MethodName: "Concat",
187 Handler: _Add_Concat_Handler,
188 },
189 },
190 Streams: []grpc.StreamDesc{},
191 }
192
193 var fileDescriptor0 = []byte{
194 // 174 bytes of a gzipped FileDescriptorProto
195 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x49, 0x4c, 0x49, 0x29,
196 0x2e, 0x4b, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52, 0xd2, 0xe0, 0xe2,
197 0x0a, 0x2e, 0xcd, 0x0d, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0xe2, 0xe1, 0x62, 0x4c, 0x94,
198 0x60, 0x54, 0x60, 0xd4, 0x60, 0x0e, 0x62, 0x4c, 0x04, 0xf1, 0x92, 0x24, 0x98, 0x20, 0xbc, 0x24,
199 0x25, 0x09, 0x2e, 0x0e, 0xb0, 0xca, 0x82, 0x9c, 0x4a, 0x90, 0x4c, 0x19, 0x4c, 0x5d, 0x99, 0x92,
200 0x36, 0x17, 0xaf, 0x73, 0x7e, 0x5e, 0x72, 0x62, 0x09, 0x86, 0x31, 0x9c, 0x28, 0xc6, 0x70, 0x82,
201 0x8c, 0x91, 0xe6, 0xe2, 0x86, 0x29, 0x46, 0x31, 0x09, 0x28, 0x59, 0x66, 0x14, 0xc3, 0xc5, 0xec,
202 0x98, 0x92, 0x22, 0xa4, 0xca, 0xc5, 0x0c, 0xb4, 0x4a, 0x88, 0x4f, 0xaf, 0x20, 0x49, 0x0f, 0xe1,
203 0x3a, 0x29, 0x1e, 0x38, 0x1f, 0xa8, 0x53, 0x89, 0x41, 0x48, 0x8f, 0x8b, 0x0d, 0x62, 0x94, 0x90,
204 0x20, 0x48, 0x06, 0xc5, 0x0d, 0x52, 0xfc, 0xc8, 0x42, 0x60, 0xf5, 0x49, 0x6c, 0x60, 0x6f, 0x1b,
205 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x8b, 0x2c, 0x12, 0xb4, 0x06, 0x01, 0x00, 0x00,
206 }
0 syntax = "proto3";
1
2 package pb;
3
4 // The Add service definition.
5 service Add {
6 // Sums two integers.
7 rpc Sum (SumRequest) returns (SumReply) {}
8
9 // Concatenates two strings
10 rpc Concat (ConcatRequest) returns (ConcatReply) {}
11 }
12
13 // The sum request contains two parameters.
14 message SumRequest {
15 int64 a = 1;
16 int64 b = 2;
17 }
18
19 // The sum response contains the result of the calculation.
20 message SumReply {
21 int64 v = 1;
22 }
23
24 // The Concat request contains two parameters.
25 message ConcatRequest {
26 string a = 1;
27 string b = 2;
28 }
29
30 // The Concat response contains the result of the concatenation.
31 message ConcatReply {
32 string v = 1;
33 }
1010 # See also
1111 # https://github.com/grpc/grpc-go/tree/master/examples
1212
13 protoc add.proto --go_out=plugins=grpc:.
13 protoc addsvc.proto --go_out=plugins=grpc:.
+0
-84
examples/addsvc/server/encode_decode.go less more
0 package server
1
2 import (
3 "bytes"
4 "encoding/json"
5 "io/ioutil"
6 "net/http"
7
8 "golang.org/x/net/context"
9 )
10
11 // DecodeSumRequest decodes the request from the provided HTTP request, simply
12 // by JSON decoding from the request body. It's designed to be used in
13 // transport/http.Server.
14 func DecodeSumRequest(_ context.Context, r *http.Request) (interface{}, error) {
15 var request SumRequest
16 err := json.NewDecoder(r.Body).Decode(&request)
17 return &request, err
18 }
19
20 // EncodeSumResponse encodes the response to the provided HTTP response
21 // writer, simply by JSON encoding to the writer. It's designed to be used in
22 // transport/http.Server.
23 func EncodeSumResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
24 return json.NewEncoder(w).Encode(response)
25 }
26
27 // DecodeConcatRequest decodes the request from the provided HTTP request,
28 // simply by JSON decoding from the request body. It's designed to be used in
29 // transport/http.Server.
30 func DecodeConcatRequest(_ context.Context, r *http.Request) (interface{}, error) {
31 var request ConcatRequest
32 err := json.NewDecoder(r.Body).Decode(&request)
33 return &request, err
34 }
35
36 // EncodeConcatResponse encodes the response to the provided HTTP response
37 // writer, simply by JSON encoding to the writer. It's designed to be used in
38 // transport/http.Server.
39 func EncodeConcatResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
40 return json.NewEncoder(w).Encode(response)
41 }
42
43 // EncodeSumRequest encodes the request to the provided HTTP request, simply
44 // by JSON encoding to the request body. It's designed to be used in
45 // transport/http.Client.
46 func EncodeSumRequest(_ context.Context, r *http.Request, request interface{}) error {
47 var buf bytes.Buffer
48 if err := json.NewEncoder(&buf).Encode(request); err != nil {
49 return err
50 }
51 r.Body = ioutil.NopCloser(&buf)
52 return nil
53 }
54
55 // DecodeSumResponse decodes the response from the provided HTTP response,
56 // simply by JSON decoding from the response body. It's designed to be used in
57 // transport/http.Client.
58 func DecodeSumResponse(_ context.Context, resp *http.Response) (interface{}, error) {
59 var response SumResponse
60 err := json.NewDecoder(resp.Body).Decode(&response)
61 return response, err
62 }
63
64 // EncodeConcatRequest encodes the request to the provided HTTP request,
65 // simply by JSON encoding to the request body. It's designed to be used in
66 // transport/http.Client.
67 func EncodeConcatRequest(_ context.Context, r *http.Request, request interface{}) error {
68 var buf bytes.Buffer
69 if err := json.NewEncoder(&buf).Encode(request); err != nil {
70 return err
71 }
72 r.Body = ioutil.NopCloser(&buf)
73 return nil
74 }
75
76 // DecodeConcatResponse decodes the response from the provided HTTP response,
77 // simply by JSON decoding from the response body. It's designed to be used in
78 // transport/http.Client.
79 func DecodeConcatResponse(_ context.Context, resp *http.Response) (interface{}, error) {
80 var response ConcatResponse
81 err := json.NewDecoder(resp.Body).Decode(&response)
82 return response, err
83 }
+0
-42
examples/addsvc/server/grpc/encode_decode.go less more
0 package grpc
1
2 import (
3 "golang.org/x/net/context"
4
5 "github.com/go-kit/kit/examples/addsvc/pb"
6 "github.com/go-kit/kit/examples/addsvc/server"
7 )
8
9 func DecodeSumRequest(ctx context.Context, req interface{}) (interface{}, error) {
10 sumRequest := req.(*pb.SumRequest)
11
12 return &server.SumRequest{
13 A: int(sumRequest.A),
14 B: int(sumRequest.B),
15 }, nil
16 }
17
18 func DecodeConcatRequest(ctx context.Context, req interface{}) (interface{}, error) {
19 concatRequest := req.(*pb.ConcatRequest)
20
21 return &server.ConcatRequest{
22 A: concatRequest.A,
23 B: concatRequest.B,
24 }, nil
25 }
26
27 func EncodeSumResponse(ctx context.Context, resp interface{}) (interface{}, error) {
28 domainResponse := resp.(server.SumResponse)
29
30 return &pb.SumReply{
31 V: int64(domainResponse.V),
32 }, nil
33 }
34
35 func EncodeConcatResponse(ctx context.Context, resp interface{}) (interface{}, error) {
36 domainResponse := resp.(server.ConcatResponse)
37
38 return &pb.ConcatReply{
39 V: domainResponse.V,
40 }, nil
41 }
+0
-23
examples/addsvc/server/request_response.go less more
0 package server
1
2 // SumRequest is the business domain type for a Sum method request.
3 type SumRequest struct {
4 A int `json:"a"`
5 B int `json:"b"`
6 }
7
8 // SumResponse is the business domain type for a Sum method response.
9 type SumResponse struct {
10 V int `json:"v"`
11 }
12
13 // ConcatRequest is the business domain type for a Concat method request.
14 type ConcatRequest struct {
15 A string `json:"a"`
16 B string `json:"b"`
17 }
18
19 // ConcatResponse is the business domain type for a Concat method response.
20 type ConcatResponse struct {
21 V string `json:"v"`
22 }
+0
-7
examples/addsvc/server/service.go less more
0 package server
1
2 // AddService is the abstract representation of this service.
3 type AddService interface {
4 Sum(a, b int) int
5 Concat(a, b string) string
6 }
0 package main
0 package addsvc
1
2 // This file contains the Service definition, and a basic service
3 // implementation. It also includes service middlewares.
14
25 import (
6 "errors"
37 "time"
48
5 "github.com/go-kit/kit/examples/addsvc/server"
9 "golang.org/x/net/context"
10
611 "github.com/go-kit/kit/log"
712 "github.com/go-kit/kit/metrics"
813 )
914
10 type pureAddService struct{}
11
12 func (pureAddService) Sum(a, b int) int { return a + b }
13
14 func (pureAddService) Concat(a, b string) string { return a + b }
15
16 type loggingMiddleware struct {
17 server.AddService
18 log.Logger
15 // Service describes a service that adds things together.
16 type Service interface {
17 Sum(ctx context.Context, a, b int) (int, error)
18 Concat(ctx context.Context, a, b string) (string, error)
1919 }
2020
21 func (m loggingMiddleware) Sum(a, b int) (v int) {
21 var (
22 // ErrTwoZeroes is an arbitrary business rule for the Add method.
23 ErrTwoZeroes = errors.New("can't sum two zeroes")
24
25 // ErrIntOverflow protects the Add method.
26 ErrIntOverflow = errors.New("integer overflow")
27
28 // ErrMaxSizeExceeded protects the Concat method.
29 ErrMaxSizeExceeded = errors.New("result exceeds maximum size")
30 )
31
32 // NewBasicService returns a naïve, stateless implementation of Service.
33 func NewBasicService() Service {
34 return basicService{}
35 }
36
37 type basicService struct{}
38
39 const (
40 intMax = 1<<31 - 1
41 intMin = -(intMax + 1)
42 maxLen = 102400
43 )
44
45 // Sum implements Service.
46 func (s basicService) Sum(_ context.Context, a, b int) (int, error) {
47 if a == 0 && b == 0 {
48 return 0, ErrTwoZeroes
49 }
50 if (b > 0 && a > (intMax-b)) || (b < 0 && a < (intMin-b)) {
51 return 0, ErrIntOverflow
52 }
53 return a + b, nil
54 }
55
56 // Concat implements Service.
57 func (s basicService) Concat(_ context.Context, a, b string) (string, error) {
58 if len(a)+len(b) > maxLen {
59 return "", ErrMaxSizeExceeded
60 }
61 return a + b, nil
62 }
63
64 // Middleware describes a service (as opposed to endpoint) middleware.
65 type Middleware func(Service) Service
66
67 // ServiceLoggingMiddleware returns a service middleware that logs the
68 // parameters and result of each method invocation.
69 func ServiceLoggingMiddleware(logger log.Logger) Middleware {
70 return func(next Service) Service {
71 return serviceLoggingMiddleware{
72 logger: logger,
73 next: next,
74 }
75 }
76 }
77
78 type serviceLoggingMiddleware struct {
79 logger log.Logger
80 next Service
81 }
82
83 func (mw serviceLoggingMiddleware) Sum(ctx context.Context, a, b int) (v int, err error) {
2284 defer func(begin time.Time) {
23 m.Logger.Log(
24 "method", "sum",
25 "a", a,
26 "b", b,
27 "v", v,
85 mw.logger.Log(
86 "method", "Sum",
87 "a", a, "b", b, "result", v, "error", err,
2888 "took", time.Since(begin),
2989 )
3090 }(time.Now())
31 v = m.AddService.Sum(a, b)
32 return
91 return mw.next.Sum(ctx, a, b)
3392 }
3493
35 func (m loggingMiddleware) Concat(a, b string) (v string) {
94 func (mw serviceLoggingMiddleware) Concat(ctx context.Context, a, b string) (v string, err error) {
3695 defer func(begin time.Time) {
37 m.Logger.Log(
38 "method", "concat",
39 "a", a,
40 "b", b,
41 "v", v,
96 mw.logger.Log(
97 "method", "Concat",
98 "a", a, "b", b, "result", v, "error", err,
4299 "took", time.Since(begin),
43100 )
44101 }(time.Now())
45 v = m.AddService.Concat(a, b)
46 return
102 return mw.next.Concat(ctx, a, b)
47103 }
48104
49 type instrumentingMiddleware struct {
50 server.AddService
51 requestDuration metrics.TimeHistogram
105 // ServiceInstrumentingMiddleware returns a service middleware that instruments
106 // the number of integers summed and characters concatenated over the lifetime of
107 // the service.
108 func ServiceInstrumentingMiddleware(ints, chars metrics.Counter) Middleware {
109 return func(next Service) Service {
110 return serviceInstrumentingMiddleware{
111 ints: ints,
112 chars: chars,
113 next: next,
114 }
115 }
52116 }
53117
54 func (m instrumentingMiddleware) Sum(a, b int) (v int) {
55 defer func(begin time.Time) {
56 methodField := metrics.Field{Key: "method", Value: "sum"}
57 m.requestDuration.With(methodField).Observe(time.Since(begin))
58 }(time.Now())
59 v = m.AddService.Sum(a, b)
60 return
118 type serviceInstrumentingMiddleware struct {
119 ints metrics.Counter
120 chars metrics.Counter
121 next Service
61122 }
62123
63 func (m instrumentingMiddleware) Concat(a, b string) (v string) {
64 defer func(begin time.Time) {
65 methodField := metrics.Field{Key: "method", Value: "concat"}
66 m.requestDuration.With(methodField).Observe(time.Since(begin))
67 }(time.Now())
68 v = m.AddService.Concat(a, b)
69 return
124 func (mw serviceInstrumentingMiddleware) Sum(ctx context.Context, a, b int) (int, error) {
125 v, err := mw.next.Sum(ctx, a, b)
126 mw.ints.Add(uint64(v))
127 return v, err
70128 }
129
130 func (mw serviceInstrumentingMiddleware) Concat(ctx context.Context, a, b string) (string, error) {
131 v, err := mw.next.Concat(ctx, a, b)
132 mw.chars.Add(uint64(len(v)))
133 return v, err
134 }
+0
-12
examples/addsvc/thrift/add.thrift less more
0 struct SumReply {
1 1: i64 value
2 }
3
4 struct ConcatReply {
5 1: string value
6 }
7
8 service AddService {
9 SumReply Sum(1: i64 a, 2: i64 b)
10 ConcatReply Concat(1: string a, 2: string b)
11 }
0 struct SumReply {
1 1: i64 value
2 }
3
4 struct ConcatReply {
5 1: string value
6 }
7
8 service AddService {
9 SumReply Sum(1: i64 a, 2: i64 b)
10 ConcatReply Concat(1: string a, 2: string b)
11 }
11
22 # See also https://thrift.apache.org/tutorial/go
33
4 thrift -r --gen "go:package_prefix=github.com/go-kit/kit/examples/addsvc/thrift/gen-go/,thrift_import=github.com/apache/thrift/lib/go/thrift" add.thrift
4 thrift -r --gen "go:package_prefix=github.com/go-kit/kit/examples/addsvc/thrift/gen-go/,thrift_import=github.com/apache/thrift/lib/go/thrift" addsvc.thrift
+0
-157
examples/addsvc/thrift/gen-go/add/add_service-remote/add_service-remote.go less more
0 // Autogenerated by Thrift Compiler (0.9.3)
1 // DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
2
3 package main
4
5 import (
6 "flag"
7 "fmt"
8 "github.com/apache/thrift/lib/go/thrift"
9 "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/add"
10 "math"
11 "net"
12 "net/url"
13 "os"
14 "strconv"
15 "strings"
16 )
17
18 func Usage() {
19 fmt.Fprintln(os.Stderr, "Usage of ", os.Args[0], " [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]:")
20 flag.PrintDefaults()
21 fmt.Fprintln(os.Stderr, "\nFunctions:")
22 fmt.Fprintln(os.Stderr, " SumReply Sum(i64 a, i64 b)")
23 fmt.Fprintln(os.Stderr, " ConcatReply Concat(string a, string b)")
24 fmt.Fprintln(os.Stderr)
25 os.Exit(0)
26 }
27
28 func main() {
29 flag.Usage = Usage
30 var host string
31 var port int
32 var protocol string
33 var urlString string
34 var framed bool
35 var useHttp bool
36 var parsedUrl url.URL
37 var trans thrift.TTransport
38 _ = strconv.Atoi
39 _ = math.Abs
40 flag.Usage = Usage
41 flag.StringVar(&host, "h", "localhost", "Specify host and port")
42 flag.IntVar(&port, "p", 9090, "Specify port")
43 flag.StringVar(&protocol, "P", "binary", "Specify the protocol (binary, compact, simplejson, json)")
44 flag.StringVar(&urlString, "u", "", "Specify the url")
45 flag.BoolVar(&framed, "framed", false, "Use framed transport")
46 flag.BoolVar(&useHttp, "http", false, "Use http")
47 flag.Parse()
48
49 if len(urlString) > 0 {
50 parsedUrl, err := url.Parse(urlString)
51 if err != nil {
52 fmt.Fprintln(os.Stderr, "Error parsing URL: ", err)
53 flag.Usage()
54 }
55 host = parsedUrl.Host
56 useHttp = len(parsedUrl.Scheme) <= 0 || parsedUrl.Scheme == "http"
57 } else if useHttp {
58 _, err := url.Parse(fmt.Sprint("http://", host, ":", port))
59 if err != nil {
60 fmt.Fprintln(os.Stderr, "Error parsing URL: ", err)
61 flag.Usage()
62 }
63 }
64
65 cmd := flag.Arg(0)
66 var err error
67 if useHttp {
68 trans, err = thrift.NewTHttpClient(parsedUrl.String())
69 } else {
70 portStr := fmt.Sprint(port)
71 if strings.Contains(host, ":") {
72 host, portStr, err = net.SplitHostPort(host)
73 if err != nil {
74 fmt.Fprintln(os.Stderr, "error with host:", err)
75 os.Exit(1)
76 }
77 }
78 trans, err = thrift.NewTSocket(net.JoinHostPort(host, portStr))
79 if err != nil {
80 fmt.Fprintln(os.Stderr, "error resolving address:", err)
81 os.Exit(1)
82 }
83 if framed {
84 trans = thrift.NewTFramedTransport(trans)
85 }
86 }
87 if err != nil {
88 fmt.Fprintln(os.Stderr, "Error creating transport", err)
89 os.Exit(1)
90 }
91 defer trans.Close()
92 var protocolFactory thrift.TProtocolFactory
93 switch protocol {
94 case "compact":
95 protocolFactory = thrift.NewTCompactProtocolFactory()
96 break
97 case "simplejson":
98 protocolFactory = thrift.NewTSimpleJSONProtocolFactory()
99 break
100 case "json":
101 protocolFactory = thrift.NewTJSONProtocolFactory()
102 break
103 case "binary", "":
104 protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
105 break
106 default:
107 fmt.Fprintln(os.Stderr, "Invalid protocol specified: ", protocol)
108 Usage()
109 os.Exit(1)
110 }
111 client := add.NewAddServiceClientFactory(trans, protocolFactory)
112 if err := trans.Open(); err != nil {
113 fmt.Fprintln(os.Stderr, "Error opening socket to ", host, ":", port, " ", err)
114 os.Exit(1)
115 }
116
117 switch cmd {
118 case "Sum":
119 if flag.NArg()-1 != 2 {
120 fmt.Fprintln(os.Stderr, "Sum requires 2 args")
121 flag.Usage()
122 }
123 argvalue0, err6 := (strconv.ParseInt(flag.Arg(1), 10, 64))
124 if err6 != nil {
125 Usage()
126 return
127 }
128 value0 := argvalue0
129 argvalue1, err7 := (strconv.ParseInt(flag.Arg(2), 10, 64))
130 if err7 != nil {
131 Usage()
132 return
133 }
134 value1 := argvalue1
135 fmt.Print(client.Sum(value0, value1))
136 fmt.Print("\n")
137 break
138 case "Concat":
139 if flag.NArg()-1 != 2 {
140 fmt.Fprintln(os.Stderr, "Concat requires 2 args")
141 flag.Usage()
142 }
143 argvalue0 := flag.Arg(1)
144 value0 := argvalue0
145 argvalue1 := flag.Arg(2)
146 value1 := argvalue1
147 fmt.Print(client.Concat(value0, value1))
148 fmt.Print("\n")
149 break
150 case "":
151 Usage()
152 break
153 default:
154 fmt.Fprintln(os.Stderr, "Invalid function ", cmd)
155 }
156 }
+0
-807
examples/addsvc/thrift/gen-go/add/addservice.go less more
0 // Autogenerated by Thrift Compiler (0.9.3)
1 // DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
2
3 package add
4
5 import (
6 "bytes"
7 "fmt"
8 "github.com/apache/thrift/lib/go/thrift"
9 )
10
11 // (needed to ensure safety because of naive import list construction.)
12 var _ = thrift.ZERO
13 var _ = fmt.Printf
14 var _ = bytes.Equal
15
16 type AddService interface {
17 // Parameters:
18 // - A
19 // - B
20 Sum(a int64, b int64) (r *SumReply, err error)
21 // Parameters:
22 // - A
23 // - B
24 Concat(a string, b string) (r *ConcatReply, err error)
25 }
26
27 type AddServiceClient struct {
28 Transport thrift.TTransport
29 ProtocolFactory thrift.TProtocolFactory
30 InputProtocol thrift.TProtocol
31 OutputProtocol thrift.TProtocol
32 SeqId int32
33 }
34
35 func NewAddServiceClientFactory(t thrift.TTransport, f thrift.TProtocolFactory) *AddServiceClient {
36 return &AddServiceClient{Transport: t,
37 ProtocolFactory: f,
38 InputProtocol: f.GetProtocol(t),
39 OutputProtocol: f.GetProtocol(t),
40 SeqId: 0,
41 }
42 }
43
44 func NewAddServiceClientProtocol(t thrift.TTransport, iprot thrift.TProtocol, oprot thrift.TProtocol) *AddServiceClient {
45 return &AddServiceClient{Transport: t,
46 ProtocolFactory: nil,
47 InputProtocol: iprot,
48 OutputProtocol: oprot,
49 SeqId: 0,
50 }
51 }
52
53 // Parameters:
54 // - A
55 // - B
56 func (p *AddServiceClient) Sum(a int64, b int64) (r *SumReply, err error) {
57 if err = p.sendSum(a, b); err != nil {
58 return
59 }
60 return p.recvSum()
61 }
62
63 func (p *AddServiceClient) sendSum(a int64, b int64) (err error) {
64 oprot := p.OutputProtocol
65 if oprot == nil {
66 oprot = p.ProtocolFactory.GetProtocol(p.Transport)
67 p.OutputProtocol = oprot
68 }
69 p.SeqId++
70 if err = oprot.WriteMessageBegin("Sum", thrift.CALL, p.SeqId); err != nil {
71 return
72 }
73 args := AddServiceSumArgs{
74 A: a,
75 B: b,
76 }
77 if err = args.Write(oprot); err != nil {
78 return
79 }
80 if err = oprot.WriteMessageEnd(); err != nil {
81 return
82 }
83 return oprot.Flush()
84 }
85
86 func (p *AddServiceClient) recvSum() (value *SumReply, err error) {
87 iprot := p.InputProtocol
88 if iprot == nil {
89 iprot = p.ProtocolFactory.GetProtocol(p.Transport)
90 p.InputProtocol = iprot
91 }
92 method, mTypeId, seqId, err := iprot.ReadMessageBegin()
93 if err != nil {
94 return
95 }
96 if method != "Sum" {
97 err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "Sum failed: wrong method name")
98 return
99 }
100 if p.SeqId != seqId {
101 err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "Sum failed: out of sequence response")
102 return
103 }
104 if mTypeId == thrift.EXCEPTION {
105 error0 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception")
106 var error1 error
107 error1, err = error0.Read(iprot)
108 if err != nil {
109 return
110 }
111 if err = iprot.ReadMessageEnd(); err != nil {
112 return
113 }
114 err = error1
115 return
116 }
117 if mTypeId != thrift.REPLY {
118 err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "Sum failed: invalid message type")
119 return
120 }
121 result := AddServiceSumResult{}
122 if err = result.Read(iprot); err != nil {
123 return
124 }
125 if err = iprot.ReadMessageEnd(); err != nil {
126 return
127 }
128 value = result.GetSuccess()
129 return
130 }
131
132 // Parameters:
133 // - A
134 // - B
135 func (p *AddServiceClient) Concat(a string, b string) (r *ConcatReply, err error) {
136 if err = p.sendConcat(a, b); err != nil {
137 return
138 }
139 return p.recvConcat()
140 }
141
142 func (p *AddServiceClient) sendConcat(a string, b string) (err error) {
143 oprot := p.OutputProtocol
144 if oprot == nil {
145 oprot = p.ProtocolFactory.GetProtocol(p.Transport)
146 p.OutputProtocol = oprot
147 }
148 p.SeqId++
149 if err = oprot.WriteMessageBegin("Concat", thrift.CALL, p.SeqId); err != nil {
150 return
151 }
152 args := AddServiceConcatArgs{
153 A: a,
154 B: b,
155 }
156 if err = args.Write(oprot); err != nil {
157 return
158 }
159 if err = oprot.WriteMessageEnd(); err != nil {
160 return
161 }
162 return oprot.Flush()
163 }
164
165 func (p *AddServiceClient) recvConcat() (value *ConcatReply, err error) {
166 iprot := p.InputProtocol
167 if iprot == nil {
168 iprot = p.ProtocolFactory.GetProtocol(p.Transport)
169 p.InputProtocol = iprot
170 }
171 method, mTypeId, seqId, err := iprot.ReadMessageBegin()
172 if err != nil {
173 return
174 }
175 if method != "Concat" {
176 err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "Concat failed: wrong method name")
177 return
178 }
179 if p.SeqId != seqId {
180 err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "Concat failed: out of sequence response")
181 return
182 }
183 if mTypeId == thrift.EXCEPTION {
184 error2 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception")
185 var error3 error
186 error3, err = error2.Read(iprot)
187 if err != nil {
188 return
189 }
190 if err = iprot.ReadMessageEnd(); err != nil {
191 return
192 }
193 err = error3
194 return
195 }
196 if mTypeId != thrift.REPLY {
197 err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "Concat failed: invalid message type")
198 return
199 }
200 result := AddServiceConcatResult{}
201 if err = result.Read(iprot); err != nil {
202 return
203 }
204 if err = iprot.ReadMessageEnd(); err != nil {
205 return
206 }
207 value = result.GetSuccess()
208 return
209 }
210
211 type AddServiceProcessor struct {
212 processorMap map[string]thrift.TProcessorFunction
213 handler AddService
214 }
215
216 func (p *AddServiceProcessor) AddToProcessorMap(key string, processor thrift.TProcessorFunction) {
217 p.processorMap[key] = processor
218 }
219
220 func (p *AddServiceProcessor) GetProcessorFunction(key string) (processor thrift.TProcessorFunction, ok bool) {
221 processor, ok = p.processorMap[key]
222 return processor, ok
223 }
224
225 func (p *AddServiceProcessor) ProcessorMap() map[string]thrift.TProcessorFunction {
226 return p.processorMap
227 }
228
229 func NewAddServiceProcessor(handler AddService) *AddServiceProcessor {
230
231 self4 := &AddServiceProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)}
232 self4.processorMap["Sum"] = &addServiceProcessorSum{handler: handler}
233 self4.processorMap["Concat"] = &addServiceProcessorConcat{handler: handler}
234 return self4
235 }
236
237 func (p *AddServiceProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
238 name, _, seqId, err := iprot.ReadMessageBegin()
239 if err != nil {
240 return false, err
241 }
242 if processor, ok := p.GetProcessorFunction(name); ok {
243 return processor.Process(seqId, iprot, oprot)
244 }
245 iprot.Skip(thrift.STRUCT)
246 iprot.ReadMessageEnd()
247 x5 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
248 oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
249 x5.Write(oprot)
250 oprot.WriteMessageEnd()
251 oprot.Flush()
252 return false, x5
253
254 }
255
256 type addServiceProcessorSum struct {
257 handler AddService
258 }
259
260 func (p *addServiceProcessorSum) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
261 args := AddServiceSumArgs{}
262 if err = args.Read(iprot); err != nil {
263 iprot.ReadMessageEnd()
264 x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
265 oprot.WriteMessageBegin("Sum", thrift.EXCEPTION, seqId)
266 x.Write(oprot)
267 oprot.WriteMessageEnd()
268 oprot.Flush()
269 return false, err
270 }
271
272 iprot.ReadMessageEnd()
273 result := AddServiceSumResult{}
274 var retval *SumReply
275 var err2 error
276 if retval, err2 = p.handler.Sum(args.A, args.B); err2 != nil {
277 x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing Sum: "+err2.Error())
278 oprot.WriteMessageBegin("Sum", thrift.EXCEPTION, seqId)
279 x.Write(oprot)
280 oprot.WriteMessageEnd()
281 oprot.Flush()
282 return true, err2
283 } else {
284 result.Success = retval
285 }
286 if err2 = oprot.WriteMessageBegin("Sum", thrift.REPLY, seqId); err2 != nil {
287 err = err2
288 }
289 if err2 = result.Write(oprot); err == nil && err2 != nil {
290 err = err2
291 }
292 if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
293 err = err2
294 }
295 if err2 = oprot.Flush(); err == nil && err2 != nil {
296 err = err2
297 }
298 if err != nil {
299 return
300 }
301 return true, err
302 }
303
304 type addServiceProcessorConcat struct {
305 handler AddService
306 }
307
308 func (p *addServiceProcessorConcat) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
309 args := AddServiceConcatArgs{}
310 if err = args.Read(iprot); err != nil {
311 iprot.ReadMessageEnd()
312 x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
313 oprot.WriteMessageBegin("Concat", thrift.EXCEPTION, seqId)
314 x.Write(oprot)
315 oprot.WriteMessageEnd()
316 oprot.Flush()
317 return false, err
318 }
319
320 iprot.ReadMessageEnd()
321 result := AddServiceConcatResult{}
322 var retval *ConcatReply
323 var err2 error
324 if retval, err2 = p.handler.Concat(args.A, args.B); err2 != nil {
325 x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing Concat: "+err2.Error())
326 oprot.WriteMessageBegin("Concat", thrift.EXCEPTION, seqId)
327 x.Write(oprot)
328 oprot.WriteMessageEnd()
329 oprot.Flush()
330 return true, err2
331 } else {
332 result.Success = retval
333 }
334 if err2 = oprot.WriteMessageBegin("Concat", thrift.REPLY, seqId); err2 != nil {
335 err = err2
336 }
337 if err2 = result.Write(oprot); err == nil && err2 != nil {
338 err = err2
339 }
340 if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
341 err = err2
342 }
343 if err2 = oprot.Flush(); err == nil && err2 != nil {
344 err = err2
345 }
346 if err != nil {
347 return
348 }
349 return true, err
350 }
351
352 // HELPER FUNCTIONS AND STRUCTURES
353
354 // Attributes:
355 // - A
356 // - B
357 type AddServiceSumArgs struct {
358 A int64 `thrift:"a,1" json:"a"`
359 B int64 `thrift:"b,2" json:"b"`
360 }
361
362 func NewAddServiceSumArgs() *AddServiceSumArgs {
363 return &AddServiceSumArgs{}
364 }
365
366 func (p *AddServiceSumArgs) GetA() int64 {
367 return p.A
368 }
369
370 func (p *AddServiceSumArgs) GetB() int64 {
371 return p.B
372 }
373 func (p *AddServiceSumArgs) Read(iprot thrift.TProtocol) error {
374 if _, err := iprot.ReadStructBegin(); err != nil {
375 return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
376 }
377
378 for {
379 _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin()
380 if err != nil {
381 return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err)
382 }
383 if fieldTypeId == thrift.STOP {
384 break
385 }
386 switch fieldId {
387 case 1:
388 if err := p.readField1(iprot); err != nil {
389 return err
390 }
391 case 2:
392 if err := p.readField2(iprot); err != nil {
393 return err
394 }
395 default:
396 if err := iprot.Skip(fieldTypeId); err != nil {
397 return err
398 }
399 }
400 if err := iprot.ReadFieldEnd(); err != nil {
401 return err
402 }
403 }
404 if err := iprot.ReadStructEnd(); err != nil {
405 return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
406 }
407 return nil
408 }
409
410 func (p *AddServiceSumArgs) readField1(iprot thrift.TProtocol) error {
411 if v, err := iprot.ReadI64(); err != nil {
412 return thrift.PrependError("error reading field 1: ", err)
413 } else {
414 p.A = v
415 }
416 return nil
417 }
418
419 func (p *AddServiceSumArgs) readField2(iprot thrift.TProtocol) error {
420 if v, err := iprot.ReadI64(); err != nil {
421 return thrift.PrependError("error reading field 2: ", err)
422 } else {
423 p.B = v
424 }
425 return nil
426 }
427
428 func (p *AddServiceSumArgs) Write(oprot thrift.TProtocol) error {
429 if err := oprot.WriteStructBegin("Sum_args"); err != nil {
430 return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
431 }
432 if err := p.writeField1(oprot); err != nil {
433 return err
434 }
435 if err := p.writeField2(oprot); err != nil {
436 return err
437 }
438 if err := oprot.WriteFieldStop(); err != nil {
439 return thrift.PrependError("write field stop error: ", err)
440 }
441 if err := oprot.WriteStructEnd(); err != nil {
442 return thrift.PrependError("write struct stop error: ", err)
443 }
444 return nil
445 }
446
447 func (p *AddServiceSumArgs) writeField1(oprot thrift.TProtocol) (err error) {
448 if err := oprot.WriteFieldBegin("a", thrift.I64, 1); err != nil {
449 return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:a: ", p), err)
450 }
451 if err := oprot.WriteI64(int64(p.A)); err != nil {
452 return thrift.PrependError(fmt.Sprintf("%T.a (1) field write error: ", p), err)
453 }
454 if err := oprot.WriteFieldEnd(); err != nil {
455 return thrift.PrependError(fmt.Sprintf("%T write field end error 1:a: ", p), err)
456 }
457 return err
458 }
459
460 func (p *AddServiceSumArgs) writeField2(oprot thrift.TProtocol) (err error) {
461 if err := oprot.WriteFieldBegin("b", thrift.I64, 2); err != nil {
462 return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:b: ", p), err)
463 }
464 if err := oprot.WriteI64(int64(p.B)); err != nil {
465 return thrift.PrependError(fmt.Sprintf("%T.b (2) field write error: ", p), err)
466 }
467 if err := oprot.WriteFieldEnd(); err != nil {
468 return thrift.PrependError(fmt.Sprintf("%T write field end error 2:b: ", p), err)
469 }
470 return err
471 }
472
473 func (p *AddServiceSumArgs) String() string {
474 if p == nil {
475 return "<nil>"
476 }
477 return fmt.Sprintf("AddServiceSumArgs(%+v)", *p)
478 }
479
480 // Attributes:
481 // - Success
482 type AddServiceSumResult struct {
483 Success *SumReply `thrift:"success,0" json:"success,omitempty"`
484 }
485
486 func NewAddServiceSumResult() *AddServiceSumResult {
487 return &AddServiceSumResult{}
488 }
489
490 var AddServiceSumResult_Success_DEFAULT *SumReply
491
492 func (p *AddServiceSumResult) GetSuccess() *SumReply {
493 if !p.IsSetSuccess() {
494 return AddServiceSumResult_Success_DEFAULT
495 }
496 return p.Success
497 }
498 func (p *AddServiceSumResult) IsSetSuccess() bool {
499 return p.Success != nil
500 }
501
502 func (p *AddServiceSumResult) Read(iprot thrift.TProtocol) error {
503 if _, err := iprot.ReadStructBegin(); err != nil {
504 return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
505 }
506
507 for {
508 _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin()
509 if err != nil {
510 return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err)
511 }
512 if fieldTypeId == thrift.STOP {
513 break
514 }
515 switch fieldId {
516 case 0:
517 if err := p.readField0(iprot); err != nil {
518 return err
519 }
520 default:
521 if err := iprot.Skip(fieldTypeId); err != nil {
522 return err
523 }
524 }
525 if err := iprot.ReadFieldEnd(); err != nil {
526 return err
527 }
528 }
529 if err := iprot.ReadStructEnd(); err != nil {
530 return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
531 }
532 return nil
533 }
534
535 func (p *AddServiceSumResult) readField0(iprot thrift.TProtocol) error {
536 p.Success = &SumReply{}
537 if err := p.Success.Read(iprot); err != nil {
538 return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Success), err)
539 }
540 return nil
541 }
542
543 func (p *AddServiceSumResult) Write(oprot thrift.TProtocol) error {
544 if err := oprot.WriteStructBegin("Sum_result"); err != nil {
545 return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
546 }
547 if err := p.writeField0(oprot); err != nil {
548 return err
549 }
550 if err := oprot.WriteFieldStop(); err != nil {
551 return thrift.PrependError("write field stop error: ", err)
552 }
553 if err := oprot.WriteStructEnd(); err != nil {
554 return thrift.PrependError("write struct stop error: ", err)
555 }
556 return nil
557 }
558
559 func (p *AddServiceSumResult) writeField0(oprot thrift.TProtocol) (err error) {
560 if p.IsSetSuccess() {
561 if err := oprot.WriteFieldBegin("success", thrift.STRUCT, 0); err != nil {
562 return thrift.PrependError(fmt.Sprintf("%T write field begin error 0:success: ", p), err)
563 }
564 if err := p.Success.Write(oprot); err != nil {
565 return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Success), err)
566 }
567 if err := oprot.WriteFieldEnd(); err != nil {
568 return thrift.PrependError(fmt.Sprintf("%T write field end error 0:success: ", p), err)
569 }
570 }
571 return err
572 }
573
574 func (p *AddServiceSumResult) String() string {
575 if p == nil {
576 return "<nil>"
577 }
578 return fmt.Sprintf("AddServiceSumResult(%+v)", *p)
579 }
580
581 // Attributes:
582 // - A
583 // - B
584 type AddServiceConcatArgs struct {
585 A string `thrift:"a,1" json:"a"`
586 B string `thrift:"b,2" json:"b"`
587 }
588
589 func NewAddServiceConcatArgs() *AddServiceConcatArgs {
590 return &AddServiceConcatArgs{}
591 }
592
593 func (p *AddServiceConcatArgs) GetA() string {
594 return p.A
595 }
596
597 func (p *AddServiceConcatArgs) GetB() string {
598 return p.B
599 }
600 func (p *AddServiceConcatArgs) Read(iprot thrift.TProtocol) error {
601 if _, err := iprot.ReadStructBegin(); err != nil {
602 return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
603 }
604
605 for {
606 _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin()
607 if err != nil {
608 return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err)
609 }
610 if fieldTypeId == thrift.STOP {
611 break
612 }
613 switch fieldId {
614 case 1:
615 if err := p.readField1(iprot); err != nil {
616 return err
617 }
618 case 2:
619 if err := p.readField2(iprot); err != nil {
620 return err
621 }
622 default:
623 if err := iprot.Skip(fieldTypeId); err != nil {
624 return err
625 }
626 }
627 if err := iprot.ReadFieldEnd(); err != nil {
628 return err
629 }
630 }
631 if err := iprot.ReadStructEnd(); err != nil {
632 return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
633 }
634 return nil
635 }
636
637 func (p *AddServiceConcatArgs) readField1(iprot thrift.TProtocol) error {
638 if v, err := iprot.ReadString(); err != nil {
639 return thrift.PrependError("error reading field 1: ", err)
640 } else {
641 p.A = v
642 }
643 return nil
644 }
645
646 func (p *AddServiceConcatArgs) readField2(iprot thrift.TProtocol) error {
647 if v, err := iprot.ReadString(); err != nil {
648 return thrift.PrependError("error reading field 2: ", err)
649 } else {
650 p.B = v
651 }
652 return nil
653 }
654
655 func (p *AddServiceConcatArgs) Write(oprot thrift.TProtocol) error {
656 if err := oprot.WriteStructBegin("Concat_args"); err != nil {
657 return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
658 }
659 if err := p.writeField1(oprot); err != nil {
660 return err
661 }
662 if err := p.writeField2(oprot); err != nil {
663 return err
664 }
665 if err := oprot.WriteFieldStop(); err != nil {
666 return thrift.PrependError("write field stop error: ", err)
667 }
668 if err := oprot.WriteStructEnd(); err != nil {
669 return thrift.PrependError("write struct stop error: ", err)
670 }
671 return nil
672 }
673
674 func (p *AddServiceConcatArgs) writeField1(oprot thrift.TProtocol) (err error) {
675 if err := oprot.WriteFieldBegin("a", thrift.STRING, 1); err != nil {
676 return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:a: ", p), err)
677 }
678 if err := oprot.WriteString(string(p.A)); err != nil {
679 return thrift.PrependError(fmt.Sprintf("%T.a (1) field write error: ", p), err)
680 }
681 if err := oprot.WriteFieldEnd(); err != nil {
682 return thrift.PrependError(fmt.Sprintf("%T write field end error 1:a: ", p), err)
683 }
684 return err
685 }
686
687 func (p *AddServiceConcatArgs) writeField2(oprot thrift.TProtocol) (err error) {
688 if err := oprot.WriteFieldBegin("b", thrift.STRING, 2); err != nil {
689 return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:b: ", p), err)
690 }
691 if err := oprot.WriteString(string(p.B)); err != nil {
692 return thrift.PrependError(fmt.Sprintf("%T.b (2) field write error: ", p), err)
693 }
694 if err := oprot.WriteFieldEnd(); err != nil {
695 return thrift.PrependError(fmt.Sprintf("%T write field end error 2:b: ", p), err)
696 }
697 return err
698 }
699
700 func (p *AddServiceConcatArgs) String() string {
701 if p == nil {
702 return "<nil>"
703 }
704 return fmt.Sprintf("AddServiceConcatArgs(%+v)", *p)
705 }
706
707 // Attributes:
708 // - Success
709 type AddServiceConcatResult struct {
710 Success *ConcatReply `thrift:"success,0" json:"success,omitempty"`
711 }
712
713 func NewAddServiceConcatResult() *AddServiceConcatResult {
714 return &AddServiceConcatResult{}
715 }
716
717 var AddServiceConcatResult_Success_DEFAULT *ConcatReply
718
719 func (p *AddServiceConcatResult) GetSuccess() *ConcatReply {
720 if !p.IsSetSuccess() {
721 return AddServiceConcatResult_Success_DEFAULT
722 }
723 return p.Success
724 }
725 func (p *AddServiceConcatResult) IsSetSuccess() bool {
726 return p.Success != nil
727 }
728
729 func (p *AddServiceConcatResult) Read(iprot thrift.TProtocol) error {
730 if _, err := iprot.ReadStructBegin(); err != nil {
731 return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
732 }
733
734 for {
735 _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin()
736 if err != nil {
737 return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err)
738 }
739 if fieldTypeId == thrift.STOP {
740 break
741 }
742 switch fieldId {
743 case 0:
744 if err := p.readField0(iprot); err != nil {
745 return err
746 }
747 default:
748 if err := iprot.Skip(fieldTypeId); err != nil {
749 return err
750 }
751 }
752 if err := iprot.ReadFieldEnd(); err != nil {
753 return err
754 }
755 }
756 if err := iprot.ReadStructEnd(); err != nil {
757 return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
758 }
759 return nil
760 }
761
762 func (p *AddServiceConcatResult) readField0(iprot thrift.TProtocol) error {
763 p.Success = &ConcatReply{}
764 if err := p.Success.Read(iprot); err != nil {
765 return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Success), err)
766 }
767 return nil
768 }
769
770 func (p *AddServiceConcatResult) Write(oprot thrift.TProtocol) error {
771 if err := oprot.WriteStructBegin("Concat_result"); err != nil {
772 return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
773 }
774 if err := p.writeField0(oprot); err != nil {
775 return err
776 }
777 if err := oprot.WriteFieldStop(); err != nil {
778 return thrift.PrependError("write field stop error: ", err)
779 }
780