Codebase list golang-github-go-kit-kit / 6995013
Merge pull request #577 from go-kit/update-addsvc Update addsvc Peter Bourgon authored 6 years ago GitHub committed 6 years ago
26 changed file(s) with 1521 addition(s) and 1335 deletion(s). Raw diff Collapse all Expand all
1616 - mkdir -p /home/ubuntu/.go_workspace/src/github.com/go-kit
1717 - mv /home/ubuntu/kit /home/ubuntu/.go_workspace/src/github.com/go-kit
1818 - ln -s /home/ubuntu/.go_workspace/src/github.com/go-kit/kit /home/ubuntu/kit
19 - go get github.com/go-kit/kit/...
19 - go get -t github.com/go-kit/kit/...
2020 override:
2121 - go test -v -race -tags integration github.com/go-kit/kit/...:
2222 environment:
0 # addsvc
1
2 addsvc is an example microservice which takes full advantage of most of Go
3 kit's features, including both service- and transport-level middlewares,
4 speaking multiple transports simultaneously, distributed tracing, and rich
5 error definitions. The server binary is available in cmd/addsvc. The client
6 binary is available in cmd/addcli.
7
8 Finally, the addtransport package provides both server and clients for each
9 supported transport. The client structs bake-in certain middlewares, in order to
10 demonstrate the _client library pattern_. But beware: client libraries are
11 generally a bad idea, because they easily lead to the
12 [distributed monolith antipattern](https://www.microservices.com/talks/dont-build-a-distributed-monolith/).
13 If you don't _know_ you need to use one in your organization, it's probably best
14 avoided: prefer moving that logic to consumers, and relying on
15 [contract testing](https://docs.pact.io/best_practices/contract_tests_not_functional_tests.html)
16 to detect incompatibilities.
+0
-75
examples/addsvc/client/grpc/client.go less more
0 // Package grpc provides a gRPC client for the add service.
1 package grpc
2
3 import (
4 "time"
5
6 jujuratelimit "github.com/juju/ratelimit"
7 stdopentracing "github.com/opentracing/opentracing-go"
8 "github.com/sony/gobreaker"
9 "google.golang.org/grpc"
10
11 "github.com/go-kit/kit/circuitbreaker"
12 "github.com/go-kit/kit/endpoint"
13 "github.com/go-kit/kit/examples/addsvc"
14 "github.com/go-kit/kit/examples/addsvc/pb"
15 "github.com/go-kit/kit/log"
16 "github.com/go-kit/kit/ratelimit"
17 "github.com/go-kit/kit/tracing/opentracing"
18 grpctransport "github.com/go-kit/kit/transport/grpc"
19 )
20
21 // New returns an AddService backed by a gRPC client connection. It is the
22 // responsibility of the caller to dial, and later close, the connection.
23 func New(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addsvc.Service {
24 // We construct a single ratelimiter middleware, to limit the total outgoing
25 // QPS from this client to all methods on the remote instance. We also
26 // construct per-endpoint circuitbreaker middlewares to demonstrate how
27 // that's done, although they could easily be combined into a single breaker
28 // for the entire remote instance, too.
29
30 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
31
32 var sumEndpoint endpoint.Endpoint
33 {
34 sumEndpoint = grpctransport.NewClient(
35 conn,
36 "pb.Add",
37 "Sum",
38 addsvc.EncodeGRPCSumRequest,
39 addsvc.DecodeGRPCSumResponse,
40 pb.SumReply{},
41 grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)),
42 ).Endpoint()
43 sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
44 sumEndpoint = limiter(sumEndpoint)
45 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
46 Name: "Sum",
47 Timeout: 30 * time.Second,
48 }))(sumEndpoint)
49 }
50
51 var concatEndpoint endpoint.Endpoint
52 {
53 concatEndpoint = grpctransport.NewClient(
54 conn,
55 "pb.Add",
56 "Concat",
57 addsvc.EncodeGRPCConcatRequest,
58 addsvc.DecodeGRPCConcatResponse,
59 pb.ConcatReply{},
60 grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)),
61 ).Endpoint()
62 concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
63 concatEndpoint = limiter(concatEndpoint)
64 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
65 Name: "Concat",
66 Timeout: 30 * time.Second,
67 }))(concatEndpoint)
68 }
69
70 return addsvc.Endpoints{
71 SumEndpoint: sumEndpoint,
72 ConcatEndpoint: concatEndpoint,
73 }
74 }
+0
-86
examples/addsvc/client/http/client.go less more
0 // Package http provides an HTTP client for the add service.
1 package http
2
3 import (
4 "net/url"
5 "strings"
6 "time"
7
8 jujuratelimit "github.com/juju/ratelimit"
9 stdopentracing "github.com/opentracing/opentracing-go"
10 "github.com/sony/gobreaker"
11
12 "github.com/go-kit/kit/circuitbreaker"
13 "github.com/go-kit/kit/endpoint"
14 "github.com/go-kit/kit/examples/addsvc"
15 "github.com/go-kit/kit/log"
16 "github.com/go-kit/kit/ratelimit"
17 "github.com/go-kit/kit/tracing/opentracing"
18 httptransport "github.com/go-kit/kit/transport/http"
19 )
20
21 // New returns an AddService backed by an HTTP server living at the remote
22 // instance. We expect instance to come from a service discovery system, so
23 // likely of the form "host:port".
24 func New(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addsvc.Service, error) {
25 if !strings.HasPrefix(instance, "http") {
26 instance = "http://" + instance
27 }
28 u, err := url.Parse(instance)
29 if err != nil {
30 return nil, err
31 }
32
33 // We construct a single ratelimiter middleware, to limit the total outgoing
34 // QPS from this client to all methods on the remote instance. We also
35 // construct per-endpoint circuitbreaker middlewares to demonstrate how
36 // that's done, although they could easily be combined into a single breaker
37 // for the entire remote instance, too.
38
39 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
40
41 var sumEndpoint endpoint.Endpoint
42 {
43 sumEndpoint = httptransport.NewClient(
44 "POST",
45 copyURL(u, "/sum"),
46 addsvc.EncodeHTTPGenericRequest,
47 addsvc.DecodeHTTPSumResponse,
48 httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)),
49 ).Endpoint()
50 sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
51 sumEndpoint = limiter(sumEndpoint)
52 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
53 Name: "Sum",
54 Timeout: 30 * time.Second,
55 }))(sumEndpoint)
56 }
57
58 var concatEndpoint endpoint.Endpoint
59 {
60 concatEndpoint = httptransport.NewClient(
61 "POST",
62 copyURL(u, "/concat"),
63 addsvc.EncodeHTTPGenericRequest,
64 addsvc.DecodeHTTPConcatResponse,
65 httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)),
66 ).Endpoint()
67 concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
68 concatEndpoint = limiter(concatEndpoint)
69 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
70 Name: "Concat",
71 Timeout: 30 * time.Second,
72 }))(concatEndpoint)
73 }
74
75 return addsvc.Endpoints{
76 SumEndpoint: sumEndpoint,
77 ConcatEndpoint: concatEndpoint,
78 }, nil
79 }
80
81 func copyURL(base *url.URL, path string) *url.URL {
82 next := *base
83 next.Path = path
84 return &next
85 }
+0
-55
examples/addsvc/client/thrift/client.go less more
0 // Package thrift provides a Thrift client for the add service.
1 package thrift
2
3 import (
4 "time"
5
6 jujuratelimit "github.com/juju/ratelimit"
7 "github.com/sony/gobreaker"
8
9 "github.com/go-kit/kit/circuitbreaker"
10 "github.com/go-kit/kit/endpoint"
11 "github.com/go-kit/kit/examples/addsvc"
12 thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc"
13 "github.com/go-kit/kit/ratelimit"
14 )
15
16 // New returns an AddService backed by a Thrift server described by the provided
17 // client. The caller is responsible for constructing the client, and eventually
18 // closing the underlying transport.
19 func New(client *thriftadd.AddServiceClient) addsvc.Service {
20 // We construct a single ratelimiter middleware, to limit the total outgoing
21 // QPS from this client to all methods on the remote instance. We also
22 // construct per-endpoint circuitbreaker middlewares to demonstrate how
23 // that's done, although they could easily be combined into a single breaker
24 // for the entire remote instance, too.
25
26 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
27
28 // Thrift does not currently have tracer bindings, so we skip tracing.
29
30 var sumEndpoint endpoint.Endpoint
31 {
32 sumEndpoint = addsvc.MakeThriftSumEndpoint(client)
33 sumEndpoint = limiter(sumEndpoint)
34 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
35 Name: "Sum",
36 Timeout: 30 * time.Second,
37 }))(sumEndpoint)
38 }
39
40 var concatEndpoint endpoint.Endpoint
41 {
42 concatEndpoint = addsvc.MakeThriftConcatEndpoint(client)
43 concatEndpoint = limiter(concatEndpoint)
44 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
45 Name: "Concat",
46 Timeout: 30 * time.Second,
47 }))(concatEndpoint)
48 }
49
50 return addsvc.Endpoints{
51 SumEndpoint: sumEndpoint,
52 ConcatEndpoint: concatEndpoint,
53 }
54 }
0 package main
1
2 import (
3 "context"
4 "flag"
5 "fmt"
6 "os"
7 "strconv"
8 "text/tabwriter"
9 "time"
10
11 "google.golang.org/grpc"
12
13 "github.com/apache/thrift/lib/go/thrift"
14 lightstep "github.com/lightstep/lightstep-tracer-go"
15 stdopentracing "github.com/opentracing/opentracing-go"
16 zipkin "github.com/openzipkin/zipkin-go-opentracing"
17 "sourcegraph.com/sourcegraph/appdash"
18 appdashot "sourcegraph.com/sourcegraph/appdash/opentracing"
19
20 "github.com/go-kit/kit/log"
21
22 "github.com/go-kit/kit/examples/addsvc/pkg/addservice"
23 "github.com/go-kit/kit/examples/addsvc/pkg/addtransport"
24 addthrift "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc"
25 )
26
27 func main() {
28 // The addcli presumes no service discovery system, and expects users to
29 // provide the direct address of an addsvc. This presumption is reflected in
30 // the addcli binary and the the client packages: the -transport.addr flags
31 // and various client constructors both expect host:port strings. For an
32 // example service with a client built on top of a service discovery system,
33 // see profilesvc.
34 fs := flag.NewFlagSet("addcli", flag.ExitOnError)
35 var (
36 httpAddr = fs.String("http-addr", "", "HTTP address of addsvc")
37 grpcAddr = fs.String("grpc-addr", "", "gRPC address of addsvc")
38 thriftAddr = fs.String("thrift-addr", "", "Thrift address of addsvc")
39 thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson")
40 thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered")
41 thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing")
42 zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans")
43 lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token")
44 appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port")
45 method = fs.String("method", "sum", "sum, concat")
46 )
47 fs.Usage = usageFor(fs, os.Args[0]+" [flags] <a> <b>")
48 fs.Parse(os.Args[1:])
49 if len(fs.Args()) != 2 {
50 fs.Usage()
51 os.Exit(1)
52 }
53
54 // This is a demonstration client, which supports multiple tracers.
55 // Your clients will probably just use one tracer.
56 var tracer stdopentracing.Tracer
57 {
58 if *zipkinURL != "" {
59 collector, err := zipkin.NewHTTPCollector(*zipkinURL)
60 if err != nil {
61 fmt.Fprintln(os.Stderr, err.Error())
62 os.Exit(1)
63 }
64 defer collector.Close()
65 var (
66 debug = false
67 hostPort = "localhost:80"
68 serviceName = "addsvc"
69 )
70 recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName)
71 tracer, err = zipkin.NewTracer(recorder)
72 if err != nil {
73 fmt.Fprintln(os.Stderr, err.Error())
74 os.Exit(1)
75 }
76 } else if *lightstepToken != "" {
77 tracer = lightstep.NewTracer(lightstep.Options{
78 AccessToken: *lightstepToken,
79 })
80 defer lightstep.FlushLightStepTracer(tracer)
81 } else if *appdashAddr != "" {
82 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
83 } else {
84 tracer = stdopentracing.GlobalTracer() // no-op
85 }
86 }
87
88 // This is a demonstration client, which supports multiple transports.
89 // Your clients will probably just define and stick with 1 transport.
90 var (
91 svc addservice.Service
92 err error
93 )
94 if *httpAddr != "" {
95 svc, err = addtransport.NewHTTPClient(*httpAddr, tracer, log.NewNopLogger())
96 } else if *grpcAddr != "" {
97 conn, err := grpc.Dial(*grpcAddr, grpc.WithInsecure(), grpc.WithTimeout(time.Second))
98 if err != nil {
99 fmt.Fprintf(os.Stderr, "error: %v", err)
100 os.Exit(1)
101 }
102 defer conn.Close()
103 svc = addtransport.NewGRPCClient(conn, tracer, log.NewNopLogger())
104 } else if *thriftAddr != "" {
105 // It's necessary to do all of this construction in the func main,
106 // because (among other reasons) we need to control the lifecycle of the
107 // Thrift transport, i.e. close it eventually.
108 var protocolFactory thrift.TProtocolFactory
109 switch *thriftProtocol {
110 case "compact":
111 protocolFactory = thrift.NewTCompactProtocolFactory()
112 case "simplejson":
113 protocolFactory = thrift.NewTSimpleJSONProtocolFactory()
114 case "json":
115 protocolFactory = thrift.NewTJSONProtocolFactory()
116 case "binary", "":
117 protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
118 default:
119 fmt.Fprintf(os.Stderr, "error: invalid protocol %q\n", *thriftProtocol)
120 os.Exit(1)
121 }
122 var transportFactory thrift.TTransportFactory
123 if *thriftBuffer > 0 {
124 transportFactory = thrift.NewTBufferedTransportFactory(*thriftBuffer)
125 } else {
126 transportFactory = thrift.NewTTransportFactory()
127 }
128 if *thriftFramed {
129 transportFactory = thrift.NewTFramedTransportFactory(transportFactory)
130 }
131 transportSocket, err := thrift.NewTSocket(*thriftAddr)
132 if err != nil {
133 fmt.Fprintf(os.Stderr, "error: %v\n", err)
134 os.Exit(1)
135 }
136 transport, err := transportFactory.GetTransport(transportSocket)
137 if err != nil {
138 fmt.Fprintf(os.Stderr, "error: %v\n", err)
139 os.Exit(1)
140 }
141 if err := transport.Open(); err != nil {
142 fmt.Fprintf(os.Stderr, "error: %v\n", err)
143 os.Exit(1)
144 }
145 defer transport.Close()
146 client := addthrift.NewAddServiceClientFactory(transport, protocolFactory)
147 svc = addtransport.NewThriftClient(client)
148 } else {
149 fmt.Fprintf(os.Stderr, "error: no remote address specified\n")
150 os.Exit(1)
151 }
152 if err != nil {
153 fmt.Fprintf(os.Stderr, "error: %v\n", err)
154 os.Exit(1)
155 }
156
157 switch *method {
158 case "sum":
159 a, _ := strconv.ParseInt(fs.Args()[0], 10, 64)
160 b, _ := strconv.ParseInt(fs.Args()[1], 10, 64)
161 v, err := svc.Sum(context.Background(), int(a), int(b))
162 if err != nil {
163 fmt.Fprintf(os.Stderr, "error: %v\n", err)
164 os.Exit(1)
165 }
166 fmt.Fprintf(os.Stdout, "%d + %d = %d\n", a, b, v)
167
168 case "concat":
169 a := fs.Args()[0]
170 b := fs.Args()[1]
171 v, err := svc.Concat(context.Background(), a, b)
172 if err != nil {
173 fmt.Fprintf(os.Stderr, "error: %v\n", err)
174 os.Exit(1)
175 }
176 fmt.Fprintf(os.Stdout, "%q + %q = %q\n", a, b, v)
177
178 default:
179 fmt.Fprintf(os.Stderr, "error: invalid method %q\n", method)
180 os.Exit(1)
181 }
182 }
183
184 func usageFor(fs *flag.FlagSet, short string) func() {
185 return func() {
186 fmt.Fprintf(os.Stderr, "USAGE\n")
187 fmt.Fprintf(os.Stderr, " %s\n", short)
188 fmt.Fprintf(os.Stderr, "\n")
189 fmt.Fprintf(os.Stderr, "FLAGS\n")
190 w := tabwriter.NewWriter(os.Stderr, 0, 2, 2, ' ', 0)
191 fs.VisitAll(func(f *flag.Flag) {
192 fmt.Fprintf(w, "\t-%s %s\t%s\n", f.Name, f.DefValue, f.Usage)
193 })
194 w.Flush()
195 fmt.Fprintf(os.Stderr, "\n")
196 }
197 }
+0
-201
examples/addsvc/cmd/addcli/main.go less more
0 package main
1
2 import (
3 "context"
4 "flag"
5 "fmt"
6 "os"
7 "strconv"
8 "strings"
9 "time"
10
11 "github.com/apache/thrift/lib/go/thrift"
12 "github.com/lightstep/lightstep-tracer-go"
13 stdopentracing "github.com/opentracing/opentracing-go"
14 zipkin "github.com/openzipkin/zipkin-go-opentracing"
15 "google.golang.org/grpc"
16 "sourcegraph.com/sourcegraph/appdash"
17 appdashot "sourcegraph.com/sourcegraph/appdash/opentracing"
18
19 "github.com/go-kit/kit/examples/addsvc"
20 grpcclient "github.com/go-kit/kit/examples/addsvc/client/grpc"
21 httpclient "github.com/go-kit/kit/examples/addsvc/client/http"
22 thriftclient "github.com/go-kit/kit/examples/addsvc/client/thrift"
23 thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc"
24 "github.com/go-kit/kit/log"
25 )
26
27 func main() {
28 // The addcli presumes no service discovery system, and expects users to
29 // provide the direct address of an addsvc. This presumption is reflected in
30 // the addcli binary and the the client packages: the -transport.addr flags
31 // and various client constructors both expect host:port strings. For an
32 // example service with a client built on top of a service discovery system,
33 // see profilesvc.
34
35 var (
36 httpAddr = flag.String("http.addr", "", "HTTP address of addsvc")
37 grpcAddr = flag.String("grpc.addr", "", "gRPC (HTTP) address of addsvc")
38 thriftAddr = flag.String("thrift.addr", "", "Thrift address of addsvc")
39 thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
40 thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered")
41 thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing")
42 zipkinAddr = flag.String("zipkin.addr", "", "Enable Zipkin tracing via a Zipkin HTTP Collector endpoint")
43 zipkinKafkaAddr = flag.String("zipkin.kafka.addr", "", "Enable Zipkin tracing via a Kafka server host:port")
44 appdashAddr = flag.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port")
45 lightstepToken = flag.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token")
46 method = flag.String("method", "sum", "sum, concat")
47 )
48 flag.Parse()
49
50 if len(flag.Args()) != 2 {
51 fmt.Fprintf(os.Stderr, "usage: addcli [flags] <a> <b>\n")
52 os.Exit(1)
53 }
54
55 // This is a demonstration client, which supports multiple tracers.
56 // Your clients will probably just use one tracer.
57 var tracer stdopentracing.Tracer
58 {
59 if *zipkinAddr != "" {
60 // endpoint typically looks like: http://zipkinhost:9411/api/v1/spans
61 collector, err := zipkin.NewHTTPCollector(*zipkinAddr)
62 if err != nil {
63 fmt.Fprintf(os.Stderr, "%v\n", err)
64 os.Exit(1)
65 }
66 defer collector.Close()
67
68 tracer, err = zipkin.NewTracer(
69 zipkin.NewRecorder(collector, false, "0.0.0.0:0", "addcli"),
70 )
71 if err != nil {
72 fmt.Fprintf(os.Stderr, "%v\n", err)
73 os.Exit(1)
74 }
75 } else if *zipkinKafkaAddr != "" {
76 collector, err := zipkin.NewKafkaCollector(
77 strings.Split(*zipkinKafkaAddr, ","),
78 zipkin.KafkaLogger(log.NewNopLogger()),
79 )
80 if err != nil {
81 fmt.Fprintf(os.Stderr, "%v\n", err)
82 os.Exit(1)
83 }
84 defer collector.Close()
85
86 tracer, err = zipkin.NewTracer(
87 zipkin.NewRecorder(collector, false, "0.0.0.0:0", "addcli"),
88 )
89 if err != nil {
90 fmt.Fprintf(os.Stderr, "%v\n", err)
91 os.Exit(1)
92 }
93 } else if *appdashAddr != "" {
94 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
95 } else if *lightstepToken != "" {
96 tracer = lightstep.NewTracer(lightstep.Options{
97 AccessToken: *lightstepToken,
98 })
99 defer lightstep.FlushLightStepTracer(tracer)
100 } else {
101 tracer = stdopentracing.GlobalTracer() // no-op
102 }
103 }
104
105 // This is a demonstration client, which supports multiple transports.
106 // Your clients will probably just define and stick with 1 transport.
107
108 var (
109 service addsvc.Service
110 err error
111 )
112 if *httpAddr != "" {
113 service, err = httpclient.New(*httpAddr, tracer, log.NewNopLogger())
114 } else if *grpcAddr != "" {
115 conn, err := grpc.Dial(*grpcAddr, grpc.WithInsecure(), grpc.WithTimeout(time.Second))
116 if err != nil {
117 fmt.Fprintf(os.Stderr, "error: %v", err)
118 os.Exit(1)
119 }
120 defer conn.Close()
121 service = grpcclient.New(conn, tracer, log.NewNopLogger())
122 } else if *thriftAddr != "" {
123 // It's necessary to do all of this construction in the func main,
124 // because (among other reasons) we need to control the lifecycle of the
125 // Thrift transport, i.e. close it eventually.
126 var protocolFactory thrift.TProtocolFactory
127 switch *thriftProtocol {
128 case "compact":
129 protocolFactory = thrift.NewTCompactProtocolFactory()
130 case "simplejson":
131 protocolFactory = thrift.NewTSimpleJSONProtocolFactory()
132 case "json":
133 protocolFactory = thrift.NewTJSONProtocolFactory()
134 case "binary", "":
135 protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
136 default:
137 fmt.Fprintf(os.Stderr, "error: invalid protocol %q\n", *thriftProtocol)
138 os.Exit(1)
139 }
140 var transportFactory thrift.TTransportFactory
141 if *thriftBufferSize > 0 {
142 transportFactory = thrift.NewTBufferedTransportFactory(*thriftBufferSize)
143 } else {
144 transportFactory = thrift.NewTTransportFactory()
145 }
146 if *thriftFramed {
147 transportFactory = thrift.NewTFramedTransportFactory(transportFactory)
148 }
149 transportSocket, err := thrift.NewTSocket(*thriftAddr)
150 if err != nil {
151 fmt.Fprintf(os.Stderr, "error: %v\n", err)
152 os.Exit(1)
153 }
154 transport, err := transportFactory.GetTransport(transportSocket)
155 if err != nil {
156 fmt.Fprintf(os.Stderr, "error: %v\n", err)
157 os.Exit(1)
158 }
159 if err := transport.Open(); err != nil {
160 fmt.Fprintf(os.Stderr, "error: %v\n", err)
161 os.Exit(1)
162 }
163 defer transport.Close()
164 client := thriftadd.NewAddServiceClientFactory(transport, protocolFactory)
165 service = thriftclient.New(client)
166 } else {
167 fmt.Fprintf(os.Stderr, "error: no remote address specified\n")
168 os.Exit(1)
169 }
170 if err != nil {
171 fmt.Fprintf(os.Stderr, "error: %v\n", err)
172 os.Exit(1)
173 }
174
175 switch *method {
176 case "sum":
177 a, _ := strconv.ParseInt(flag.Args()[0], 10, 64)
178 b, _ := strconv.ParseInt(flag.Args()[1], 10, 64)
179 v, err := service.Sum(context.Background(), int(a), int(b))
180 if err != nil {
181 fmt.Fprintf(os.Stderr, "error: %v\n", err)
182 os.Exit(1)
183 }
184 fmt.Fprintf(os.Stdout, "%d + %d = %d\n", a, b, v)
185
186 case "concat":
187 a := flag.Args()[0]
188 b := flag.Args()[1]
189 v, err := service.Concat(context.Background(), a, b)
190 if err != nil {
191 fmt.Fprintf(os.Stderr, "error: %v\n", err)
192 os.Exit(1)
193 }
194 fmt.Fprintf(os.Stdout, "%q + %q = %q\n", a, b, v)
195
196 default:
197 fmt.Fprintf(os.Stderr, "error: invalid method %q\n", method)
198 os.Exit(1)
199 }
200 }
0 package main
1
2 import (
3 "context"
4 "flag"
5 "fmt"
6 "net"
7 "net/http"
8 "os"
9 "os/signal"
10 "syscall"
11 "text/tabwriter"
12
13 "github.com/apache/thrift/lib/go/thrift"
14 lightstep "github.com/lightstep/lightstep-tracer-go"
15 "github.com/oklog/oklog/pkg/group"
16 stdopentracing "github.com/opentracing/opentracing-go"
17 zipkin "github.com/openzipkin/zipkin-go-opentracing"
18 stdprometheus "github.com/prometheus/client_golang/prometheus"
19 "github.com/prometheus/client_golang/prometheus/promhttp"
20 "google.golang.org/grpc"
21 "sourcegraph.com/sourcegraph/appdash"
22 appdashot "sourcegraph.com/sourcegraph/appdash/opentracing"
23
24 "github.com/go-kit/kit/log"
25 "github.com/go-kit/kit/metrics"
26 "github.com/go-kit/kit/metrics/prometheus"
27
28 addpb "github.com/go-kit/kit/examples/addsvc/pb"
29 "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
30 "github.com/go-kit/kit/examples/addsvc/pkg/addservice"
31 "github.com/go-kit/kit/examples/addsvc/pkg/addtransport"
32 addthrift "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc"
33 )
34
35 func main() {
36 // Define our flags. Your service probably won't need to bind listeners for
37 // *all* supported transports, or support both Zipkin and LightStep, and so
38 // on, but we do it here for demonstration purposes.
39 fs := flag.NewFlagSet("addsvc", flag.ExitOnError)
40 var (
41 debugAddr = fs.String("debug.addr", ":8080", "Debug and metrics listen address")
42 httpAddr = fs.String("http-addr", ":8081", "HTTP listen address")
43 grpcAddr = fs.String("grpc-addr", ":8082", "gRPC listen address")
44 thriftAddr = fs.String("thrift-addr", ":8083", "Thrift listen address")
45 thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson")
46 thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered")
47 thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing")
48 zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans")
49 lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token")
50 appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port")
51 )
52 fs.Usage = usageFor(fs, os.Args[0]+" [flags]")
53 fs.Parse(os.Args[1:])
54
55 // Create a single logger, which we'll use and give to other components.
56 var logger log.Logger
57 {
58 logger = log.NewLogfmtLogger(os.Stderr)
59 logger = log.With(logger, "ts", log.DefaultTimestampUTC)
60 logger = log.With(logger, "caller", log.DefaultCaller)
61 }
62
63 // Determine which tracer to use. We'll pass the tracer to all the
64 // components that use it, as a dependency.
65 var tracer stdopentracing.Tracer
66 {
67 if *zipkinURL != "" {
68 logger.Log("tracer", "Zipkin", "URL", *zipkinURL)
69 collector, err := zipkin.NewHTTPCollector(*zipkinURL)
70 if err != nil {
71 logger.Log("err", err)
72 os.Exit(1)
73 }
74 defer collector.Close()
75 var (
76 debug = false
77 hostPort = "localhost:80"
78 serviceName = "addsvc"
79 )
80 recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName)
81 tracer, err = zipkin.NewTracer(recorder)
82 if err != nil {
83 logger.Log("err", err)
84 os.Exit(1)
85 }
86 } else if *lightstepToken != "" {
87 logger.Log("tracer", "LightStep") // probably don't want to print out the token :)
88 tracer = lightstep.NewTracer(lightstep.Options{
89 AccessToken: *lightstepToken,
90 })
91 defer lightstep.FlushLightStepTracer(tracer)
92 } else if *appdashAddr != "" {
93 logger.Log("tracer", "Appdash", "addr", *appdashAddr)
94 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
95 } else {
96 logger.Log("tracer", "none")
97 tracer = stdopentracing.GlobalTracer() // no-op
98 }
99 }
100
101 // Create the (sparse) metrics we'll use in the service. They, too, are
102 // dependencies that we pass to components that use them.
103 var ints, chars metrics.Counter
104 {
105 // Business-level metrics.
106 ints = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
107 Namespace: "example",
108 Subsystem: "addsvc",
109 Name: "integers_summed",
110 Help: "Total count of integers summed via the Sum method.",
111 }, []string{})
112 chars = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
113 Namespace: "example",
114 Subsystem: "addsvc",
115 Name: "characters_concatenated",
116 Help: "Total count of characters concatenated via the Concat method.",
117 }, []string{})
118 }
119 var duration metrics.Histogram
120 {
121 // Endpoint-level metrics.
122 duration = prometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
123 Namespace: "example",
124 Subsystem: "addsvc",
125 Name: "request_duration_seconds",
126 Help: "Request duration in seconds.",
127 }, []string{"method", "success"})
128 }
129 http.DefaultServeMux.Handle("/metrics", promhttp.Handler())
130
131 // Build the layers of the service "onion" from the inside out. First, the
132 // business logic service; then, the set of endpoints that wrap the service;
133 // and finally, a series of concrete transport adapters. The adapters, like
134 // the HTTP handler or the gRPC server, are the bridge between Go kit and
135 // the interfaces that the transports expect. Note that we're not binding
136 // them to ports or anything yet; we'll do that next.
137 var (
138 service = addservice.New(logger, ints, chars)
139 endpoints = addendpoint.New(service, logger, duration, tracer)
140 httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, logger)
141 grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger)
142 thriftServer = addtransport.NewThriftServer(context.Background(), endpoints)
143 )
144
145 // Now we're to the part of the func main where we want to start actually
146 // running things, like servers bound to listeners to receive connections.
147 //
148 // The method is the same for each component: add a new actor to the group
149 // struct, which is a combination of 2 anonymous functions: the first
150 // function actually runs the component, and the second function should
151 // interrupt the first function and cause it to return. It's in these
152 // functions that we actually bin the Go kit server/handler structs to the
153 // concrete transports and start them running.
154 //
155 // Putting each component into its own block is mostly for aesthetics: it
156 // clearly demarcates the scope in which each listener/socket may be used.
157 var g group.Group
158 {
159 // The debug listener mounts the http.DefaultServeMux, and serves up
160 // stuff like the Prometheus metrics route, the Go debug and profiling
161 // routes, and so on.
162 debugListener, err := net.Listen("tcp", *debugAddr)
163 if err != nil {
164 logger.Log("transport", "debug/HTTP", "during", "Listen", "err", err)
165 os.Exit(1)
166 }
167 g.Add(func() error {
168 logger.Log("transport", "debug/HTTP", "addr", *debugAddr)
169 return http.Serve(debugListener, http.DefaultServeMux)
170 }, func(error) {
171 debugListener.Close()
172 })
173 }
174 {
175 // The HTTP listener mounts the Go kit HTTP handler we created.
176 httpListener, err := net.Listen("tcp", *httpAddr)
177 if err != nil {
178 logger.Log("transport", "HTTP", "during", "Listen", "err", err)
179 os.Exit(1)
180 }
181 g.Add(func() error {
182 logger.Log("transport", "HTTP", "addr", *httpAddr)
183 return http.Serve(httpListener, httpHandler)
184 }, func(error) {
185 httpListener.Close()
186 })
187 }
188 {
189 // The gRPC listener mounts the Go kit gRPC server we created.
190 grpcListener, err := net.Listen("tcp", *grpcAddr)
191 if err != nil {
192 logger.Log("transport", "gRPC", "during", "Listen", "err", err)
193 os.Exit(1)
194 }
195 g.Add(func() error {
196 logger.Log("transport", "gRPC", "addr", *grpcAddr)
197 baseServer := grpc.NewServer()
198 addpb.RegisterAddServer(baseServer, grpcServer)
199 return baseServer.Serve(grpcListener)
200 }, func(error) {
201 grpcListener.Close()
202 })
203 }
204 {
205 // The Thrift socket mounts the Go kit Thrift server we created earlier.
206 // There's a lot of boilerplate involved here, related to configuring
207 // the protocol and transport; blame Thrift.
208 thriftSocket, err := thrift.NewTServerSocket(*thriftAddr)
209 if err != nil {
210 logger.Log("transport", "Thrift", "during", "Listen", "err", err)
211 os.Exit(1)
212 }
213 g.Add(func() error {
214 logger.Log("transport", "Thrift", "addr", *thriftAddr)
215 var protocolFactory thrift.TProtocolFactory
216 switch *thriftProtocol {
217 case "binary":
218 protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
219 case "compact":
220 protocolFactory = thrift.NewTCompactProtocolFactory()
221 case "json":
222 protocolFactory = thrift.NewTJSONProtocolFactory()
223 case "simplejson":
224 protocolFactory = thrift.NewTSimpleJSONProtocolFactory()
225 default:
226 return fmt.Errorf("invalid Thrift protocol %q", *thriftProtocol)
227 }
228 var transportFactory thrift.TTransportFactory
229 if *thriftBuffer > 0 {
230 transportFactory = thrift.NewTBufferedTransportFactory(*thriftBuffer)
231 } else {
232 transportFactory = thrift.NewTTransportFactory()
233 }
234 if *thriftFramed {
235 transportFactory = thrift.NewTFramedTransportFactory(transportFactory)
236 }
237 return thrift.NewTSimpleServer4(
238 addthrift.NewAddServiceProcessor(thriftServer),
239 thriftSocket,
240 transportFactory,
241 protocolFactory,
242 ).Serve()
243 }, func(error) {
244 thriftSocket.Close()
245 })
246 }
247 {
248 // This function just sits and waits for ctrl-C.
249 cancelInterrupt := make(chan struct{})
250 g.Add(func() error {
251 c := make(chan os.Signal, 1)
252 signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
253 select {
254 case sig := <-c:
255 return fmt.Errorf("received signal %s", sig)
256 case <-cancelInterrupt:
257 return nil
258 }
259 }, func(error) {
260 close(cancelInterrupt)
261 })
262 }
263 logger.Log("exit", g.Run())
264 }
265
266 func usageFor(fs *flag.FlagSet, short string) func() {
267 return func() {
268 fmt.Fprintf(os.Stderr, "USAGE\n")
269 fmt.Fprintf(os.Stderr, " %s\n", short)
270 fmt.Fprintf(os.Stderr, "\n")
271 fmt.Fprintf(os.Stderr, "FLAGS\n")
272 w := tabwriter.NewWriter(os.Stderr, 0, 2, 2, ' ', 0)
273 fs.VisitAll(func(f *flag.Flag) {
274 fmt.Fprintf(w, "\t-%s %s\t%s\n", f.Name, f.DefValue, f.Usage)
275 })
276 w.Flush()
277 fmt.Fprintf(os.Stderr, "\n")
278 }
279 }
+0
-280
examples/addsvc/cmd/addsvc/main.go less more
0 package main
1
2 import (
3 "context"
4 "flag"
5 "fmt"
6 "net"
7 "net/http"
8 "net/http/pprof"
9 "os"
10 "os/signal"
11 "strings"
12 "syscall"
13
14 "github.com/apache/thrift/lib/go/thrift"
15 lightstep "github.com/lightstep/lightstep-tracer-go"
16 stdopentracing "github.com/opentracing/opentracing-go"
17 zipkin "github.com/openzipkin/zipkin-go-opentracing"
18 stdprometheus "github.com/prometheus/client_golang/prometheus"
19 "github.com/prometheus/client_golang/prometheus/promhttp"
20 "google.golang.org/grpc"
21 "sourcegraph.com/sourcegraph/appdash"
22 appdashot "sourcegraph.com/sourcegraph/appdash/opentracing"
23
24 "github.com/go-kit/kit/endpoint"
25 "github.com/go-kit/kit/examples/addsvc"
26 "github.com/go-kit/kit/examples/addsvc/pb"
27 thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc"
28 "github.com/go-kit/kit/log"
29 "github.com/go-kit/kit/metrics"
30 "github.com/go-kit/kit/metrics/prometheus"
31 "github.com/go-kit/kit/tracing/opentracing"
32 )
33
34 func main() {
35 var (
36 debugAddr = flag.String("debug.addr", ":8080", "Debug and metrics listen address")
37 httpAddr = flag.String("http.addr", ":8081", "HTTP listen address")
38 grpcAddr = flag.String("grpc.addr", ":8082", "gRPC (HTTP) listen address")
39 thriftAddr = flag.String("thrift.addr", ":8083", "Thrift listen address")
40 thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
41 thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered")
42 thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing")
43 zipkinAddr = flag.String("zipkin.addr", "", "Enable Zipkin tracing via a Zipkin HTTP Collector endpoint")
44 zipkinKafkaAddr = flag.String("zipkin.kafka.addr", "", "Enable Zipkin tracing via a Kafka server host:port")
45 appdashAddr = flag.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port")
46 lightstepToken = flag.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token")
47 )
48 flag.Parse()
49
50 // Logging domain.
51 var logger log.Logger
52 {
53 logger = log.NewLogfmtLogger(os.Stdout)
54 logger = log.With(logger, "ts", log.DefaultTimestampUTC)
55 logger = log.With(logger, "caller", log.DefaultCaller)
56 }
57 logger.Log("msg", "hello")
58 defer logger.Log("msg", "goodbye")
59
60 // Metrics domain.
61 var ints, chars metrics.Counter
62 {
63 // Business level metrics.
64 ints = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
65 Namespace: "addsvc",
66 Name: "integers_summed",
67 Help: "Total count of integers summed via the Sum method.",
68 }, []string{})
69 chars = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
70 Namespace: "addsvc",
71 Name: "characters_concatenated",
72 Help: "Total count of characters concatenated via the Concat method.",
73 }, []string{})
74 }
75 var duration metrics.Histogram
76 {
77 // Transport level metrics.
78 duration = prometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
79 Namespace: "addsvc",
80 Name: "request_duration_ns",
81 Help: "Request duration in nanoseconds.",
82 }, []string{"method", "success"})
83 }
84
85 // Tracing domain.
86 var tracer stdopentracing.Tracer
87 {
88 if *zipkinAddr != "" {
89 logger := log.With(logger, "tracer", "ZipkinHTTP")
90 logger.Log("addr", *zipkinAddr)
91
92 // endpoint typically looks like: http://zipkinhost:9411/api/v1/spans
93 collector, err := zipkin.NewHTTPCollector(*zipkinAddr)
94 if err != nil {
95 logger.Log("err", err)
96 os.Exit(1)
97 }
98 defer collector.Close()
99
100 tracer, err = zipkin.NewTracer(
101 zipkin.NewRecorder(collector, false, "localhost:80", "addsvc"),
102 )
103 if err != nil {
104 logger.Log("err", err)
105 os.Exit(1)
106 }
107 } else if *zipkinKafkaAddr != "" {
108 logger := log.With(logger, "tracer", "ZipkinKafka")
109 logger.Log("addr", *zipkinKafkaAddr)
110
111 collector, err := zipkin.NewKafkaCollector(
112 strings.Split(*zipkinKafkaAddr, ","),
113 zipkin.KafkaLogger(log.NewNopLogger()),
114 )
115 if err != nil {
116 logger.Log("err", err)
117 os.Exit(1)
118 }
119 defer collector.Close()
120
121 tracer, err = zipkin.NewTracer(
122 zipkin.NewRecorder(collector, false, "localhost:80", "addsvc"),
123 )
124 if err != nil {
125 logger.Log("err", err)
126 os.Exit(1)
127 }
128 } else if *appdashAddr != "" {
129 logger := log.With(logger, "tracer", "Appdash")
130 logger.Log("addr", *appdashAddr)
131 tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
132 } else if *lightstepToken != "" {
133 logger := log.With(logger, "tracer", "LightStep")
134 logger.Log() // probably don't want to print out the token :)
135 tracer = lightstep.NewTracer(lightstep.Options{
136 AccessToken: *lightstepToken,
137 })
138 defer lightstep.FlushLightStepTracer(tracer)
139 } else {
140 logger := log.With(logger, "tracer", "none")
141 logger.Log()
142 tracer = stdopentracing.GlobalTracer() // no-op
143 }
144 }
145
146 // Business domain.
147 var service addsvc.Service
148 {
149 service = addsvc.NewBasicService()
150 service = addsvc.ServiceLoggingMiddleware(logger)(service)
151 service = addsvc.ServiceInstrumentingMiddleware(ints, chars)(service)
152 }
153
154 // Endpoint domain.
155 var sumEndpoint endpoint.Endpoint
156 {
157 sumDuration := duration.With("method", "Sum")
158 sumLogger := log.With(logger, "method", "Sum")
159
160 sumEndpoint = addsvc.MakeSumEndpoint(service)
161 sumEndpoint = opentracing.TraceServer(tracer, "Sum")(sumEndpoint)
162 sumEndpoint = addsvc.EndpointInstrumentingMiddleware(sumDuration)(sumEndpoint)
163 sumEndpoint = addsvc.EndpointLoggingMiddleware(sumLogger)(sumEndpoint)
164 }
165 var concatEndpoint endpoint.Endpoint
166 {
167 concatDuration := duration.With("method", "Concat")
168 concatLogger := log.With(logger, "method", "Concat")
169
170 concatEndpoint = addsvc.MakeConcatEndpoint(service)
171 concatEndpoint = opentracing.TraceServer(tracer, "Concat")(concatEndpoint)
172 concatEndpoint = addsvc.EndpointInstrumentingMiddleware(concatDuration)(concatEndpoint)
173 concatEndpoint = addsvc.EndpointLoggingMiddleware(concatLogger)(concatEndpoint)
174 }
175 endpoints := addsvc.Endpoints{
176 SumEndpoint: sumEndpoint,
177 ConcatEndpoint: concatEndpoint,
178 }
179
180 // Mechanical domain.
181 errc := make(chan error)
182 ctx := context.Background()
183
184 // Interrupt handler.
185 go func() {
186 c := make(chan os.Signal, 1)
187 signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
188 errc <- fmt.Errorf("%s", <-c)
189 }()
190
191 // Debug listener.
192 go func() {
193 logger := log.With(logger, "transport", "debug")
194
195 m := http.NewServeMux()
196 m.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
197 m.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
198 m.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
199 m.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
200 m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
201 m.Handle("/metrics", promhttp.Handler())
202
203 logger.Log("addr", *debugAddr)
204 errc <- http.ListenAndServe(*debugAddr, m)
205 }()
206
207 // HTTP transport.
208 go func() {
209 logger := log.With(logger, "transport", "HTTP")
210 h := addsvc.MakeHTTPHandler(endpoints, tracer, logger)
211 logger.Log("addr", *httpAddr)
212 errc <- http.ListenAndServe(*httpAddr, h)
213 }()
214
215 // gRPC transport.
216 go func() {
217 logger := log.With(logger, "transport", "gRPC")
218
219 ln, err := net.Listen("tcp", *grpcAddr)
220 if err != nil {
221 errc <- err
222 return
223 }
224
225 srv := addsvc.MakeGRPCServer(endpoints, tracer, logger)
226 s := grpc.NewServer()
227 pb.RegisterAddServer(s, srv)
228
229 logger.Log("addr", *grpcAddr)
230 errc <- s.Serve(ln)
231 }()
232
233 // Thrift transport.
234 go func() {
235 logger := log.With(logger, "transport", "Thrift")
236
237 var protocolFactory thrift.TProtocolFactory
238 switch *thriftProtocol {
239 case "binary":
240 protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
241 case "compact":
242 protocolFactory = thrift.NewTCompactProtocolFactory()
243 case "json":
244 protocolFactory = thrift.NewTJSONProtocolFactory()
245 case "simplejson":
246 protocolFactory = thrift.NewTSimpleJSONProtocolFactory()
247 default:
248 errc <- fmt.Errorf("invalid Thrift protocol %q", *thriftProtocol)
249 return
250 }
251
252 var transportFactory thrift.TTransportFactory
253 if *thriftBufferSize > 0 {
254 transportFactory = thrift.NewTBufferedTransportFactory(*thriftBufferSize)
255 } else {
256 transportFactory = thrift.NewTTransportFactory()
257 }
258 if *thriftFramed {
259 transportFactory = thrift.NewTFramedTransportFactory(transportFactory)
260 }
261
262 transport, err := thrift.NewTServerSocket(*thriftAddr)
263 if err != nil {
264 errc <- err
265 return
266 }
267
268 logger.Log("addr", *thriftAddr)
269 errc <- thrift.NewTSimpleServer4(
270 thriftadd.NewAddServiceProcessor(addsvc.MakeThriftHandler(ctx, endpoints)),
271 transport,
272 transportFactory,
273 protocolFactory,
274 ).Serve()
275 }()
276
277 // Run!
278 logger.Log("exit", <-errc)
279 }
0 package main
1
2 import (
3 "fmt"
4 "net/http"
5 "os"
6 "strings"
7 "testing"
8
9 "github.com/pact-foundation/pact-go/dsl"
10 )
11
12 func TestPactStringsvcUppercase(t *testing.T) {
13 if os.Getenv("WRITE_PACTS") == "" {
14 t.Skip("skipping Pact contracts; set WRITE_PACTS environment variable to enable")
15 }
16
17 pact := dsl.Pact{
18 Port: 6666,
19 Consumer: "addsvc",
20 Provider: "stringsvc",
21 }
22 defer pact.Teardown()
23
24 pact.AddInteraction().
25 UponReceiving("stringsvc uppercase").
26 WithRequest(dsl.Request{
27 Headers: map[string]string{"Content-Type": "application/json; charset=utf-8"},
28 Method: "POST",
29 Path: "/uppercase",
30 Body: `{"s":"foo"}`,
31 }).
32 WillRespondWith(dsl.Response{
33 Status: 200,
34 Headers: map[string]string{"Content-Type": "application/json; charset=utf-8"},
35 Body: `{"v":"FOO"}`,
36 })
37
38 if err := pact.Verify(func() error {
39 u := fmt.Sprintf("http://localhost:%d/uppercase", pact.Server.Port)
40 req, err := http.NewRequest("POST", u, strings.NewReader(`{"s":"foo"}`))
41 if err != nil {
42 return err
43 }
44 req.Header.Set("Content-Type", "application/json; charset=utf-8")
45 if _, err = http.DefaultClient.Do(req); err != nil {
46 return err
47 }
48 return nil
49 }); err != nil {
50 t.Fatal(err)
51 }
52
53 pact.WritePact()
54 }
0 package main
1
2 import (
3 "io/ioutil"
4 "net/http"
5 "net/http/httptest"
6 "strings"
7 "testing"
8
9 "github.com/opentracing/opentracing-go"
10
11 "github.com/go-kit/kit/log"
12 "github.com/go-kit/kit/metrics/discard"
13
14 "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
15 "github.com/go-kit/kit/examples/addsvc/pkg/addservice"
16 "github.com/go-kit/kit/examples/addsvc/pkg/addtransport"
17 )
18
19 func TestHTTP(t *testing.T) {
20 svc := addservice.New(log.NewNopLogger(), discard.NewCounter(), discard.NewCounter())
21 eps := addendpoint.New(svc, log.NewNopLogger(), discard.NewHistogram(), opentracing.GlobalTracer())
22 mux := addtransport.NewHTTPHandler(eps, opentracing.GlobalTracer(), log.NewNopLogger())
23 srv := httptest.NewServer(mux)
24 defer srv.Close()
25
26 for _, testcase := range []struct {
27 method, url, body, want string
28 }{
29 {"GET", srv.URL + "/concat", `{"a":"1","b":"2"}`, `{"v":"12"}`},
30 {"GET", srv.URL + "/sum", `{"a":1,"b":2}`, `{"v":3}`},
31 } {
32 req, _ := http.NewRequest(testcase.method, testcase.url, strings.NewReader(testcase.body))
33 resp, _ := http.DefaultClient.Do(req)
34 body, _ := ioutil.ReadAll(resp.Body)
35 if want, have := testcase.want, strings.TrimSpace(string(body)); want != have {
36 t.Errorf("%s %s %s: want %q, have %q", testcase.method, testcase.url, testcase.body, want, have)
37 }
38 }
39 }
+0
-5
examples/addsvc/doc.go less more
0 // Package addsvc is an example microservice, useful for education. It can sum
1 // integers and concatenate strings. A client library is available in the client
2 // subdirectory. A server binary is available in cmd/addsvc. An example client
3 // binary is available in cmd/addcli.
4 package addsvc
+0
-134
examples/addsvc/endpoints.go less more
0 package addsvc
1
2 // This file contains methods to make individual endpoints from services,
3 // request and response types to serve those endpoints, as well as encoders and
4 // decoders for those types, for all of our supported transport serialization
5 // formats. It also includes endpoint middlewares.
6
7 import (
8 "context"
9 "fmt"
10 "time"
11
12 "github.com/go-kit/kit/endpoint"
13 "github.com/go-kit/kit/log"
14 "github.com/go-kit/kit/metrics"
15 )
16
17 // Endpoints collects all of the endpoints that compose an add service. It's
18 // meant to be used as a helper struct, to collect all of the endpoints into a
19 // single parameter.
20 //
21 // In a server, it's useful for functions that need to operate on a per-endpoint
22 // basis. For example, you might pass an Endpoints to a function that produces
23 // an http.Handler, with each method (endpoint) wired up to a specific path. (It
24 // is probably a mistake in design to invoke the Service methods on the
25 // Endpoints struct in a server.)
26 //
27 // In a client, it's useful to collect individually constructed endpoints into a
28 // single type that implements the Service interface. For example, you might
29 // construct individual endpoints using transport/http.NewClient, combine them
30 // into an Endpoints, and return it to the caller as a Service.
31 type Endpoints struct {
32 SumEndpoint endpoint.Endpoint
33 ConcatEndpoint endpoint.Endpoint
34 }
35
36 // Sum implements Service. Primarily useful in a client.
37 func (e Endpoints) Sum(ctx context.Context, a, b int) (int, error) {
38 request := sumRequest{A: a, B: b}
39 response, err := e.SumEndpoint(ctx, request)
40 if err != nil {
41 return 0, err
42 }
43 return response.(sumResponse).V, response.(sumResponse).Err
44 }
45
46 // Concat implements Service. Primarily useful in a client.
47 func (e Endpoints) Concat(ctx context.Context, a, b string) (string, error) {
48 request := concatRequest{A: a, B: b}
49 response, err := e.ConcatEndpoint(ctx, request)
50 if err != nil {
51 return "", err
52 }
53 return response.(concatResponse).V, response.(concatResponse).Err
54 }
55
56 // MakeSumEndpoint returns an endpoint that invokes Sum on the service.
57 // Primarily useful in a server.
58 func MakeSumEndpoint(s Service) endpoint.Endpoint {
59 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
60 sumReq := request.(sumRequest)
61 v, err := s.Sum(ctx, sumReq.A, sumReq.B)
62 if err == ErrIntOverflow {
63 return nil, err // special case; see comment on ErrIntOverflow
64 }
65 return sumResponse{
66 V: v,
67 Err: err,
68 }, nil
69 }
70 }
71
72 // MakeConcatEndpoint returns an endpoint that invokes Concat on the service.
73 // Primarily useful in a server.
74 func MakeConcatEndpoint(s Service) endpoint.Endpoint {
75 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
76 concatReq := request.(concatRequest)
77 v, err := s.Concat(ctx, concatReq.A, concatReq.B)
78 return concatResponse{
79 V: v,
80 Err: err,
81 }, nil
82 }
83 }
84
85 // EndpointInstrumentingMiddleware returns an endpoint middleware that records
86 // the duration of each invocation to the passed histogram. The middleware adds
87 // a single field: "success", which is "true" if no error is returned, and
88 // "false" otherwise.
89 func EndpointInstrumentingMiddleware(duration metrics.Histogram) endpoint.Middleware {
90 return func(next endpoint.Endpoint) endpoint.Endpoint {
91 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
92
93 defer func(begin time.Time) {
94 duration.With("success", fmt.Sprint(err == nil)).Observe(time.Since(begin).Seconds())
95 }(time.Now())
96 return next(ctx, request)
97
98 }
99 }
100 }
101
102 // EndpointLoggingMiddleware returns an endpoint middleware that logs the
103 // duration of each invocation, and the resulting error, if any.
104 func EndpointLoggingMiddleware(logger log.Logger) endpoint.Middleware {
105 return func(next endpoint.Endpoint) endpoint.Endpoint {
106 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
107
108 defer func(begin time.Time) {
109 logger.Log("error", err, "took", time.Since(begin))
110 }(time.Now())
111 return next(ctx, request)
112
113 }
114 }
115 }
116
117 // These types are unexported because they only exist to serve the endpoint
118 // domain, which is totally encapsulated in this package. They are otherwise
119 // opaque to all callers.
120
121 type sumRequest struct{ A, B int }
122
123 type sumResponse struct {
124 V int
125 Err error
126 }
127
128 type concatRequest struct{ A, B string }
129
130 type concatResponse struct {
131 V string
132 Err error
133 }
0 // Code generated by protoc-gen-go.
0 // Code generated by protoc-gen-go. DO NOT EDIT.
11 // source: addsvc.proto
2 // DO NOT EDIT!
32
43 /*
54 Package pb is a generated protocol buffer package.
4645 func (*SumRequest) ProtoMessage() {}
4746 func (*SumRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
4847
48 func (m *SumRequest) GetA() int64 {
49 if m != nil {
50 return m.A
51 }
52 return 0
53 }
54
55 func (m *SumRequest) GetB() int64 {
56 if m != nil {
57 return m.B
58 }
59 return 0
60 }
61
4962 // The sum response contains the result of the calculation.
5063 type SumReply struct {
5164 V int64 `protobuf:"varint,1,opt,name=v" json:"v,omitempty"`
5770 func (*SumReply) ProtoMessage() {}
5871 func (*SumReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
5972
73 func (m *SumReply) GetV() int64 {
74 if m != nil {
75 return m.V
76 }
77 return 0
78 }
79
80 func (m *SumReply) GetErr() string {
81 if m != nil {
82 return m.Err
83 }
84 return ""
85 }
86
6087 // The Concat request contains two parameters.
6188 type ConcatRequest struct {
6289 A string `protobuf:"bytes,1,opt,name=a" json:"a,omitempty"`
6895 func (*ConcatRequest) ProtoMessage() {}
6996 func (*ConcatRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
7097
98 func (m *ConcatRequest) GetA() string {
99 if m != nil {
100 return m.A
101 }
102 return ""
103 }
104
105 func (m *ConcatRequest) GetB() string {
106 if m != nil {
107 return m.B
108 }
109 return ""
110 }
111
71112 // The Concat response contains the result of the concatenation.
72113 type ConcatReply struct {
73114 V string `protobuf:"bytes,1,opt,name=v" json:"v,omitempty"`
78119 func (m *ConcatReply) String() string { return proto.CompactTextString(m) }
79120 func (*ConcatReply) ProtoMessage() {}
80121 func (*ConcatReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
122
123 func (m *ConcatReply) GetV() string {
124 if m != nil {
125 return m.V
126 }
127 return ""
128 }
129
130 func (m *ConcatReply) GetErr() string {
131 if m != nil {
132 return m.Err
133 }
134 return ""
135 }
81136
82137 func init() {
83138 proto.RegisterType((*SumRequest)(nil), "pb.SumRequest")
199254
200255 var fileDescriptor0 = []byte{
201256 // 189 bytes of a gzipped FileDescriptorProto
202 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x49, 0x4c, 0x49, 0x29,
257 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x49, 0x4c, 0x49, 0x29,
203258 0x2e, 0x4b, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52, 0xd2, 0xe0, 0xe2,
204259 0x0a, 0x2e, 0xcd, 0x0d, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0xe2, 0xe1, 0x62, 0x4c, 0x94,
205260 0x60, 0x54, 0x60, 0xd4, 0x60, 0x0e, 0x62, 0x4c, 0x04, 0xf1, 0x92, 0x24, 0x98, 0x20, 0xbc, 0x24,
0 package addendpoint
1
2 import (
3 "context"
4 "fmt"
5 "time"
6
7 "github.com/go-kit/kit/endpoint"
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/metrics"
10 )
11
12 // InstrumentingMiddleware returns an endpoint middleware that records
13 // the duration of each invocation to the passed histogram. The middleware adds
14 // a single field: "success", which is "true" if no error is returned, and
15 // "false" otherwise.
16 func InstrumentingMiddleware(duration metrics.Histogram) endpoint.Middleware {
17 return func(next endpoint.Endpoint) endpoint.Endpoint {
18 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
19
20 defer func(begin time.Time) {
21 duration.With("success", fmt.Sprint(err == nil)).Observe(time.Since(begin).Seconds())
22 }(time.Now())
23 return next(ctx, request)
24
25 }
26 }
27 }
28
29 // LoggingMiddleware returns an endpoint middleware that logs the
30 // duration of each invocation, and the resulting error, if any.
31 func LoggingMiddleware(logger log.Logger) endpoint.Middleware {
32 return func(next endpoint.Endpoint) endpoint.Endpoint {
33 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
34
35 defer func(begin time.Time) {
36 logger.Log("transport_error", err, "took", time.Since(begin))
37 }(time.Now())
38 return next(ctx, request)
39
40 }
41 }
42 }
0 package addendpoint
1
2 import (
3 "context"
4
5 rl "github.com/juju/ratelimit"
6 stdopentracing "github.com/opentracing/opentracing-go"
7 "github.com/sony/gobreaker"
8
9 "github.com/go-kit/kit/circuitbreaker"
10 "github.com/go-kit/kit/endpoint"
11 "github.com/go-kit/kit/log"
12 "github.com/go-kit/kit/metrics"
13 "github.com/go-kit/kit/ratelimit"
14 "github.com/go-kit/kit/tracing/opentracing"
15
16 "github.com/go-kit/kit/examples/addsvc/pkg/addservice"
17 )
18
19 // Set collects all of the endpoints that compose an add service. It's meant to
20 // be used as a helper struct, to collect all of the endpoints into a single
21 // parameter.
22 type Set struct {
23 SumEndpoint endpoint.Endpoint
24 ConcatEndpoint endpoint.Endpoint
25 }
26
27 // New returns a Set that wraps the provided server, and wires in all of the
28 // expected endpoint middlewares via the various parameters.
29 func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, trace stdopentracing.Tracer) Set {
30 var sumEndpoint endpoint.Endpoint
31 {
32 sumEndpoint = MakeSumEndpoint(svc)
33 sumEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(1, 1))(sumEndpoint)
34 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(sumEndpoint)
35 sumEndpoint = opentracing.TraceServer(trace, "Sum")(sumEndpoint)
36 sumEndpoint = LoggingMiddleware(log.With(logger, "method", "Sum"))(sumEndpoint)
37 sumEndpoint = InstrumentingMiddleware(duration.With("method", "Sum"))(sumEndpoint)
38 }
39 var concatEndpoint endpoint.Endpoint
40 {
41 concatEndpoint = MakeConcatEndpoint(svc)
42 concatEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(100, 100))(concatEndpoint)
43 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(concatEndpoint)
44 concatEndpoint = opentracing.TraceServer(trace, "Concat")(concatEndpoint)
45 concatEndpoint = LoggingMiddleware(log.With(logger, "method", "Concat"))(concatEndpoint)
46 concatEndpoint = InstrumentingMiddleware(duration.With("method", "Concat"))(concatEndpoint)
47 }
48 return Set{
49 SumEndpoint: sumEndpoint,
50 ConcatEndpoint: concatEndpoint,
51 }
52 }
53
54 // Sum implements the service interface, so Set may be used as a service.
55 // This is primarily useful in the context of a client library.
56 func (s Set) Sum(ctx context.Context, a, b int) (int, error) {
57 resp, err := s.SumEndpoint(ctx, SumRequest{A: a, B: b})
58 if err != nil {
59 return 0, err
60 }
61 response := resp.(SumResponse)
62 return response.V, response.Err
63 }
64
65 // Concat implements the service interface, so Set may be used as a
66 // service. This is primarily useful in the context of a client library.
67 func (s Set) Concat(ctx context.Context, a, b string) (string, error) {
68 resp, err := s.ConcatEndpoint(ctx, ConcatRequest{A: a, B: b})
69 if err != nil {
70 return "", err
71 }
72 response := resp.(ConcatResponse)
73 return response.V, response.Err
74 }
75
76 // MakeSumEndpoint constructs a Sum endpoint wrapping the service.
77 func MakeSumEndpoint(s addservice.Service) endpoint.Endpoint {
78 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
79 req := request.(SumRequest)
80 v, err := s.Sum(ctx, req.A, req.B)
81 return SumResponse{V: v, Err: err}, nil
82 }
83 }
84
85 // MakeConcatEndpoint constructs a Concat endpoint wrapping the service.
86 func MakeConcatEndpoint(s addservice.Service) endpoint.Endpoint {
87 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
88 req := request.(ConcatRequest)
89 v, err := s.Concat(ctx, req.A, req.B)
90 return ConcatResponse{V: v, Err: err}, nil
91 }
92 }
93
94 // Failer is an interface that should be implemented by response types.
95 // Response encoders can check if responses are Failer, and if so if they've
96 // failed, and if so encode them using a separate write path based on the error.
97 type Failer interface {
98 Failed() error
99 }
100
101 // SumRequest collects the request parameters for the Sum method.
102 type SumRequest struct {
103 A, B int
104 }
105
106 // SumResponse collects the response values for the Sum method.
107 type SumResponse struct {
108 V int `json:"v"`
109 Err error `json:"-"` // should be intercepted by Failed/errorEncoder
110 }
111
112 // Failed implements Failer.
113 func (r SumResponse) Failed() error { return r.Err }
114
115 // ConcatRequest collects the request parameters for the Concat method.
116 type ConcatRequest struct {
117 A, B string
118 }
119
120 // ConcatResponse collects the response values for the Concat method.
121 type ConcatResponse struct {
122 V string `json:"v"`
123 Err error `json:"-"`
124 }
125
126 // Failed implements Failer.
127 func (r ConcatResponse) Failed() error { return r.Err }
0 package addservice
1
2 import (
3 "context"
4
5 "github.com/go-kit/kit/log"
6 "github.com/go-kit/kit/metrics"
7 )
8
9 // Middleware describes a service (as opposed to endpoint) middleware.
10 type Middleware func(Service) Service
11
12 // LoggingMiddleware takes a logger as a dependency
13 // and returns a ServiceMiddleware.
14 func LoggingMiddleware(logger log.Logger) Middleware {
15 return func(next Service) Service {
16 return loggingMiddleware{logger, next}
17 }
18 }
19
20 type loggingMiddleware struct {
21 logger log.Logger
22 next Service
23 }
24
25 func (mw loggingMiddleware) Sum(ctx context.Context, a, b int) (v int, err error) {
26 defer func() {
27 mw.logger.Log("method", "Sum", "a", a, "b", b, "v", v, "err", err)
28 }()
29 return mw.next.Sum(ctx, a, b)
30 }
31
32 func (mw loggingMiddleware) Concat(ctx context.Context, a, b string) (v string, err error) {
33 defer func() {
34 mw.logger.Log("method", "Concat", "a", a, "b", b, "v", v, "err", err)
35 }()
36 return mw.next.Concat(ctx, a, b)
37 }
38
39 // InstrumentingMiddleware returns a service middleware that instruments
40 // the number of integers summed and characters concatenated over the lifetime of
41 // the service.
42 func InstrumentingMiddleware(ints, chars metrics.Counter) Middleware {
43 return func(next Service) Service {
44 return instrumentingMiddleware{
45 ints: ints,
46 chars: chars,
47 next: next,
48 }
49 }
50 }
51
52 type instrumentingMiddleware struct {
53 ints metrics.Counter
54 chars metrics.Counter
55 next Service
56 }
57
58 func (mw instrumentingMiddleware) Sum(ctx context.Context, a, b int) (int, error) {
59 v, err := mw.next.Sum(ctx, a, b)
60 mw.ints.Add(float64(v))
61 return v, err
62 }
63
64 func (mw instrumentingMiddleware) Concat(ctx context.Context, a, b string) (string, error) {
65 v, err := mw.next.Concat(ctx, a, b)
66 mw.chars.Add(float64(len(v)))
67 return v, err
68 }
0 package addservice
1
2 import (
3 "context"
4 "errors"
5
6 "github.com/go-kit/kit/log"
7 "github.com/go-kit/kit/metrics"
8 )
9
10 // Service describes a service that adds things together.
11 type Service interface {
12 Sum(ctx context.Context, a, b int) (int, error)
13 Concat(ctx context.Context, a, b string) (string, error)
14 }
15
16 // New returns a basic Service with all of the expected middlewares wired in.
17 func New(logger log.Logger, ints, chars metrics.Counter) Service {
18 var svc Service
19 {
20 svc = NewBasicService()
21 svc = LoggingMiddleware(logger)(svc)
22 svc = InstrumentingMiddleware(ints, chars)(svc)
23 }
24 return svc
25 }
26
27 var (
28 // ErrTwoZeroes is an arbitrary business rule for the Add method.
29 ErrTwoZeroes = errors.New("can't sum two zeroes")
30
31 // ErrIntOverflow protects the Add method. We've decided that this error
32 // indicates a misbehaving service and should count against e.g. circuit
33 // breakers. So, we return it directly in endpoints, to illustrate the
34 // difference. In a real service, this probably wouldn't be the case.
35 ErrIntOverflow = errors.New("integer overflow")
36
37 // ErrMaxSizeExceeded protects the Concat method.
38 ErrMaxSizeExceeded = errors.New("result exceeds maximum size")
39 )
40
41 // NewBasicService returns a naïve, stateless implementation of Service.
42 func NewBasicService() Service {
43 return basicService{}
44 }
45
46 type basicService struct{}
47
48 const (
49 intMax = 1<<31 - 1
50 intMin = -(intMax + 1)
51 maxLen = 10
52 )
53
54 func (s basicService) Sum(_ context.Context, a, b int) (int, error) {
55 if a == 0 && b == 0 {
56 return 0, ErrTwoZeroes
57 }
58 if (b > 0 && a > (intMax-b)) || (b < 0 && a < (intMin-b)) {
59 return 0, ErrIntOverflow
60 }
61 return a + b, nil
62 }
63
64 // Concat implements Service.
65 func (s basicService) Concat(_ context.Context, a, b string) (string, error) {
66 if len(a)+len(b) > maxLen {
67 return "", ErrMaxSizeExceeded
68 }
69 return a + b, nil
70 }
0 package addtransport
1
2 import (
3 "context"
4 "errors"
5 "time"
6
7 "google.golang.org/grpc"
8
9 jujuratelimit "github.com/juju/ratelimit"
10 stdopentracing "github.com/opentracing/opentracing-go"
11 "github.com/sony/gobreaker"
12 oldcontext "golang.org/x/net/context"
13
14 "github.com/go-kit/kit/circuitbreaker"
15 "github.com/go-kit/kit/endpoint"
16 "github.com/go-kit/kit/log"
17 "github.com/go-kit/kit/ratelimit"
18 "github.com/go-kit/kit/tracing/opentracing"
19 grpctransport "github.com/go-kit/kit/transport/grpc"
20
21 "github.com/go-kit/kit/examples/addsvc/pb"
22 "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
23 "github.com/go-kit/kit/examples/addsvc/pkg/addservice"
24 )
25
26 type grpcServer struct {
27 sum grpctransport.Handler
28 concat grpctransport.Handler
29 }
30
31 // NewGRPCServer makes a set of endpoints available as a gRPC AddServer.
32 func NewGRPCServer(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer {
33 options := []grpctransport.ServerOption{
34 grpctransport.ServerErrorLogger(logger),
35 }
36 return &grpcServer{
37 sum: grpctransport.NewServer(
38 endpoints.SumEndpoint,
39 decodeGRPCSumRequest,
40 encodeGRPCSumResponse,
41 append(options, grpctransport.ServerBefore(opentracing.FromGRPCRequest(tracer, "Sum", logger)))...,
42 ),
43 concat: grpctransport.NewServer(
44 endpoints.ConcatEndpoint,
45 decodeGRPCConcatRequest,
46 encodeGRPCConcatResponse,
47 append(options, grpctransport.ServerBefore(opentracing.FromGRPCRequest(tracer, "Concat", logger)))...,
48 ),
49 }
50 }
51
52 func (s *grpcServer) Sum(ctx oldcontext.Context, req *pb.SumRequest) (*pb.SumReply, error) {
53 _, rep, err := s.sum.ServeGRPC(ctx, req)
54 if err != nil {
55 return nil, err
56 }
57 return rep.(*pb.SumReply), nil
58 }
59
60 func (s *grpcServer) Concat(ctx oldcontext.Context, req *pb.ConcatRequest) (*pb.ConcatReply, error) {
61 _, rep, err := s.concat.ServeGRPC(ctx, req)
62 if err != nil {
63 return nil, err
64 }
65 return rep.(*pb.ConcatReply), nil
66 }
67
68 // NewGRPCClient returns an AddService backed by a gRPC server at the other end
69 // of the conn. The caller is responsible for constructing the conn, and
70 // eventually closing the underlying transport. We bake-in certain middlewares,
71 // implementing the client library pattern.
72 func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addservice.Service {
73 // We construct a single ratelimiter middleware, to limit the total outgoing
74 // QPS from this client to all methods on the remote instance. We also
75 // construct per-endpoint circuitbreaker middlewares to demonstrate how
76 // that's done, although they could easily be combined into a single breaker
77 // for the entire remote instance, too.
78 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
79
80 // Each individual endpoint is an http/transport.Client (which implements
81 // endpoint.Endpoint) that gets wrapped with various middlewares. If you
82 // made your own client library, you'd do this work there, so your server
83 // could rely on a consistent set of client behavior.
84 var sumEndpoint endpoint.Endpoint
85 {
86 sumEndpoint = grpctransport.NewClient(
87 conn,
88 "pb.Add",
89 "Sum",
90 encodeGRPCSumRequest,
91 decodeGRPCSumResponse,
92 pb.SumReply{},
93 grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)),
94 ).Endpoint()
95 sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
96 sumEndpoint = limiter(sumEndpoint)
97 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
98 Name: "Sum",
99 Timeout: 30 * time.Second,
100 }))(sumEndpoint)
101 }
102
103 // The Concat endpoint is the same thing, with slightly different
104 // middlewares to demonstrate how to specialize per-endpoint.
105 var concatEndpoint endpoint.Endpoint
106 {
107 concatEndpoint = grpctransport.NewClient(
108 conn,
109 "pb.Add",
110 "Concat",
111 encodeGRPCConcatRequest,
112 decodeGRPCConcatResponse,
113 pb.ConcatReply{},
114 grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)),
115 ).Endpoint()
116 concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
117 concatEndpoint = limiter(concatEndpoint)
118 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
119 Name: "Concat",
120 Timeout: 10 * time.Second,
121 }))(concatEndpoint)
122 }
123
124 // Returning the endpoint.Set as a service.Service relies on the
125 // endpoint.Set implementing the Service methods. That's just a simple bit
126 // of glue code.
127 return addendpoint.Set{
128 SumEndpoint: sumEndpoint,
129 ConcatEndpoint: concatEndpoint,
130 }
131 }
132
133 // decodeGRPCSumRequest is a transport/grpc.DecodeRequestFunc that converts a
134 // gRPC sum request to a user-domain sum request. Primarily useful in a server.
135 func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
136 req := grpcReq.(*pb.SumRequest)
137 return addendpoint.SumRequest{A: int(req.A), B: int(req.B)}, nil
138 }
139
140 // decodeGRPCConcatRequest is a transport/grpc.DecodeRequestFunc that converts a
141 // gRPC concat request to a user-domain concat request. Primarily useful in a
142 // server.
143 func decodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
144 req := grpcReq.(*pb.ConcatRequest)
145 return addendpoint.ConcatRequest{A: req.A, B: req.B}, nil
146 }
147
148 // decodeGRPCSumResponse is a transport/grpc.DecodeResponseFunc that converts a
149 // gRPC sum reply to a user-domain sum response. Primarily useful in a client.
150 func decodeGRPCSumResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
151 reply := grpcReply.(*pb.SumReply)
152 return addendpoint.SumResponse{V: int(reply.V), Err: str2err(reply.Err)}, nil
153 }
154
155 // decodeGRPCConcatResponse is a transport/grpc.DecodeResponseFunc that converts
156 // a gRPC concat reply to a user-domain concat response. Primarily useful in a
157 // client.
158 func decodeGRPCConcatResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
159 reply := grpcReply.(*pb.ConcatReply)
160 return addendpoint.ConcatResponse{V: reply.V, Err: str2err(reply.Err)}, nil
161 }
162
163 // encodeGRPCSumResponse is a transport/grpc.EncodeResponseFunc that converts a
164 // user-domain sum response to a gRPC sum reply. Primarily useful in a server.
165 func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
166 resp := response.(addendpoint.SumResponse)
167 return &pb.SumReply{V: int64(resp.V), Err: err2str(resp.Err)}, nil
168 }
169
170 // encodeGRPCConcatResponse is a transport/grpc.EncodeResponseFunc that converts
171 // a user-domain concat response to a gRPC concat reply. Primarily useful in a
172 // server.
173 func encodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
174 resp := response.(addendpoint.ConcatResponse)
175 return &pb.ConcatReply{V: resp.V, Err: err2str(resp.Err)}, nil
176 }
177
178 // encodeGRPCSumRequest is a transport/grpc.EncodeRequestFunc that converts a
179 // user-domain sum request to a gRPC sum request. Primarily useful in a client.
180 func encodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) {
181 req := request.(addendpoint.SumRequest)
182 return &pb.SumRequest{A: int64(req.A), B: int64(req.B)}, nil
183 }
184
185 // encodeGRPCConcatRequest is a transport/grpc.EncodeRequestFunc that converts a
186 // user-domain concat request to a gRPC concat request. Primarily useful in a
187 // client.
188 func encodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) {
189 req := request.(addendpoint.ConcatRequest)
190 return &pb.ConcatRequest{A: req.A, B: req.B}, nil
191 }
192
193 // These annoying helper functions are required to translate Go error types to
194 // and from strings, which is the type we use in our IDLs to represent errors.
195 // There is special casing to treat empty strings as nil errors.
196
197 func str2err(s string) error {
198 if s == "" {
199 return nil
200 }
201 return errors.New(s)
202 }
203
204 func err2str(err error) string {
205 if err == nil {
206 return ""
207 }
208 return err.Error()
209 }
0 package addtransport
1
2 import (
3 "bytes"
4 "context"
5 "encoding/json"
6 "errors"
7 "io/ioutil"
8 "net/http"
9 "net/url"
10 "strings"
11 "time"
12
13 jujuratelimit "github.com/juju/ratelimit"
14 stdopentracing "github.com/opentracing/opentracing-go"
15 "github.com/sony/gobreaker"
16
17 "github.com/go-kit/kit/circuitbreaker"
18 "github.com/go-kit/kit/endpoint"
19 "github.com/go-kit/kit/log"
20 "github.com/go-kit/kit/ratelimit"
21 "github.com/go-kit/kit/tracing/opentracing"
22 httptransport "github.com/go-kit/kit/transport/http"
23
24 "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
25 "github.com/go-kit/kit/examples/addsvc/pkg/addservice"
26 )
27
28 // NewHTTPHandler returns an HTTP handler that makes a set of endpoints
29 // available on predefined paths.
30 func NewHTTPHandler(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) http.Handler {
31 options := []httptransport.ServerOption{
32 httptransport.ServerErrorEncoder(errorEncoder),
33 httptransport.ServerErrorLogger(logger),
34 }
35 m := http.NewServeMux()
36 m.Handle("/sum", httptransport.NewServer(
37 endpoints.SumEndpoint,
38 decodeHTTPSumRequest,
39 encodeHTTPGenericResponse,
40 append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Sum", logger)))...,
41 ))
42 m.Handle("/concat", httptransport.NewServer(
43 endpoints.ConcatEndpoint,
44 decodeHTTPConcatRequest,
45 encodeHTTPGenericResponse,
46 append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Concat", logger)))...,
47 ))
48 return m
49 }
50
51 // NewHTTPClient returns an AddService backed by an HTTP server living at the
52 // remote instance. We expect instance to come from a service discovery system,
53 // so likely of the form "host:port". We bake-in certain middlewares,
54 // implementing the client library pattern.
55 func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addservice.Service, error) {
56 // Quickly sanitize the instance string.
57 if !strings.HasPrefix(instance, "http") {
58 instance = "http://" + instance
59 }
60 u, err := url.Parse(instance)
61 if err != nil {
62 return nil, err
63 }
64
65 // We construct a single ratelimiter middleware, to limit the total outgoing
66 // QPS from this client to all methods on the remote instance. We also
67 // construct per-endpoint circuitbreaker middlewares to demonstrate how
68 // that's done, although they could easily be combined into a single breaker
69 // for the entire remote instance, too.
70 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
71
72 // Each individual endpoint is an http/transport.Client (which implements
73 // endpoint.Endpoint) that gets wrapped with various middlewares. If you
74 // made your own client library, you'd do this work there, so your server
75 // could rely on a consistent set of client behavior.
76 var sumEndpoint endpoint.Endpoint
77 {
78 sumEndpoint = httptransport.NewClient(
79 "POST",
80 copyURL(u, "/sum"),
81 encodeHTTPGenericRequest,
82 decodeHTTPSumResponse,
83 httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)),
84 ).Endpoint()
85 sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
86 sumEndpoint = limiter(sumEndpoint)
87 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
88 Name: "Sum",
89 Timeout: 30 * time.Second,
90 }))(sumEndpoint)
91 }
92
93 // The Concat endpoint is the same thing, with slightly different
94 // middlewares to demonstrate how to specialize per-endpoint.
95 var concatEndpoint endpoint.Endpoint
96 {
97 concatEndpoint = httptransport.NewClient(
98 "POST",
99 copyURL(u, "/concat"),
100 encodeHTTPGenericRequest,
101 decodeHTTPConcatResponse,
102 httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)),
103 ).Endpoint()
104 concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
105 concatEndpoint = limiter(concatEndpoint)
106 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
107 Name: "Concat",
108 Timeout: 10 * time.Second,
109 }))(concatEndpoint)
110 }
111
112 // Returning the endpoint.Set as a service.Service relies on the
113 // endpoint.Set implementing the Service methods. That's just a simple bit
114 // of glue code.
115 return addendpoint.Set{
116 SumEndpoint: sumEndpoint,
117 ConcatEndpoint: concatEndpoint,
118 }, nil
119 }
120
121 func copyURL(base *url.URL, path string) *url.URL {
122 next := *base
123 next.Path = path
124 return &next
125 }
126
127 func errorEncoder(_ context.Context, err error, w http.ResponseWriter) {
128 w.WriteHeader(err2code(err))
129 json.NewEncoder(w).Encode(errorWrapper{Error: err.Error()})
130 }
131
132 func err2code(err error) int {
133 switch err {
134 case addservice.ErrTwoZeroes, addservice.ErrMaxSizeExceeded, addservice.ErrIntOverflow:
135 return http.StatusBadRequest
136 }
137 return http.StatusInternalServerError
138 }
139
140 func errorDecoder(r *http.Response) error {
141 var w errorWrapper
142 if err := json.NewDecoder(r.Body).Decode(&w); err != nil {
143 return err
144 }
145 return errors.New(w.Error)
146 }
147
148 type errorWrapper struct {
149 Error string `json:"error"`
150 }
151
152 // decodeHTTPSumRequest is a transport/http.DecodeRequestFunc that decodes a
153 // JSON-encoded sum request from the HTTP request body. Primarily useful in a
154 // server.
155 func decodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) {
156 var req addendpoint.SumRequest
157 err := json.NewDecoder(r.Body).Decode(&req)
158 return req, err
159 }
160
161 // decodeHTTPConcatRequest is a transport/http.DecodeRequestFunc that decodes a
162 // JSON-encoded concat request from the HTTP request body. Primarily useful in a
163 // server.
164 func decodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) {
165 var req addendpoint.ConcatRequest
166 err := json.NewDecoder(r.Body).Decode(&req)
167 return req, err
168 }
169
170 // decodeHTTPSumResponse is a transport/http.DecodeResponseFunc that decodes a
171 // JSON-encoded sum response from the HTTP response body. If the response has a
172 // non-200 status code, we will interpret that as an error and attempt to decode
173 // the specific error message from the response body. Primarily useful in a
174 // client.
175 func decodeHTTPSumResponse(_ context.Context, r *http.Response) (interface{}, error) {
176 if r.StatusCode != http.StatusOK {
177 return nil, errors.New(r.Status)
178 }
179 var resp addendpoint.SumResponse
180 err := json.NewDecoder(r.Body).Decode(&resp)
181 return resp, err
182 }
183
184 // decodeHTTPConcatResponse is a transport/http.DecodeResponseFunc that decodes
185 // a JSON-encoded concat response from the HTTP response body. If the response
186 // has a non-200 status code, we will interpret that as an error and attempt to
187 // decode the specific error message from the response body. Primarily useful in
188 // a client.
189 func decodeHTTPConcatResponse(_ context.Context, r *http.Response) (interface{}, error) {
190 if r.StatusCode != http.StatusOK {
191 return nil, errors.New(r.Status)
192 }
193 var resp addendpoint.ConcatResponse
194 err := json.NewDecoder(r.Body).Decode(&resp)
195 return resp, err
196 }
197
198 // encodeHTTPGenericRequest is a transport/http.EncodeRequestFunc that
199 // JSON-encodes any request to the request body. Primarily useful in a client.
200 func encodeHTTPGenericRequest(_ context.Context, r *http.Request, request interface{}) error {
201 var buf bytes.Buffer
202 if err := json.NewEncoder(&buf).Encode(request); err != nil {
203 return err
204 }
205 r.Body = ioutil.NopCloser(&buf)
206 return nil
207 }
208
209 // encodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that encodes
210 // the response as JSON to the response writer. Primarily useful in a server.
211 func encodeHTTPGenericResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
212 if f, ok := response.(addendpoint.Failer); ok && f.Failed() != nil {
213 errorEncoder(ctx, f.Failed(), w)
214 return nil
215 }
216 w.Header().Set("Content-Type", "application/json; charset=utf-8")
217 return json.NewEncoder(w).Encode(response)
218 }
0 package addtransport
1
2 import (
3 "context"
4 "time"
5
6 jujuratelimit "github.com/juju/ratelimit"
7 "github.com/sony/gobreaker"
8
9 "github.com/go-kit/kit/circuitbreaker"
10 "github.com/go-kit/kit/endpoint"
11 "github.com/go-kit/kit/ratelimit"
12
13 "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
14 "github.com/go-kit/kit/examples/addsvc/pkg/addservice"
15 addthrift "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc"
16 )
17
18 type thriftServer struct {
19 ctx context.Context
20 endpoints addendpoint.Set
21 }
22
23 // NewThriftServer makes a set of endpoints available as a Thrift service.
24 func NewThriftServer(ctx context.Context, endpoints addendpoint.Set) addthrift.AddService {
25 return &thriftServer{
26 ctx: ctx,
27 endpoints: endpoints,
28 }
29 }
30
31 func (s *thriftServer) Sum(a int64, b int64) (*addthrift.SumReply, error) {
32 request := addendpoint.SumRequest{A: int(a), B: int(b)}
33 response, err := s.endpoints.SumEndpoint(s.ctx, request)
34 if err != nil {
35 return nil, err
36 }
37 resp := response.(addendpoint.SumResponse)
38 return &addthrift.SumReply{Value: int64(resp.V), Err: err2str(resp.Err)}, nil
39 }
40
41 func (s *thriftServer) Concat(a string, b string) (*addthrift.ConcatReply, error) {
42 request := addendpoint.ConcatRequest{A: a, B: b}
43 response, err := s.endpoints.ConcatEndpoint(s.ctx, request)
44 if err != nil {
45 return nil, err
46 }
47 resp := response.(addendpoint.ConcatResponse)
48 return &addthrift.ConcatReply{Value: resp.V, Err: err2str(resp.Err)}, nil
49 }
50
51 // NewThriftClient returns an AddService backed by a Thrift server described by
52 // the provided client. The caller is responsible for constructing the client,
53 // and eventually closing the underlying transport. We bake-in certain middlewares,
54 // implementing the client library pattern.
55 func NewThriftClient(client *addthrift.AddServiceClient) addservice.Service {
56 // We construct a single ratelimiter middleware, to limit the total outgoing
57 // QPS from this client to all methods on the remote instance. We also
58 // construct per-endpoint circuitbreaker middlewares to demonstrate how
59 // that's done, although they could easily be combined into a single breaker
60 // for the entire remote instance, too.
61 limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))
62
63 // Each individual endpoint is an http/transport.Client (which implements
64 // endpoint.Endpoint) that gets wrapped with various middlewares. If you
65 // could rely on a consistent set of client behavior.
66 var sumEndpoint endpoint.Endpoint
67 {
68 sumEndpoint = MakeThriftSumEndpoint(client)
69 sumEndpoint = limiter(sumEndpoint)
70 sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
71 Name: "Sum",
72 Timeout: 30 * time.Second,
73 }))(sumEndpoint)
74 }
75
76 // The Concat endpoint is the same thing, with slightly different
77 // middlewares to demonstrate how to specialize per-endpoint.
78 var concatEndpoint endpoint.Endpoint
79 {
80 concatEndpoint = MakeThriftConcatEndpoint(client)
81 concatEndpoint = limiter(concatEndpoint)
82 concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
83 Name: "Concat",
84 Timeout: 10 * time.Second,
85 }))(concatEndpoint)
86 }
87
88 // Returning the endpoint.Set as a service.Service relies on the
89 // endpoint.Set implementing the Service methods. That's just a simple bit
90 // of glue code.
91 return addendpoint.Set{
92 SumEndpoint: sumEndpoint,
93 ConcatEndpoint: concatEndpoint,
94 }
95 }
96
97 // MakeThriftSumEndpoint returns an endpoint that invokes the passed Thrift client.
98 // Useful only in clients, and only until a proper transport/thrift.Client exists.
99 func MakeThriftSumEndpoint(client *addthrift.AddServiceClient) endpoint.Endpoint {
100 return func(ctx context.Context, request interface{}) (interface{}, error) {
101 req := request.(addendpoint.SumRequest)
102 reply, err := client.Sum(int64(req.A), int64(req.B))
103 if err == addservice.ErrIntOverflow {
104 return nil, err // special case; see comment on ErrIntOverflow
105 }
106 return addendpoint.SumResponse{V: int(reply.Value), Err: err}, nil
107 }
108 }
109
110 // MakeThriftConcatEndpoint returns an endpoint that invokes the passed Thrift
111 // client. Useful only in clients, and only until a proper
112 // transport/thrift.Client exists.
113 func MakeThriftConcatEndpoint(client *addthrift.AddServiceClient) endpoint.Endpoint {
114 return func(ctx context.Context, request interface{}) (interface{}, error) {
115 req := request.(addendpoint.ConcatRequest)
116 reply, err := client.Concat(req.A, req.B)
117 return addendpoint.ConcatResponse{V: reply.Value, Err: err}, nil
118 }
119 }
+0
-163
examples/addsvc/service.go less more
0 package addsvc
1
2 // This file contains the Service definition, and a basic service
3 // implementation. It also includes service middlewares.
4
5 import (
6 "context"
7 "errors"
8 "time"
9
10 "github.com/go-kit/kit/log"
11 "github.com/go-kit/kit/metrics"
12 )
13
14 // Service describes a service that adds things together.
15 type Service interface {
16 Sum(ctx context.Context, a, b int) (int, error)
17 Concat(ctx context.Context, a, b string) (string, error)
18 }
19
20 // Business-domain errors like these may be served in two ways: returned
21 // directly by endpoints, or bundled into the response struct. Both methods can
22 // be made to work, but errors returned directly by endpoints are counted by
23 // middlewares that check errors, like circuit breakers.
24 //
25 // If you don't want that behavior -- and you probably don't -- then it's better
26 // to bundle errors into the response struct.
27
28 var (
29 // ErrTwoZeroes is an arbitrary business rule for the Add method.
30 ErrTwoZeroes = errors.New("can't sum two zeroes")
31
32 // ErrIntOverflow protects the Add method. We've decided that this error
33 // indicates a misbehaving service and should count against e.g. circuit
34 // breakers. So, we return it directly in endpoints, to illustrate the
35 // difference. In a real service, this probably wouldn't be the case.
36 ErrIntOverflow = errors.New("integer overflow")
37
38 // ErrMaxSizeExceeded protects the Concat method.
39 ErrMaxSizeExceeded = errors.New("result exceeds maximum size")
40 )
41
42 // These annoying helper functions are required to translate Go error types to
43 // and from strings, which is the type we use in our IDLs to represent errors.
44 // There is special casing to treat empty strings as nil errors.
45
46 func str2err(s string) error {
47 if s == "" {
48 return nil
49 }
50 return errors.New(s)
51 }
52
53 func err2str(err error) string {
54 if err == nil {
55 return ""
56 }
57 return err.Error()
58 }
59
60 // NewBasicService returns a naïve, stateless implementation of Service.
61 func NewBasicService() Service {
62 return basicService{}
63 }
64
65 type basicService struct{}
66
67 const (
68 intMax = 1<<31 - 1
69 intMin = -(intMax + 1)
70 maxLen = 102400
71 )
72
73 // Sum implements Service.
74 func (s basicService) Sum(_ context.Context, a, b int) (int, error) {
75 if a == 0 && b == 0 {
76 return 0, ErrTwoZeroes
77 }
78 if (b > 0 && a > (intMax-b)) || (b < 0 && a < (intMin-b)) {
79 return 0, ErrIntOverflow
80 }
81 return a + b, nil
82 }
83
84 // Concat implements Service.
85 func (s basicService) Concat(_ context.Context, a, b string) (string, error) {
86 if len(a)+len(b) > maxLen {
87 return "", ErrMaxSizeExceeded
88 }
89 return a + b, nil
90 }
91
92 // Middleware describes a service (as opposed to endpoint) middleware.
93 type Middleware func(Service) Service
94
95 // ServiceLoggingMiddleware returns a service middleware that logs the
96 // parameters and result of each method invocation.
97 func ServiceLoggingMiddleware(logger log.Logger) Middleware {
98 return func(next Service) Service {
99 return serviceLoggingMiddleware{
100 logger: logger,
101 next: next,
102 }
103 }
104 }
105
106 type serviceLoggingMiddleware struct {
107 logger log.Logger
108 next Service
109 }
110
111 func (mw serviceLoggingMiddleware) Sum(ctx context.Context, a, b int) (v int, err error) {
112 defer func(begin time.Time) {
113 mw.logger.Log(
114 "method", "Sum",
115 "a", a, "b", b, "result", v, "error", err,
116 "took", time.Since(begin),
117 )
118 }(time.Now())
119 return mw.next.Sum(ctx, a, b)
120 }
121
122 func (mw serviceLoggingMiddleware) Concat(ctx context.Context, a, b string) (v string, err error) {
123 defer func(begin time.Time) {
124 mw.logger.Log(
125 "method", "Concat",
126 "a", a, "b", b, "result", v, "error", err,
127 "took", time.Since(begin),
128 )
129 }(time.Now())
130 return mw.next.Concat(ctx, a, b)
131 }
132
133 // ServiceInstrumentingMiddleware returns a service middleware that instruments
134 // the number of integers summed and characters concatenated over the lifetime of
135 // the service.
136 func ServiceInstrumentingMiddleware(ints, chars metrics.Counter) Middleware {
137 return func(next Service) Service {
138 return serviceInstrumentingMiddleware{
139 ints: ints,
140 chars: chars,
141 next: next,
142 }
143 }
144 }
145
146 type serviceInstrumentingMiddleware struct {
147 ints metrics.Counter
148 chars metrics.Counter
149 next Service
150 }
151
152 func (mw serviceInstrumentingMiddleware) Sum(ctx context.Context, a, b int) (int, error) {
153 v, err := mw.next.Sum(ctx, a, b)
154 mw.ints.Add(float64(v))
155 return v, err
156 }
157
158 func (mw serviceInstrumentingMiddleware) Concat(ctx context.Context, a, b string) (string, error) {
159 v, err := mw.next.Concat(ctx, a, b)
160 mw.chars.Add(float64(len(v)))
161 return v, err
162 }
+0
-118
examples/addsvc/transport_grpc.go less more
0 package addsvc
1
2 // This file provides server-side bindings for the gRPC transport.
3 // It utilizes the transport/grpc.Server.
4
5 import (
6 "context"
7
8 stdopentracing "github.com/opentracing/opentracing-go"
9 oldcontext "golang.org/x/net/context"
10
11 "github.com/go-kit/kit/examples/addsvc/pb"
12 "github.com/go-kit/kit/log"
13 "github.com/go-kit/kit/tracing/opentracing"
14 grpctransport "github.com/go-kit/kit/transport/grpc"
15 )
16
17 // MakeGRPCServer makes a set of endpoints available as a gRPC AddServer.
18 func MakeGRPCServer(endpoints Endpoints, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer {
19 options := []grpctransport.ServerOption{
20 grpctransport.ServerErrorLogger(logger),
21 }
22 return &grpcServer{
23 sum: grpctransport.NewServer(
24 endpoints.SumEndpoint,
25 DecodeGRPCSumRequest,
26 EncodeGRPCSumResponse,
27 append(options, grpctransport.ServerBefore(opentracing.FromGRPCRequest(tracer, "Sum", logger)))...,
28 ),
29 concat: grpctransport.NewServer(
30 endpoints.ConcatEndpoint,
31 DecodeGRPCConcatRequest,
32 EncodeGRPCConcatResponse,
33 append(options, grpctransport.ServerBefore(opentracing.FromGRPCRequest(tracer, "Concat", logger)))...,
34 ),
35 }
36 }
37
38 type grpcServer struct {
39 sum grpctransport.Handler
40 concat grpctransport.Handler
41 }
42
43 func (s *grpcServer) Sum(ctx oldcontext.Context, req *pb.SumRequest) (*pb.SumReply, error) {
44 _, rep, err := s.sum.ServeGRPC(ctx, req)
45 if err != nil {
46 return nil, err
47 }
48 return rep.(*pb.SumReply), nil
49 }
50
51 func (s *grpcServer) Concat(ctx oldcontext.Context, req *pb.ConcatRequest) (*pb.ConcatReply, error) {
52 _, rep, err := s.concat.ServeGRPC(ctx, req)
53 if err != nil {
54 return nil, err
55 }
56 return rep.(*pb.ConcatReply), nil
57 }
58
59 // DecodeGRPCSumRequest is a transport/grpc.DecodeRequestFunc that converts a
60 // gRPC sum request to a user-domain sum request. Primarily useful in a server.
61 func DecodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
62 req := grpcReq.(*pb.SumRequest)
63 return sumRequest{A: int(req.A), B: int(req.B)}, nil
64 }
65
66 // DecodeGRPCConcatRequest is a transport/grpc.DecodeRequestFunc that converts a
67 // gRPC concat request to a user-domain concat request. Primarily useful in a
68 // server.
69 func DecodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
70 req := grpcReq.(*pb.ConcatRequest)
71 return concatRequest{A: req.A, B: req.B}, nil
72 }
73
74 // DecodeGRPCSumResponse is a transport/grpc.DecodeResponseFunc that converts a
75 // gRPC sum reply to a user-domain sum response. Primarily useful in a client.
76 func DecodeGRPCSumResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
77 reply := grpcReply.(*pb.SumReply)
78 return sumResponse{V: int(reply.V), Err: str2err(reply.Err)}, nil
79 }
80
81 // DecodeGRPCConcatResponse is a transport/grpc.DecodeResponseFunc that converts
82 // a gRPC concat reply to a user-domain concat response. Primarily useful in a
83 // client.
84 func DecodeGRPCConcatResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
85 reply := grpcReply.(*pb.ConcatReply)
86 return concatResponse{V: reply.V, Err: str2err(reply.Err)}, nil
87 }
88
89 // EncodeGRPCSumResponse is a transport/grpc.EncodeResponseFunc that converts a
90 // user-domain sum response to a gRPC sum reply. Primarily useful in a server.
91 func EncodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
92 resp := response.(sumResponse)
93 return &pb.SumReply{V: int64(resp.V), Err: err2str(resp.Err)}, nil
94 }
95
96 // EncodeGRPCConcatResponse is a transport/grpc.EncodeResponseFunc that converts
97 // a user-domain concat response to a gRPC concat reply. Primarily useful in a
98 // server.
99 func EncodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
100 resp := response.(concatResponse)
101 return &pb.ConcatReply{V: resp.V, Err: err2str(resp.Err)}, nil
102 }
103
104 // EncodeGRPCSumRequest is a transport/grpc.EncodeRequestFunc that converts a
105 // user-domain sum request to a gRPC sum request. Primarily useful in a client.
106 func EncodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) {
107 req := request.(sumRequest)
108 return &pb.SumRequest{A: int64(req.A), B: int64(req.B)}, nil
109 }
110
111 // EncodeGRPCConcatRequest is a transport/grpc.EncodeRequestFunc that converts a
112 // user-domain concat request to a gRPC concat request. Primarily useful in a
113 // client.
114 func EncodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) {
115 req := request.(concatRequest)
116 return &pb.ConcatRequest{A: req.A, B: req.B}, nil
117 }
+0
-130
examples/addsvc/transport_http.go less more
0 package addsvc
1
2 // This file provides server-side bindings for the HTTP transport.
3 // It utilizes the transport/http.Server.
4
5 import (
6 "bytes"
7 "context"
8 "encoding/json"
9 "errors"
10 "io/ioutil"
11 "net/http"
12
13 stdopentracing "github.com/opentracing/opentracing-go"
14
15 "github.com/go-kit/kit/log"
16 "github.com/go-kit/kit/tracing/opentracing"
17 httptransport "github.com/go-kit/kit/transport/http"
18 )
19
20 // MakeHTTPHandler returns a handler that makes a set of endpoints available
21 // on predefined paths.
22 func MakeHTTPHandler(endpoints Endpoints, tracer stdopentracing.Tracer, logger log.Logger) http.Handler {
23 options := []httptransport.ServerOption{
24 httptransport.ServerErrorEncoder(errorEncoder),
25 httptransport.ServerErrorLogger(logger),
26 }
27 m := http.NewServeMux()
28 m.Handle("/sum", httptransport.NewServer(
29 endpoints.SumEndpoint,
30 DecodeHTTPSumRequest,
31 EncodeHTTPGenericResponse,
32 append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Sum", logger)))...,
33 ))
34 m.Handle("/concat", httptransport.NewServer(
35 endpoints.ConcatEndpoint,
36 DecodeHTTPConcatRequest,
37 EncodeHTTPGenericResponse,
38 append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Concat", logger)))...,
39 ))
40 return m
41 }
42
43 func errorEncoder(_ context.Context, err error, w http.ResponseWriter) {
44 code := http.StatusInternalServerError
45 msg := err.Error()
46
47 switch err {
48 case ErrTwoZeroes, ErrMaxSizeExceeded, ErrIntOverflow:
49 code = http.StatusBadRequest
50 }
51
52 w.WriteHeader(code)
53 json.NewEncoder(w).Encode(errorWrapper{Error: msg})
54 }
55
56 func errorDecoder(r *http.Response) error {
57 var w errorWrapper
58 if err := json.NewDecoder(r.Body).Decode(&w); err != nil {
59 return err
60 }
61 return errors.New(w.Error)
62 }
63
64 type errorWrapper struct {
65 Error string `json:"error"`
66 }
67
68 // DecodeHTTPSumRequest is a transport/http.DecodeRequestFunc that decodes a
69 // JSON-encoded sum request from the HTTP request body. Primarily useful in a
70 // server.
71 func DecodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) {
72 var req sumRequest
73 err := json.NewDecoder(r.Body).Decode(&req)
74 return req, err
75 }
76
77 // DecodeHTTPConcatRequest is a transport/http.DecodeRequestFunc that decodes a
78 // JSON-encoded concat request from the HTTP request body. Primarily useful in a
79 // server.
80 func DecodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) {
81 var req concatRequest
82 err := json.NewDecoder(r.Body).Decode(&req)
83 return req, err
84 }
85
86 // DecodeHTTPSumResponse is a transport/http.DecodeResponseFunc that decodes a
87 // JSON-encoded sum response from the HTTP response body. If the response has a
88 // non-200 status code, we will interpret that as an error and attempt to decode
89 // the specific error message from the response body. Primarily useful in a
90 // client.
91 func DecodeHTTPSumResponse(_ context.Context, r *http.Response) (interface{}, error) {
92 if r.StatusCode != http.StatusOK {
93 return nil, errorDecoder(r)
94 }
95 var resp sumResponse
96 err := json.NewDecoder(r.Body).Decode(&resp)
97 return resp, err
98 }
99
100 // DecodeHTTPConcatResponse is a transport/http.DecodeResponseFunc that decodes
101 // a JSON-encoded concat response from the HTTP response body. If the response
102 // has a non-200 status code, we will interpret that as an error and attempt to
103 // decode the specific error message from the response body. Primarily useful in
104 // a client.
105 func DecodeHTTPConcatResponse(_ context.Context, r *http.Response) (interface{}, error) {
106 if r.StatusCode != http.StatusOK {
107 return nil, errorDecoder(r)
108 }
109 var resp concatResponse
110 err := json.NewDecoder(r.Body).Decode(&resp)
111 return resp, err
112 }
113
114 // EncodeHTTPGenericRequest is a transport/http.EncodeRequestFunc that
115 // JSON-encodes any request to the request body. Primarily useful in a client.
116 func EncodeHTTPGenericRequest(_ context.Context, r *http.Request, request interface{}) error {
117 var buf bytes.Buffer
118 if err := json.NewEncoder(&buf).Encode(request); err != nil {
119 return err
120 }
121 r.Body = ioutil.NopCloser(&buf)
122 return nil
123 }
124
125 // EncodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that encodes
126 // the response as JSON to the response writer. Primarily useful in a server.
127 func EncodeHTTPGenericResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
128 return json.NewEncoder(w).Encode(response)
129 }
+0
-73
examples/addsvc/transport_thrift.go less more
0 package addsvc
1
2 // This file provides server-side bindings for the Thrift transport.
3 //
4 // This file also provides endpoint constructors that utilize a Thrift client,
5 // for use in client packages, because package transport/thrift doesn't exist
6 // yet. See https://github.com/go-kit/kit/issues/184.
7
8 import (
9 "context"
10
11 "github.com/go-kit/kit/endpoint"
12 thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc"
13 )
14
15 // MakeThriftHandler makes a set of endpoints available as a Thrift service.
16 func MakeThriftHandler(ctx context.Context, e Endpoints) thriftadd.AddService {
17 return &thriftServer{
18 ctx: ctx,
19 sum: e.SumEndpoint,
20 concat: e.ConcatEndpoint,
21 }
22 }
23
24 type thriftServer struct {
25 ctx context.Context
26 sum endpoint.Endpoint
27 concat endpoint.Endpoint
28 }
29
30 func (s *thriftServer) Sum(a int64, b int64) (*thriftadd.SumReply, error) {
31 request := sumRequest{A: int(a), B: int(b)}
32 response, err := s.sum(s.ctx, request)
33 if err != nil {
34 return nil, err
35 }
36 resp := response.(sumResponse)
37 return &thriftadd.SumReply{Value: int64(resp.V), Err: err2str(resp.Err)}, nil
38 }
39
40 func (s *thriftServer) Concat(a string, b string) (*thriftadd.ConcatReply, error) {
41 request := concatRequest{A: a, B: b}
42 response, err := s.concat(s.ctx, request)
43 if err != nil {
44 return nil, err
45 }
46 resp := response.(concatResponse)
47 return &thriftadd.ConcatReply{Value: resp.V, Err: err2str(resp.Err)}, nil
48 }
49
50 // MakeThriftSumEndpoint returns an endpoint that invokes the passed Thrift client.
51 // Useful only in clients, and only until a proper transport/thrift.Client exists.
52 func MakeThriftSumEndpoint(client *thriftadd.AddServiceClient) endpoint.Endpoint {
53 return func(ctx context.Context, request interface{}) (interface{}, error) {
54 req := request.(sumRequest)
55 reply, err := client.Sum(int64(req.A), int64(req.B))
56 if err == ErrIntOverflow {
57 return nil, err // special case; see comment on ErrIntOverflow
58 }
59 return sumResponse{V: int(reply.Value), Err: err}, nil
60 }
61 }
62
63 // MakeThriftConcatEndpoint returns an endpoint that invokes the passed Thrift
64 // client. Useful only in clients, and only until a proper
65 // transport/thrift.Client exists.
66 func MakeThriftConcatEndpoint(client *thriftadd.AddServiceClient) endpoint.Endpoint {
67 return func(ctx context.Context, request interface{}) (interface{}, error) {
68 req := request.(concatRequest)
69 reply, err := client.Concat(req.A, req.B)
70 return concatResponse{V: reply.Value, Err: err}, nil
71 }
72 }
1515 "syscall"
1616 "time"
1717
18 consulsd "github.com/go-kit/kit/sd/consul"
1819 "github.com/gorilla/mux"
1920 "github.com/hashicorp/consul/api"
2021 stdopentracing "github.com/opentracing/opentracing-go"
22 "google.golang.org/grpc"
2123
2224 "github.com/go-kit/kit/endpoint"
23 "github.com/go-kit/kit/examples/addsvc"
24 addsvcgrpcclient "github.com/go-kit/kit/examples/addsvc/client/grpc"
2525 "github.com/go-kit/kit/log"
2626 "github.com/go-kit/kit/sd"
27 consulsd "github.com/go-kit/kit/sd/consul"
2827 "github.com/go-kit/kit/sd/lb"
2928 httptransport "github.com/go-kit/kit/transport/http"
30 "google.golang.org/grpc"
29
30 "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
31 "github.com/go-kit/kit/examples/addsvc/pkg/addservice"
32 "github.com/go-kit/kit/examples/addsvc/pkg/addtransport"
3133 )
3234
3335 func main() {
7880 // addsvc client package to construct a complete service. We can then
7981 // leverage the addsvc.Make{Sum,Concat}Endpoint constructors to convert
8082 // the complete service to specific endpoint.
81
8283 var (
8384 tags = []string{}
8485 passingOnly = true
85 endpoints = addsvc.Endpoints{}
86 endpoints = addendpoint.Set{}
8687 instancer = consulsd.NewInstancer(client, logger, "addsvc", tags, passingOnly)
8788 )
8889 {
89 factory := addsvcFactory(addsvc.MakeSumEndpoint, tracer, logger)
90 factory := addsvcFactory(addendpoint.MakeSumEndpoint, tracer, logger)
9091 endpointer := sd.NewEndpointer(instancer, factory, logger)
9192 balancer := lb.NewRoundRobin(endpointer)
9293 retry := lb.Retry(*retryMax, *retryTimeout, balancer)
9394 endpoints.SumEndpoint = retry
9495 }
9596 {
96 factory := addsvcFactory(addsvc.MakeConcatEndpoint, tracer, logger)
97 factory := addsvcFactory(addendpoint.MakeConcatEndpoint, tracer, logger)
9798 endpointer := sd.NewEndpointer(instancer, factory, logger)
9899 balancer := lb.NewRoundRobin(endpointer)
99100 retry := lb.Retry(*retryMax, *retryTimeout, balancer)
104105 // HTTP handler, and just install it under a particular path prefix in
105106 // our router.
106107
107 r.PathPrefix("/addsvc").Handler(http.StripPrefix("/addsvc", addsvc.MakeHTTPHandler(endpoints, tracer, logger)))
108 r.PathPrefix("/addsvc").Handler(http.StripPrefix("/addsvc", addtransport.NewHTTPHandler(endpoints, tracer, logger)))
108109 }
109110
110111 // stringsvc routes.
163164 logger.Log("exit", <-errc)
164165 }
165166
166 func addsvcFactory(makeEndpoint func(addsvc.Service) endpoint.Endpoint, tracer stdopentracing.Tracer, logger log.Logger) sd.Factory {
167 func addsvcFactory(makeEndpoint func(addservice.Service) endpoint.Endpoint, tracer stdopentracing.Tracer, logger log.Logger) sd.Factory {
167168 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
168169 // We could just as easily use the HTTP or Thrift client package to make
169170 // the connection to addsvc. We've chosen gRPC arbitrarily. Note that
174175 if err != nil {
175176 return nil, nil, err
176177 }
177 service := addsvcgrpcclient.New(conn, tracer, logger)
178 service := addtransport.NewGRPCClient(conn, tracer, logger)
178179 endpoint := makeEndpoint(service)
179180
180181 // Notice that the addsvc gRPC client converts the connection to a