Merge pull request #228 from jupp0r/feature/use-loadbalancer-in-addsvc-example
Use loadbalancer in addsvc example
Peter Bourgon
8 years ago
0 | 0 | examples/addsvc/addsvc |
1 | 1 | examples/addsvc/client/client |
2 | examples/apigateway/apigateway | |
2 | 3 | examples/profilesvc/profilesvc |
3 | 4 | examples/stringsvc1/stringsvc1 |
4 | 5 | examples/stringsvc2/stringsvc2 |
0 | package main | |
1 | ||
2 | import ( | |
3 | "golang.org/x/net/context" | |
4 | ||
5 | "github.com/go-kit/kit/endpoint" | |
6 | "github.com/go-kit/kit/examples/addsvc/server" | |
7 | "github.com/go-kit/kit/log" | |
8 | ) | |
9 | ||
10 | // NewClient returns an AddService that's backed by the provided Endpoints | |
11 | func newClient(ctx context.Context, sumEndpoint endpoint.Endpoint, concatEndpoint endpoint.Endpoint, logger log.Logger) server.AddService { | |
12 | return client{ | |
13 | Context: ctx, | |
14 | Logger: logger, | |
15 | sum: sumEndpoint, | |
16 | concat: concatEndpoint, | |
17 | } | |
18 | } | |
19 | ||
20 | type client struct { | |
21 | context.Context | |
22 | log.Logger | |
23 | sum endpoint.Endpoint | |
24 | concat endpoint.Endpoint | |
25 | } | |
26 | ||
27 | // TODO(pb): If your service interface methods don't return an error, we have | |
28 | // no way to signal problems with a service client. If they don't take a | |
29 | // context, we have to provide a global context for any transport that | |
30 | // requires one, effectively making your service a black box to any context- | |
31 | // specific information. So, we should make some recommendations: | |
32 | // | |
33 | // - To get started, a simple service interface is probably fine. | |
34 | // | |
35 | // - To properly deal with transport errors, every method on your service | |
36 | // should return an error. This is probably important. | |
37 | // | |
38 | // - To properly deal with context information, every method on your service | |
39 | // can take a context as its first argument. This may or may not be | |
40 | // important. | |
41 | ||
42 | func (c client) Sum(a, b int) int { | |
43 | request := server.SumRequest{ | |
44 | A: a, | |
45 | B: b, | |
46 | } | |
47 | reply, err := c.sum(c.Context, request) | |
48 | if err != nil { | |
49 | c.Logger.Log("err", err) // Without an error return parameter, we can't do anything else... | |
50 | return 0 | |
51 | } | |
52 | ||
53 | r := reply.(server.SumResponse) | |
54 | return r.V | |
55 | } | |
56 | ||
57 | func (c client) Concat(a, b string) string { | |
58 | request := server.ConcatRequest{ | |
59 | A: a, | |
60 | B: b, | |
61 | } | |
62 | reply, err := c.concat(c.Context, request) | |
63 | if err != nil { | |
64 | c.Logger.Log("err", err) // Without an error return parameter, we can't do anything else... | |
65 | return "" | |
66 | } | |
67 | ||
68 | r := reply.(server.ConcatResponse) | |
69 | return r.V | |
70 | } |
0 | package grpc | |
1 | ||
2 | import ( | |
3 | "golang.org/x/net/context" | |
4 | "google.golang.org/grpc" | |
5 | ||
6 | "github.com/go-kit/kit/endpoint" | |
7 | "github.com/go-kit/kit/examples/addsvc/pb" | |
8 | "github.com/go-kit/kit/examples/addsvc/server" | |
9 | "github.com/go-kit/kit/log" | |
10 | grpctransport "github.com/go-kit/kit/transport/grpc" | |
11 | ) | |
12 | ||
13 | // New returns an AddService that's backed by the provided ClientConn. | |
14 | func New(ctx context.Context, cc *grpc.ClientConn, logger log.Logger) server.AddService { | |
15 | return client{ | |
16 | Context: ctx, | |
17 | Logger: logger, | |
18 | sum: grpctransport.NewClient(cc, "Add", "sum", encodeSumRequest, decodeSumResponse, pb.SumReply{}).Endpoint(), | |
19 | concat: grpctransport.NewClient(cc, "Add", "concat", encodeConcatRequest, decodeConcatResponse, pb.ConcatReply{}).Endpoint(), | |
20 | } | |
21 | } | |
22 | ||
23 | type client struct { | |
24 | context.Context | |
25 | log.Logger | |
26 | sum endpoint.Endpoint | |
27 | concat endpoint.Endpoint | |
28 | } | |
29 | ||
30 | // TODO(pb): If your service interface methods don't return an error, we have | |
31 | // no way to signal problems with a service client. If they don't take a | |
32 | // context, we have to provide a global context for any transport that | |
33 | // requires one, effectively making your service a black box to any context- | |
34 | // specific information. So, we should make some recommendations: | |
35 | // | |
36 | // - To get started, a simple service interface is probably fine. | |
37 | // | |
38 | // - To properly deal with transport errors, every method on your service | |
39 | // should return an error. This is probably important. | |
40 | // | |
41 | // - To properly deal with context information, every method on your service | |
42 | // can take a context as its first argument. This may or may not be | |
43 | // important. | |
44 | ||
45 | func (c client) Sum(a, b int) int { | |
46 | request := &server.SumRequest{ | |
47 | A: a, | |
48 | B: b, | |
49 | } | |
50 | reply, err := c.sum(c.Context, request) | |
51 | if err != nil { | |
52 | c.Logger.Log("err", err) // Without an error return parameter, we can't do anything else... | |
53 | return 0 | |
54 | } | |
55 | ||
56 | r := reply.(server.SumResponse) | |
57 | return r.V | |
58 | } | |
59 | ||
60 | func (c client) Concat(a, b string) string { | |
61 | request := &server.ConcatRequest{ | |
62 | A: a, | |
63 | B: b, | |
64 | } | |
65 | reply, err := c.concat(c.Context, request) | |
66 | if err != nil { | |
67 | c.Logger.Log("err", err) // Without an error return parameter, we can't do anything else... | |
68 | return "" | |
69 | } | |
70 | ||
71 | r := reply.(server.ConcatResponse) | |
72 | return r.V | |
73 | } |
23 | 23 | } |
24 | 24 | |
25 | 25 | func decodeSumResponse(ctx context.Context, response interface{}) (interface{}, error) { |
26 | resp := response.(pb.SumReply) | |
27 | return &server.SumResponse{ | |
26 | resp := response.(*pb.SumReply) | |
27 | return server.SumResponse{ | |
28 | 28 | V: int(resp.V), |
29 | 29 | }, nil |
30 | 30 | } |
31 | 31 | |
32 | 32 | func decodeConcatResponse(ctx context.Context, response interface{}) (interface{}, error) { |
33 | resp := response.(pb.ConcatReply) | |
34 | return &server.ConcatResponse{ | |
33 | resp := response.(*pb.ConcatReply) | |
34 | return server.ConcatResponse{ | |
35 | 35 | V: resp.V, |
36 | 36 | }, nil |
37 | 37 | } |
0 | package grpc | |
1 | ||
2 | import ( | |
3 | "io" | |
4 | ||
5 | "google.golang.org/grpc" | |
6 | ||
7 | "github.com/go-kit/kit/endpoint" | |
8 | "github.com/go-kit/kit/examples/addsvc/pb" | |
9 | grpctransport "github.com/go-kit/kit/transport/grpc" | |
10 | ) | |
11 | ||
12 | // SumEndpointFactory transforms GRPC host:port strings into Endpoints that call the Sum method on a GRPC server | |
13 | // at that address. | |
14 | func SumEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { | |
15 | cc, err := grpc.Dial(instance, grpc.WithInsecure()) | |
16 | return grpctransport.NewClient( | |
17 | cc, | |
18 | "Add", | |
19 | "Sum", | |
20 | encodeSumRequest, | |
21 | decodeSumResponse, | |
22 | pb.SumReply{}, | |
23 | ).Endpoint(), cc, err | |
24 | } | |
25 | ||
26 | // ConcatEndpointFactory transforms GRPC host:port strings into Endpoints that call the Concat method on a GRPC server | |
27 | // at that address. | |
28 | func ConcatEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { | |
29 | cc, err := grpc.Dial(instance, grpc.WithInsecure()) | |
30 | return grpctransport.NewClient( | |
31 | cc, | |
32 | "Add", | |
33 | "Concat", | |
34 | encodeConcatRequest, | |
35 | decodeConcatResponse, | |
36 | pb.ConcatReply{}, | |
37 | ).Endpoint(), cc, err | |
38 | } |
0 | package httpjson | |
1 | ||
2 | import ( | |
3 | "net/http" | |
4 | "net/url" | |
5 | ||
6 | "golang.org/x/net/context" | |
7 | ||
8 | "github.com/go-kit/kit/endpoint" | |
9 | "github.com/go-kit/kit/examples/addsvc/server" | |
10 | "github.com/go-kit/kit/log" | |
11 | httptransport "github.com/go-kit/kit/transport/http" | |
12 | ) | |
13 | ||
14 | // New returns an AddService that's backed by the URL. baseurl will have its | |
15 | // scheme and hostport used, but its path will be overwritten. If client is | |
16 | // nil, http.DefaultClient will be used. | |
17 | func New(ctx context.Context, baseurl *url.URL, logger log.Logger, c *http.Client) server.AddService { | |
18 | sumURL, err := url.Parse(baseurl.String()) | |
19 | if err != nil { | |
20 | panic(err) | |
21 | } | |
22 | sumURL.Path = "/sum" | |
23 | ||
24 | concatURL, err := url.Parse(baseurl.String()) | |
25 | if err != nil { | |
26 | panic(err) | |
27 | } | |
28 | concatURL.Path = "/concat" | |
29 | ||
30 | return client{ | |
31 | Context: ctx, | |
32 | Logger: logger, | |
33 | sum: httptransport.NewClient( | |
34 | "GET", | |
35 | sumURL, | |
36 | server.EncodeSumRequest, | |
37 | server.DecodeSumResponse, | |
38 | httptransport.SetClient(c), | |
39 | ).Endpoint(), | |
40 | concat: httptransport.NewClient( | |
41 | "GET", | |
42 | concatURL, | |
43 | server.EncodeConcatRequest, | |
44 | server.DecodeConcatResponse, | |
45 | httptransport.SetClient(c), | |
46 | ).Endpoint(), | |
47 | } | |
48 | } | |
49 | ||
50 | type client struct { | |
51 | context.Context | |
52 | log.Logger | |
53 | sum endpoint.Endpoint | |
54 | concat endpoint.Endpoint | |
55 | } | |
56 | ||
57 | func (c client) Sum(a, b int) int { | |
58 | response, err := c.sum(c.Context, server.SumRequest{A: a, B: b}) | |
59 | if err != nil { | |
60 | c.Logger.Log("err", err) | |
61 | return 0 | |
62 | } | |
63 | return response.(server.SumResponse).V | |
64 | } | |
65 | ||
66 | func (c client) Concat(a, b string) string { | |
67 | response, err := c.concat(c.Context, server.ConcatRequest{A: a, B: b}) | |
68 | if err != nil { | |
69 | c.Logger.Log("err", err) | |
70 | return "" | |
71 | } | |
72 | return response.(server.ConcatResponse).V | |
73 | } |
0 | package httpjson | |
1 | ||
2 | import ( | |
3 | "io" | |
4 | "net/url" | |
5 | ||
6 | "github.com/go-kit/kit/endpoint" | |
7 | "github.com/go-kit/kit/examples/addsvc/server" | |
8 | httptransport "github.com/go-kit/kit/transport/http" | |
9 | ) | |
10 | ||
11 | // SumEndpointFactory transforms a http url into an Endpoint. | |
12 | // The path of the url is reset to /sum. | |
13 | func SumEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { | |
14 | sumURL, err := url.Parse(instance) | |
15 | if err != nil { | |
16 | return nil, nil, err | |
17 | } | |
18 | sumURL.Path = "/sum" | |
19 | ||
20 | client := httptransport.NewClient( | |
21 | "GET", | |
22 | sumURL, | |
23 | server.EncodeSumRequest, | |
24 | server.DecodeSumResponse, | |
25 | httptransport.SetClient(nil), | |
26 | ) | |
27 | ||
28 | return client.Endpoint(), nil, nil | |
29 | } | |
30 | ||
31 | // ConcatEndpointFactory transforms a http url into an Endpoint. | |
32 | // The path of the url is reset to /concat. | |
33 | func ConcatEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { | |
34 | concatURL, err := url.Parse(instance) | |
35 | if err != nil { | |
36 | return nil, nil, err | |
37 | } | |
38 | concatURL.Path = "/concat" | |
39 | ||
40 | client := httptransport.NewClient( | |
41 | "GET", | |
42 | concatURL, | |
43 | server.EncodeConcatRequest, | |
44 | server.DecodeConcatResponse, | |
45 | httptransport.SetClient(nil), | |
46 | ) | |
47 | ||
48 | return client.Endpoint(), nil, nil | |
49 | } |
2 | 2 | import ( |
3 | 3 | "flag" |
4 | 4 | "fmt" |
5 | "net/rpc" | |
6 | "net/url" | |
7 | 5 | "os" |
8 | 6 | "path/filepath" |
9 | 7 | "strconv" |
10 | 8 | "strings" |
11 | 9 | "time" |
12 | 10 | |
13 | "github.com/apache/thrift/lib/go/thrift" | |
14 | 11 | "golang.org/x/net/context" |
15 | "google.golang.org/grpc" | |
16 | 12 | |
13 | "github.com/go-kit/kit/endpoint" | |
17 | 14 | grpcclient "github.com/go-kit/kit/examples/addsvc/client/grpc" |
18 | 15 | httpjsonclient "github.com/go-kit/kit/examples/addsvc/client/httpjson" |
19 | 16 | netrpcclient "github.com/go-kit/kit/examples/addsvc/client/netrpc" |
20 | 17 | thriftclient "github.com/go-kit/kit/examples/addsvc/client/thrift" |
21 | "github.com/go-kit/kit/examples/addsvc/server" | |
22 | thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/add" | |
18 | "github.com/go-kit/kit/loadbalancer" | |
19 | "github.com/go-kit/kit/loadbalancer/static" | |
23 | 20 | "github.com/go-kit/kit/log" |
24 | 21 | ) |
25 | 22 | |
26 | 23 | func main() { |
27 | 24 | var ( |
28 | 25 | transport = flag.String("transport", "httpjson", "httpjson, grpc, netrpc, thrift") |
29 | httpAddr = flag.String("http.addr", "localhost:8001", "Address for HTTP (JSON) server") | |
30 | grpcAddr = flag.String("grpc.addr", "localhost:8002", "Address for gRPC server") | |
31 | netrpcAddr = flag.String("netrpc.addr", "localhost:8003", "Address for net/rpc server") | |
32 | thriftAddr = flag.String("thrift.addr", "localhost:8004", "Address for Thrift server") | |
26 | httpAddrs = flag.String("http.addrs", "localhost:8001", "Comma-separated list of addresses for HTTP (JSON) servers") | |
27 | grpcAddrs = flag.String("grpc.addrs", "localhost:8002", "Comma-separated list of addresses for gRPC servers") | |
28 | netrpcAddrs = flag.String("netrpc.addrs", "localhost:8003", "Comma-separated list of addresses for net/rpc servers") | |
29 | thriftAddrs = flag.String("thrift.addrs", "localhost:8004", "Comma-separated list of addresses for Thrift servers") | |
33 | 30 | thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson") |
34 | 31 | thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered") |
35 | 32 | thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing") |
41 | 38 | os.Exit(1) |
42 | 39 | } |
43 | 40 | |
41 | randomSeed := time.Now().UnixNano() | |
42 | ||
44 | 43 | root := context.Background() |
45 | 44 | method, s1, s2 := flag.Arg(0), flag.Arg(1), flag.Arg(2) |
46 | 45 | |
49 | 48 | logger = log.NewContext(logger).With("caller", log.DefaultCaller) |
50 | 49 | logger = log.NewContext(logger).With("transport", *transport) |
51 | 50 | |
52 | var svc server.AddService | |
51 | var ( | |
52 | instances []string | |
53 | sumFactory, concatFactory loadbalancer.Factory | |
54 | ) | |
55 | ||
53 | 56 | switch *transport { |
54 | 57 | case "grpc": |
55 | cc, err := grpc.Dial(*grpcAddr, grpc.WithInsecure()) | |
56 | if err != nil { | |
57 | logger.Log("err", err) | |
58 | os.Exit(1) | |
59 | } | |
60 | defer cc.Close() | |
61 | svc = grpcclient.New(root, cc, logger) | |
58 | instances = strings.Split(*grpcAddrs, ",") | |
59 | sumFactory = grpcclient.SumEndpointFactory | |
60 | concatFactory = grpcclient.ConcatEndpointFactory | |
62 | 61 | |
63 | 62 | case "httpjson": |
64 | rawurl := *httpAddr | |
65 | if !strings.HasPrefix("http", rawurl) { | |
66 | rawurl = "http://" + rawurl | |
63 | instances = strings.Split(*httpAddrs, ",") | |
64 | for i, rawurl := range instances { | |
65 | if !strings.HasPrefix("http", rawurl) { | |
66 | instances[i] = "http://" + rawurl | |
67 | } | |
67 | 68 | } |
68 | baseurl, err := url.Parse(rawurl) | |
69 | if err != nil { | |
70 | logger.Log("err", err) | |
71 | os.Exit(1) | |
72 | } | |
73 | svc = httpjsonclient.New(root, baseurl, logger, nil) | |
69 | sumFactory = httpjsonclient.SumEndpointFactory | |
70 | concatFactory = httpjsonclient.ConcatEndpointFactory | |
74 | 71 | |
75 | 72 | case "netrpc": |
76 | cli, err := rpc.DialHTTP("tcp", *netrpcAddr) | |
77 | if err != nil { | |
78 | logger.Log("err", err) | |
79 | os.Exit(1) | |
80 | } | |
81 | defer cli.Close() | |
82 | svc = netrpcclient.New(cli, logger) | |
73 | instances = strings.Split(*netrpcAddrs, ",") | |
74 | sumFactory = netrpcclient.SumEndpointFactory | |
75 | concatFactory = netrpcclient.ConcatEndpointFactory | |
83 | 76 | |
84 | 77 | case "thrift": |
85 | var protocolFactory thrift.TProtocolFactory | |
86 | switch *thriftProtocol { | |
87 | case "compact": | |
88 | protocolFactory = thrift.NewTCompactProtocolFactory() | |
89 | case "simplejson": | |
90 | protocolFactory = thrift.NewTSimpleJSONProtocolFactory() | |
91 | case "json": | |
92 | protocolFactory = thrift.NewTJSONProtocolFactory() | |
93 | case "binary", "": | |
94 | protocolFactory = thrift.NewTBinaryProtocolFactoryDefault() | |
95 | default: | |
96 | logger.Log("protocol", *thriftProtocol, "err", "invalid protocol") | |
97 | os.Exit(1) | |
98 | } | |
99 | var transportFactory thrift.TTransportFactory | |
100 | if *thriftBufferSize > 0 { | |
101 | transportFactory = thrift.NewTBufferedTransportFactory(*thriftBufferSize) | |
102 | } else { | |
103 | transportFactory = thrift.NewTTransportFactory() | |
104 | } | |
105 | if *thriftFramed { | |
106 | transportFactory = thrift.NewTFramedTransportFactory(transportFactory) | |
107 | } | |
108 | transportSocket, err := thrift.NewTSocket(*thriftAddr) | |
109 | if err != nil { | |
110 | logger.Log("during", "thrift.NewTSocket", "err", err) | |
111 | os.Exit(1) | |
112 | } | |
113 | trans := transportFactory.GetTransport(transportSocket) | |
114 | defer trans.Close() | |
115 | if err := trans.Open(); err != nil { | |
116 | logger.Log("during", "thrift transport.Open", "err", err) | |
117 | os.Exit(1) | |
118 | } | |
119 | cli := thriftadd.NewAddServiceClientFactory(trans, protocolFactory) | |
120 | svc = thriftclient.New(cli, logger) | |
78 | instances = strings.Split(*thriftAddrs, ",") | |
79 | thriftClient := thriftclient.New(*thriftProtocol, *thriftBufferSize, *thriftFramed, logger) | |
80 | sumFactory = thriftClient.SumEndpoint | |
81 | concatFactory = thriftClient.ConcatEndpoint | |
121 | 82 | |
122 | 83 | default: |
123 | 84 | logger.Log("err", "invalid transport") |
124 | 85 | os.Exit(1) |
125 | 86 | } |
87 | ||
88 | sum := buildEndpoint(instances, sumFactory, randomSeed, logger) | |
89 | concat := buildEndpoint(instances, concatFactory, randomSeed, logger) | |
90 | ||
91 | svc := newClient(root, sum, concat, logger) | |
126 | 92 | |
127 | 93 | begin := time.Now() |
128 | 94 | switch method { |
142 | 108 | os.Exit(1) |
143 | 109 | } |
144 | 110 | } |
111 | ||
112 | func buildEndpoint(instances []string, factory loadbalancer.Factory, seed int64, logger log.Logger) endpoint.Endpoint { | |
113 | publisher := static.NewPublisher(instances, factory, logger) | |
114 | random := loadbalancer.NewRandom(publisher, seed) | |
115 | return loadbalancer.Retry(10, 10*time.Second, random) | |
116 | } |
0 | package netrpc | |
1 | ||
2 | import ( | |
3 | "net/rpc" | |
4 | ||
5 | "github.com/go-kit/kit/examples/addsvc/server" | |
6 | "github.com/go-kit/kit/log" | |
7 | ) | |
8 | ||
9 | // New returns an AddService that's backed by the provided rpc.Client. | |
10 | func New(cli *rpc.Client, logger log.Logger) server.AddService { | |
11 | return client{cli, logger} | |
12 | } | |
13 | ||
14 | type client struct { | |
15 | *rpc.Client | |
16 | log.Logger | |
17 | } | |
18 | ||
19 | func (c client) Sum(a, b int) int { | |
20 | var reply server.SumResponse | |
21 | if err := c.Client.Call("addsvc.Sum", server.SumRequest{A: a, B: b}, &reply); err != nil { | |
22 | c.Logger.Log("err", err) | |
23 | return 0 | |
24 | } | |
25 | return reply.V | |
26 | } | |
27 | ||
28 | func (c client) Concat(a, b string) string { | |
29 | var reply server.ConcatResponse | |
30 | if err := c.Client.Call("addsvc.Concat", server.ConcatRequest{A: a, B: b}, &reply); err != nil { | |
31 | c.Logger.Log("err", err) | |
32 | return "" | |
33 | } | |
34 | return reply.V | |
35 | } |
0 | package netrpc | |
1 | ||
2 | import ( | |
3 | "io" | |
4 | "net/rpc" | |
5 | ||
6 | "golang.org/x/net/context" | |
7 | ||
8 | "github.com/go-kit/kit/endpoint" | |
9 | "github.com/go-kit/kit/examples/addsvc/server" | |
10 | ) | |
11 | ||
12 | // SumEndpointFactory transforms host:port strings into Endpoints. | |
13 | func SumEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { | |
14 | client, err := rpc.DialHTTP("tcp", instance) | |
15 | if err != nil { | |
16 | return nil, nil, err | |
17 | } | |
18 | ||
19 | return func(ctx context.Context, request interface{}) (interface{}, error) { | |
20 | var reply server.SumResponse | |
21 | if err := client.Call("addsvc.Sum", request.(server.SumRequest), &reply); err != nil { | |
22 | return server.SumResponse{}, err | |
23 | } | |
24 | return reply, nil | |
25 | }, client, nil | |
26 | } | |
27 | ||
28 | // ConcatEndpointFactory transforms host:port strings into Endpoints. | |
29 | func ConcatEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { | |
30 | client, err := rpc.DialHTTP("tcp", instance) | |
31 | if err != nil { | |
32 | return nil, nil, err | |
33 | } | |
34 | ||
35 | return func(ctx context.Context, request interface{}) (interface{}, error) { | |
36 | var reply server.ConcatResponse | |
37 | if err := client.Call("addsvc.Concat", request.(server.ConcatRequest), &reply); err != nil { | |
38 | return server.ConcatResponse{}, err | |
39 | } | |
40 | return reply, nil | |
41 | }, client, nil | |
42 | } |
0 | 0 | package thrift |
1 | 1 | |
2 | 2 | import ( |
3 | "io" | |
4 | ||
5 | "github.com/apache/thrift/lib/go/thrift" | |
6 | "github.com/go-kit/kit/endpoint" | |
3 | 7 | "github.com/go-kit/kit/examples/addsvc/server" |
4 | 8 | thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/add" |
5 | 9 | "github.com/go-kit/kit/log" |
10 | "golang.org/x/net/context" | |
6 | 11 | ) |
7 | 12 | |
8 | // New returns an AddService that's backed by the Thrift client. | |
9 | func New(cli *thriftadd.AddServiceClient, logger log.Logger) server.AddService { | |
10 | return &client{cli, logger} | |
13 | // New returns a stateful factory for Sum and Concat Endpoints | |
14 | func New(protocol string, bufferSize int, framed bool, logger log.Logger) client { | |
15 | var protocolFactory thrift.TProtocolFactory | |
16 | switch protocol { | |
17 | case "compact": | |
18 | protocolFactory = thrift.NewTCompactProtocolFactory() | |
19 | case "simplejson": | |
20 | protocolFactory = thrift.NewTSimpleJSONProtocolFactory() | |
21 | case "json": | |
22 | protocolFactory = thrift.NewTJSONProtocolFactory() | |
23 | case "binary", "": | |
24 | protocolFactory = thrift.NewTBinaryProtocolFactoryDefault() | |
25 | default: | |
26 | panic("invalid protocol") | |
27 | } | |
28 | ||
29 | var transportFactory thrift.TTransportFactory | |
30 | if bufferSize > 0 { | |
31 | transportFactory = thrift.NewTBufferedTransportFactory(bufferSize) | |
32 | } else { | |
33 | transportFactory = thrift.NewTTransportFactory() | |
34 | } | |
35 | if framed { | |
36 | transportFactory = thrift.NewTFramedTransportFactory(transportFactory) | |
37 | } | |
38 | ||
39 | return client{transportFactory, protocolFactory, logger} | |
11 | 40 | } |
12 | 41 | |
13 | 42 | type client struct { |
14 | *thriftadd.AddServiceClient | |
43 | thrift.TTransportFactory | |
44 | thrift.TProtocolFactory | |
15 | 45 | log.Logger |
16 | 46 | } |
17 | 47 | |
18 | func (c client) Sum(a, b int) int { | |
19 | reply, err := c.AddServiceClient.Sum(int64(a), int64(b)) | |
48 | // SumEndpointFactory transforms host:port strings into Endpoints. | |
49 | func (c client) SumEndpoint(instance string) (endpoint.Endpoint, io.Closer, error) { | |
50 | transportSocket, err := thrift.NewTSocket(instance) | |
20 | 51 | if err != nil { |
21 | c.Logger.Log("err", err) | |
22 | return 0 | |
52 | c.Logger.Log("during", "thrift.NewTSocket", "err", err) | |
53 | return nil, nil, err | |
23 | 54 | } |
24 | return int(reply.Value) | |
55 | trans := c.TTransportFactory.GetTransport(transportSocket) | |
56 | ||
57 | if err := trans.Open(); err != nil { | |
58 | c.Logger.Log("during", "thrift transport.Open", "err", err) | |
59 | return nil, nil, err | |
60 | } | |
61 | cli := thriftadd.NewAddServiceClientFactory(trans, c.TProtocolFactory) | |
62 | ||
63 | return func(ctx context.Context, request interface{}) (interface{}, error) { | |
64 | sumRequest := request.(server.SumRequest) | |
65 | reply, err := cli.Sum(int64(sumRequest.A), int64(sumRequest.B)) | |
66 | if err != nil { | |
67 | return server.SumResponse{}, err | |
68 | } | |
69 | return server.SumResponse{V: int(reply.Value)}, nil | |
70 | }, trans, nil | |
25 | 71 | } |
26 | 72 | |
27 | func (c client) Concat(a, b string) string { | |
28 | reply, err := c.AddServiceClient.Concat(a, b) | |
73 | // ConcatEndpointFactory transforms host:port strings into Endpoints. | |
74 | func (c client) ConcatEndpoint(instance string) (endpoint.Endpoint, io.Closer, error) { | |
75 | transportSocket, err := thrift.NewTSocket(instance) | |
29 | 76 | if err != nil { |
30 | c.Logger.Log("err", err) | |
31 | return "" | |
77 | c.Logger.Log("during", "thrift.NewTSocket", "err", err) | |
78 | return nil, nil, err | |
32 | 79 | } |
33 | return reply.Value | |
80 | trans := c.TTransportFactory.GetTransport(transportSocket) | |
81 | ||
82 | if err := trans.Open(); err != nil { | |
83 | c.Logger.Log("during", "thrift transport.Open", "err", err) | |
84 | return nil, nil, err | |
85 | } | |
86 | cli := thriftadd.NewAddServiceClientFactory(trans, c.TProtocolFactory) | |
87 | ||
88 | return func(ctx context.Context, request interface{}) (interface{}, error) { | |
89 | concatRequest := request.(server.ConcatRequest) | |
90 | reply, err := cli.Concat(concatRequest.A, concatRequest.B) | |
91 | if err != nil { | |
92 | return server.ConcatResponse{}, err | |
93 | } | |
94 | return server.ConcatResponse{V: reply.Value}, nil | |
95 | }, trans, nil | |
34 | 96 | } |
220 | 220 | errc <- err |
221 | 221 | return |
222 | 222 | } |
223 | transportLogger := log.NewContext(logger).With("transport", "net/rpc") | |
223 | transportLogger := log.NewContext(logger).With("transport", "thrift") | |
224 | 224 | transportLogger.Log("addr", *thriftAddr) |
225 | 225 | errc <- thrift.NewTSimpleServer4( |
226 | 226 | thriftadd.NewAddServiceProcessor(thriftBinding{svc}), |
12 | 12 | func DecodeSumRequest(r *http.Request) (interface{}, error) { |
13 | 13 | var request SumRequest |
14 | 14 | err := json.NewDecoder(r.Body).Decode(&request) |
15 | return request, err | |
15 | return &request, err | |
16 | 16 | } |
17 | 17 | |
18 | 18 | // EncodeSumResponse encodes the response to the provided HTTP response |
28 | 28 | func DecodeConcatRequest(r *http.Request) (interface{}, error) { |
29 | 29 | var request ConcatRequest |
30 | 30 | err := json.NewDecoder(r.Body).Decode(&request) |
31 | return request, err | |
31 | return &request, err | |
32 | 32 | } |
33 | 33 | |
34 | 34 | // EncodeConcatResponse encodes the response to the provided HTTP response |
17 | 17 | "github.com/gorilla/mux" |
18 | 18 | "github.com/hashicorp/consul/api" |
19 | 19 | "golang.org/x/net/context" |
20 | "google.golang.org/grpc" | |
21 | 20 | |
22 | 21 | "github.com/go-kit/kit/endpoint" |
23 | addsvc "github.com/go-kit/kit/examples/addsvc/client/grpc" | |
22 | "github.com/go-kit/kit/examples/addsvc/client/grpc" | |
24 | 23 | "github.com/go-kit/kit/examples/addsvc/server" |
25 | 24 | "github.com/go-kit/kit/loadbalancer" |
26 | 25 | "github.com/go-kit/kit/loadbalancer/consul" |
77 | 76 | factory loadbalancer.Factory |
78 | 77 | }{ |
79 | 78 | "addsvc": { |
80 | {path: "/api/addsvc/concat", factory: addsvcGRPCFactory(ctx, makeConcatEndpoint, logger)}, | |
81 | {path: "/api/addsvc/sum", factory: addsvcGRPCFactory(ctx, makeSumEndpoint, logger)}, | |
79 | {path: "/api/addsvc/concat", factory: grpc.ConcatEndpointFactory}, | |
80 | {path: "/api/addsvc/sum", factory: grpc.SumEndpointFactory}, | |
82 | 81 | }, |
83 | 82 | "stringsvc": { |
84 | 83 | {path: "/api/stringsvc/uppercase", factory: httpFactory(ctx, "GET", "uppercase/")}, |
129 | 128 | logger.Log("err", err) |
130 | 129 | return |
131 | 130 | } |
132 | } | |
133 | } | |
134 | ||
135 | func addsvcGRPCFactory(ctx context.Context, makeEndpoint func(server.AddService) endpoint.Endpoint, logger log.Logger) loadbalancer.Factory { | |
136 | return func(instance string) (endpoint.Endpoint, io.Closer, error) { | |
137 | var e endpoint.Endpoint | |
138 | conn, err := grpc.Dial(instance, grpc.WithInsecure()) | |
139 | if err != nil { | |
140 | return e, nil, err | |
141 | } | |
142 | svc := addsvc.New(ctx, conn, logger) | |
143 | return makeEndpoint(svc), nil, nil | |
144 | 131 | } |
145 | 132 | } |
146 | 133 |