Use loadbalancer in addsvc example
This change makes changes to the addsvc example in order to demonstrate
usage of package loadbalancer. As prerequisite, endpoint handling is
abstracted from transport implementations. Transport specific factories
now generate functions for setup and teardown of connections.
Jupp Müller
6 years ago
0 | 0 | examples/addsvc/addsvc |
1 | 1 | examples/addsvc/client/client |
2 | examples/apigateway/apigateway | |
2 | 3 | examples/profilesvc/profilesvc |
3 | 4 | examples/stringsvc1/stringsvc1 |
4 | 5 | examples/stringsvc2/stringsvc2 |
0 | package main | |
1 | ||
2 | import ( | |
3 | "golang.org/x/net/context" | |
4 | ||
5 | "github.com/go-kit/kit/endpoint" | |
6 | "github.com/go-kit/kit/examples/addsvc/server" | |
7 | "github.com/go-kit/kit/log" | |
8 | ) | |
9 | ||
10 | // NewClient returns an AddService that's backed by the provided Endpoints | |
11 | func newClient(ctx context.Context, sumEndpoint endpoint.Endpoint, concatEndpoint endpoint.Endpoint, logger log.Logger) server.AddService { | |
12 | return client{ | |
13 | Context: ctx, | |
14 | Logger: logger, | |
15 | sum: sumEndpoint, | |
16 | concat: concatEndpoint, | |
17 | } | |
18 | } | |
19 | ||
20 | type client struct { | |
21 | context.Context | |
22 | log.Logger | |
23 | sum endpoint.Endpoint | |
24 | concat endpoint.Endpoint | |
25 | } | |
26 | ||
27 | // TODO(pb): If your service interface methods don't return an error, we have | |
28 | // no way to signal problems with a service client. If they don't take a | |
29 | // context, we have to provide a global context for any transport that | |
30 | // requires one, effectively making your service a black box to any context- | |
31 | // specific information. So, we should make some recommendations: | |
32 | // | |
33 | // - To get started, a simple service interface is probably fine. | |
34 | // | |
35 | // - To properly deal with transport errors, every method on your service | |
36 | // should return an error. This is probably important. | |
37 | // | |
38 | // - To properly deal with context information, every method on your service | |
39 | // can take a context as its first argument. This may or may not be | |
40 | // important. | |
41 | ||
42 | func (c client) Sum(a, b int) int { | |
43 | request := server.SumRequest{ | |
44 | A: a, | |
45 | B: b, | |
46 | } | |
47 | reply, err := c.sum(c.Context, request) | |
48 | if err != nil { | |
49 | c.Logger.Log("err", err) // Without an error return parameter, we can't do anything else... | |
50 | return 0 | |
51 | } | |
52 | ||
53 | r := reply.(server.SumResponse) | |
54 | return r.V | |
55 | } | |
56 | ||
57 | func (c client) Concat(a, b string) string { | |
58 | request := server.ConcatRequest{ | |
59 | A: a, | |
60 | B: b, | |
61 | } | |
62 | reply, err := c.concat(c.Context, request) | |
63 | if err != nil { | |
64 | c.Logger.Log("err", err) // Without an error return parameter, we can't do anything else... | |
65 | return "" | |
66 | } | |
67 | ||
68 | r := reply.(server.ConcatResponse) | |
69 | return r.V | |
70 | } |
0 | package grpc | |
1 | ||
2 | import ( | |
3 | "golang.org/x/net/context" | |
4 | "google.golang.org/grpc" | |
5 | ||
6 | "github.com/go-kit/kit/endpoint" | |
7 | "github.com/go-kit/kit/examples/addsvc/pb" | |
8 | "github.com/go-kit/kit/examples/addsvc/server" | |
9 | "github.com/go-kit/kit/log" | |
10 | grpctransport "github.com/go-kit/kit/transport/grpc" | |
11 | ) | |
12 | ||
13 | // New returns an AddService that's backed by the provided ClientConn. | |
14 | func New(ctx context.Context, cc *grpc.ClientConn, logger log.Logger) server.AddService { | |
15 | return client{ | |
16 | Context: ctx, | |
17 | Logger: logger, | |
18 | sum: grpctransport.NewClient(cc, "Add", "sum", encodeSumRequest, decodeSumResponse, pb.SumReply{}).Endpoint(), | |
19 | concat: grpctransport.NewClient(cc, "Add", "concat", encodeConcatRequest, decodeConcatResponse, pb.ConcatReply{}).Endpoint(), | |
20 | } | |
21 | } | |
22 | ||
23 | type client struct { | |
24 | context.Context | |
25 | log.Logger | |
26 | sum endpoint.Endpoint | |
27 | concat endpoint.Endpoint | |
28 | } | |
29 | ||
30 | // TODO(pb): If your service interface methods don't return an error, we have | |
31 | // no way to signal problems with a service client. If they don't take a | |
32 | // context, we have to provide a global context for any transport that | |
33 | // requires one, effectively making your service a black box to any context- | |
34 | // specific information. So, we should make some recommendations: | |
35 | // | |
36 | // - To get started, a simple service interface is probably fine. | |
37 | // | |
38 | // - To properly deal with transport errors, every method on your service | |
39 | // should return an error. This is probably important. | |
40 | // | |
41 | // - To properly deal with context information, every method on your service | |
42 | // can take a context as its first argument. This may or may not be | |
43 | // important. | |
44 | ||
45 | func (c client) Sum(a, b int) int { | |
46 | request := &server.SumRequest{ | |
47 | A: a, | |
48 | B: b, | |
49 | } | |
50 | reply, err := c.sum(c.Context, request) | |
51 | if err != nil { | |
52 | c.Logger.Log("err", err) // Without an error return parameter, we can't do anything else... | |
53 | return 0 | |
54 | } | |
55 | ||
56 | r := reply.(server.SumResponse) | |
57 | return r.V | |
58 | } | |
59 | ||
60 | func (c client) Concat(a, b string) string { | |
61 | request := &server.ConcatRequest{ | |
62 | A: a, | |
63 | B: b, | |
64 | } | |
65 | reply, err := c.concat(c.Context, request) | |
66 | if err != nil { | |
67 | c.Logger.Log("err", err) // Without an error return parameter, we can't do anything else... | |
68 | return "" | |
69 | } | |
70 | ||
71 | r := reply.(server.ConcatResponse) | |
72 | return r.V | |
73 | } |
23 | 23 | } |
24 | 24 | |
25 | 25 | func decodeSumResponse(ctx context.Context, response interface{}) (interface{}, error) { |
26 | resp := response.(pb.SumReply) | |
27 | return &server.SumResponse{ | |
26 | resp := response.(*pb.SumReply) | |
27 | return server.SumResponse{ | |
28 | 28 | V: int(resp.V), |
29 | 29 | }, nil |
30 | 30 | } |
31 | 31 | |
32 | 32 | func decodeConcatResponse(ctx context.Context, response interface{}) (interface{}, error) { |
33 | resp := response.(pb.ConcatReply) | |
34 | return &server.ConcatResponse{ | |
33 | resp := response.(*pb.ConcatReply) | |
34 | return server.ConcatResponse{ | |
35 | 35 | V: resp.V, |
36 | 36 | }, nil |
37 | 37 | } |
0 | package grpc | |
1 | ||
2 | import ( | |
3 | "io" | |
4 | ||
5 | "google.golang.org/grpc" | |
6 | ||
7 | "github.com/go-kit/kit/endpoint" | |
8 | "github.com/go-kit/kit/examples/addsvc/pb" | |
9 | grpctransport "github.com/go-kit/kit/transport/grpc" | |
10 | ) | |
11 | ||
12 | // SumEndpointFactory transforms GRPC host:port strings into Endpoints that call the Sum method on a GRPC server | |
13 | // at that address. | |
14 | func SumEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { | |
15 | cc, err := grpc.Dial(instance, grpc.WithInsecure()) | |
16 | return grpctransport.NewClient( | |
17 | cc, | |
18 | "Add", | |
19 | "Sum", | |
20 | encodeSumRequest, | |
21 | decodeSumResponse, | |
22 | pb.SumReply{}, | |
23 | ).Endpoint(), cc, err | |
24 | } | |
25 | ||
26 | // ConcatEndpointFactory transforms GRPC host:port strings into Endpoints that call the Concat method on a GRPC server | |
27 | // at that address. | |
28 | func ConcatEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { | |
29 | cc, err := grpc.Dial(instance, grpc.WithInsecure()) | |
30 | return grpctransport.NewClient( | |
31 | cc, | |
32 | "Add", | |
33 | "Concat", | |
34 | encodeConcatRequest, | |
35 | decodeConcatResponse, | |
36 | pb.ConcatReply{}, | |
37 | ).Endpoint(), cc, err | |
38 | } |
0 | package httpjson | |
1 | ||
2 | import ( | |
3 | "net/http" | |
4 | "net/url" | |
5 | ||
6 | "golang.org/x/net/context" | |
7 | ||
8 | "github.com/go-kit/kit/endpoint" | |
9 | "github.com/go-kit/kit/examples/addsvc/server" | |
10 | "github.com/go-kit/kit/log" | |
11 | httptransport "github.com/go-kit/kit/transport/http" | |
12 | ) | |
13 | ||
14 | // New returns an AddService that's backed by the URL. baseurl will have its | |
15 | // scheme and hostport used, but its path will be overwritten. If client is | |
16 | // nil, http.DefaultClient will be used. | |
17 | func New(ctx context.Context, baseurl *url.URL, logger log.Logger, c *http.Client) server.AddService { | |
18 | sumURL, err := url.Parse(baseurl.String()) | |
19 | if err != nil { | |
20 | panic(err) | |
21 | } | |
22 | sumURL.Path = "/sum" | |
23 | ||
24 | concatURL, err := url.Parse(baseurl.String()) | |
25 | if err != nil { | |
26 | panic(err) | |
27 | } | |
28 | concatURL.Path = "/concat" | |
29 | ||
30 | return client{ | |
31 | Context: ctx, | |
32 | Logger: logger, | |
33 | sum: httptransport.NewClient( | |
34 | "GET", | |
35 | sumURL, | |
36 | server.EncodeSumRequest, | |
37 | server.DecodeSumResponse, | |
38 | httptransport.SetClient(c), | |
39 | ).Endpoint(), | |
40 | concat: httptransport.NewClient( | |
41 | "GET", | |
42 | concatURL, | |
43 | server.EncodeConcatRequest, | |
44 | server.DecodeConcatResponse, | |
45 | httptransport.SetClient(c), | |
46 | ).Endpoint(), | |
47 | } | |
48 | } | |
49 | ||
50 | type client struct { | |
51 | context.Context | |
52 | log.Logger | |
53 | sum endpoint.Endpoint | |
54 | concat endpoint.Endpoint | |
55 | } | |
56 | ||
57 | func (c client) Sum(a, b int) int { | |
58 | response, err := c.sum(c.Context, server.SumRequest{A: a, B: b}) | |
59 | if err != nil { | |
60 | c.Logger.Log("err", err) | |
61 | return 0 | |
62 | } | |
63 | return response.(server.SumResponse).V | |
64 | } | |
65 | ||
66 | func (c client) Concat(a, b string) string { | |
67 | response, err := c.concat(c.Context, server.ConcatRequest{A: a, B: b}) | |
68 | if err != nil { | |
69 | c.Logger.Log("err", err) | |
70 | return "" | |
71 | } | |
72 | return response.(server.ConcatResponse).V | |
73 | } |
0 | package httpjson | |
1 | ||
2 | import ( | |
3 | "io" | |
4 | "net/url" | |
5 | ||
6 | "github.com/go-kit/kit/endpoint" | |
7 | "github.com/go-kit/kit/examples/addsvc/server" | |
8 | httptransport "github.com/go-kit/kit/transport/http" | |
9 | ) | |
10 | ||
11 | // SumEndpointFactory transforms a http url into an Endpoint. | |
12 | // The path of the url is reset to /sum. | |
13 | func SumEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { | |
14 | sumURL, err := url.Parse(instance) | |
15 | if err != nil { | |
16 | return nil, nil, err | |
17 | } | |
18 | sumURL.Path = "/sum" | |
19 | ||
20 | client := httptransport.NewClient( | |
21 | "GET", | |
22 | sumURL, | |
23 | server.EncodeSumRequest, | |
24 | server.DecodeSumResponse, | |
25 | httptransport.SetClient(nil), | |
26 | ) | |
27 | ||
28 | return client.Endpoint(), nil, nil | |
29 | } | |
30 | ||
31 | // ConcatEndpointFactory transforms a http url into an Endpoint. | |
32 | // The path of the url is reset to /concat. | |
33 | func ConcatEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { | |
34 | concatURL, err := url.Parse(instance) | |
35 | if err != nil { | |
36 | return nil, nil, err | |
37 | } | |
38 | concatURL.Path = "/concat" | |
39 | ||
40 | client := httptransport.NewClient( | |
41 | "GET", | |
42 | concatURL, | |
43 | server.EncodeConcatRequest, | |
44 | server.DecodeConcatResponse, | |
45 | httptransport.SetClient(nil), | |
46 | ) | |
47 | ||
48 | return client.Endpoint(), nil, nil | |
49 | } |
2 | 2 | import ( |
3 | 3 | "flag" |
4 | 4 | "fmt" |
5 | "net/rpc" | |
6 | "net/url" | |
7 | 5 | "os" |
8 | 6 | "path/filepath" |
9 | 7 | "strconv" |
10 | 8 | "strings" |
11 | 9 | "time" |
12 | 10 | |
13 | "github.com/apache/thrift/lib/go/thrift" | |
14 | 11 | "golang.org/x/net/context" |
15 | "google.golang.org/grpc" | |
16 | 12 | |
13 | "github.com/go-kit/kit/endpoint" | |
17 | 14 | grpcclient "github.com/go-kit/kit/examples/addsvc/client/grpc" |
18 | 15 | httpjsonclient "github.com/go-kit/kit/examples/addsvc/client/httpjson" |
19 | 16 | netrpcclient "github.com/go-kit/kit/examples/addsvc/client/netrpc" |
20 | 17 | thriftclient "github.com/go-kit/kit/examples/addsvc/client/thrift" |
21 | "github.com/go-kit/kit/examples/addsvc/server" | |
22 | thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/add" | |
18 | "github.com/go-kit/kit/loadbalancer" | |
19 | "github.com/go-kit/kit/loadbalancer/static" | |
23 | 20 | "github.com/go-kit/kit/log" |
24 | 21 | ) |
25 | 22 | |
26 | 23 | func main() { |
27 | 24 | var ( |
28 | 25 | transport = flag.String("transport", "httpjson", "httpjson, grpc, netrpc, thrift") |
29 | httpAddr = flag.String("http.addr", "localhost:8001", "Address for HTTP (JSON) server") | |
30 | grpcAddr = flag.String("grpc.addr", "localhost:8002", "Address for gRPC server") | |
31 | netrpcAddr = flag.String("netrpc.addr", "localhost:8003", "Address for net/rpc server") | |
32 | thriftAddr = flag.String("thrift.addr", "localhost:8004", "Address for Thrift server") | |
26 | httpAddrs = flag.String("http.addrs", "localhost:8001", "Comma-separated list of addresses for HTTP (JSON) servers") | |
27 | grpcAddrs = flag.String("grpc.addrs", "localhost:8002", "Comma-separated list of addresses for gRPC servers") | |
28 | netrpcAddrs = flag.String("netrpc.addrs", "localhost:8003", "Comma-separated list of addresses for net/rpc servers") | |
29 | thriftAddrs = flag.String("thrift.addrs", "localhost:8004", "Comma-separated list of addresses for Thrift servers") | |
33 | 30 | thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson") |
34 | 31 | thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered") |
35 | 32 | thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing") |
41 | 38 | os.Exit(1) |
42 | 39 | } |
43 | 40 | |
41 | randomSeed := time.Now().UnixNano() | |
42 | ||
44 | 43 | root := context.Background() |
45 | 44 | method, s1, s2 := flag.Arg(0), flag.Arg(1), flag.Arg(2) |
46 | 45 | |
49 | 48 | logger = log.NewContext(logger).With("caller", log.DefaultCaller) |
50 | 49 | logger = log.NewContext(logger).With("transport", *transport) |
51 | 50 | |
52 | var svc server.AddService | |
51 | var ( | |
52 | instances []string | |
53 | sumFactory, concatFactory loadbalancer.Factory | |
54 | ) | |
55 | ||
53 | 56 | switch *transport { |
54 | 57 | case "grpc": |
55 | cc, err := grpc.Dial(*grpcAddr, grpc.WithInsecure()) | |
56 | if err != nil { | |
57 | logger.Log("err", err) | |
58 | os.Exit(1) | |
59 | } | |
60 | defer cc.Close() | |
61 | svc = grpcclient.New(root, cc, logger) | |
58 | instances = strings.Split(*grpcAddrs, ",") | |
59 | sumFactory = grpcclient.SumEndpointFactory | |
60 | concatFactory = grpcclient.ConcatEndpointFactory | |
62 | 61 | |
63 | 62 | case "httpjson": |
64 | rawurl := *httpAddr | |
65 | if !strings.HasPrefix("http", rawurl) { | |
66 | rawurl = "http://" + rawurl | |
63 | instances = strings.Split(*httpAddrs, ",") | |
64 | for i, rawurl := range instances { | |
65 | if !strings.HasPrefix("http", rawurl) { | |
66 | instances[i] = "http://" + rawurl | |
67 | } | |
67 | 68 | } |
68 | baseurl, err := url.Parse(rawurl) | |
69 | if err != nil { | |
70 | logger.Log("err", err) | |
71 | os.Exit(1) | |
72 | } | |
73 | svc = httpjsonclient.New(root, baseurl, logger, nil) | |
69 | sumFactory = httpjsonclient.SumEndpointFactory | |
70 | concatFactory = httpjsonclient.ConcatEndpointFactory | |
74 | 71 | |
75 | 72 | case "netrpc": |
76 | cli, err := rpc.DialHTTP("tcp", *netrpcAddr) | |
77 | if err != nil { | |
78 | logger.Log("err", err) | |
79 | os.Exit(1) | |
80 | } | |
81 | defer cli.Close() | |
82 | svc = netrpcclient.New(cli, logger) | |
73 | instances = strings.Split(*netrpcAddrs, ",") | |
74 | sumFactory = netrpcclient.SumEndpointFactory | |
75 | concatFactory = netrpcclient.ConcatEndpointFactory | |
83 | 76 | |
84 | 77 | case "thrift": |
85 | var protocolFactory thrift.TProtocolFactory | |
86 | switch *thriftProtocol { | |
87 | case "compact": | |
88 | protocolFactory = thrift.NewTCompactProtocolFactory() | |
89 | case "simplejson": | |
90 | protocolFactory = thrift.NewTSimpleJSONProtocolFactory() | |
91 | case "json": | |
92 | protocolFactory = thrift.NewTJSONProtocolFactory() | |
93 | case "binary", "": | |
94 | protocolFactory = thrift.NewTBinaryProtocolFactoryDefault() | |
95 | default: | |
96 | logger.Log("protocol", *thriftProtocol, "err", "invalid protocol") | |
97 | os.Exit(1) | |
98 | } | |
99 | var transportFactory thrift.TTransportFactory | |
100 | if *thriftBufferSize > 0 { | |
101 | transportFactory = thrift.NewTBufferedTransportFactory(*thriftBufferSize) | |
102 | } else { | |
103 | transportFactory = thrift.NewTTransportFactory() | |
104 | } | |
105 | if *thriftFramed { | |
106 | transportFactory = thrift.NewTFramedTransportFactory(transportFactory) | |
107 | } | |
108 | transportSocket, err := thrift.NewTSocket(*thriftAddr) | |
109 | if err != nil { | |
110 | logger.Log("during", "thrift.NewTSocket", "err", err) | |
111 | os.Exit(1) | |
112 | } | |
113 | trans := transportFactory.GetTransport(transportSocket) | |
114 | defer trans.Close() | |
115 | if err := trans.Open(); err != nil { | |
116 | logger.Log("during", "thrift transport.Open", "err", err) | |
117 | os.Exit(1) | |
118 | } | |
119 | cli := thriftadd.NewAddServiceClientFactory(trans, protocolFactory) | |
120 | svc = thriftclient.New(cli, logger) | |
78 | instances = strings.Split(*thriftAddrs, ",") | |
79 | thriftClient := thriftclient.New(*thriftProtocol, *thriftBufferSize, *thriftFramed, logger) | |
80 | sumFactory = thriftClient.SumEndpoint | |
81 | concatFactory = thriftClient.ConcatEndpoint | |
121 | 82 | |
122 | 83 | default: |
123 | 84 | logger.Log("err", "invalid transport") |
124 | 85 | os.Exit(1) |
125 | 86 | } |
87 | ||
88 | sum := buildEndpoint(instances, sumFactory, randomSeed, logger) | |
89 | concat := buildEndpoint(instances, concatFactory, randomSeed, logger) | |
90 | ||
91 | svc := newClient(root, sum, concat, logger) | |
126 | 92 | |
127 | 93 | begin := time.Now() |
128 | 94 | switch method { |
142 | 108 | os.Exit(1) |
143 | 109 | } |
144 | 110 | } |
111 | ||
112 | func buildEndpoint(instances []string, factory loadbalancer.Factory, seed int64, logger log.Logger) endpoint.Endpoint { | |
113 | publisher := static.NewPublisher(instances, factory, logger) | |
114 | random := loadbalancer.NewRandom(publisher, seed) | |
115 | return loadbalancer.Retry(10, 10*time.Second, random) | |
116 | } |
0 | package netrpc | |
1 | ||
2 | import ( | |
3 | "net/rpc" | |
4 | ||
5 | "github.com/go-kit/kit/examples/addsvc/server" | |
6 | "github.com/go-kit/kit/log" | |
7 | ) | |
8 | ||
9 | // New returns an AddService that's backed by the provided rpc.Client. | |
10 | func New(cli *rpc.Client, logger log.Logger) server.AddService { | |
11 | return client{cli, logger} | |
12 | } | |
13 | ||
14 | type client struct { | |
15 | *rpc.Client | |
16 | log.Logger | |
17 | } | |
18 | ||
19 | func (c client) Sum(a, b int) int { | |
20 | var reply server.SumResponse | |
21 | if err := c.Client.Call("addsvc.Sum", server.SumRequest{A: a, B: b}, &reply); err != nil { | |
22 | c.Logger.Log("err", err) | |
23 | return 0 | |
24 | } | |
25 | return reply.V | |
26 | } | |
27 | ||
28 | func (c client) Concat(a, b string) string { | |
29 | var reply server.ConcatResponse | |
30 | if err := c.Client.Call("addsvc.Concat", server.ConcatRequest{A: a, B: b}, &reply); err != nil { | |
31 | c.Logger.Log("err", err) | |
32 | return "" | |
33 | } | |
34 | return reply.V | |
35 | } |
0 | package netrpc | |
1 | ||
2 | import ( | |
3 | "io" | |
4 | "net/rpc" | |
5 | ||
6 | "golang.org/x/net/context" | |
7 | ||
8 | "github.com/go-kit/kit/endpoint" | |
9 | "github.com/go-kit/kit/examples/addsvc/server" | |
10 | ) | |
11 | ||
12 | // SumEndpointFactory transforms host:port strings into Endpoints. | |
13 | func SumEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { | |
14 | client, err := rpc.DialHTTP("tcp", instance) | |
15 | if err != nil { | |
16 | return nil, nil, err | |
17 | } | |
18 | ||
19 | return func(ctx context.Context, request interface{}) (interface{}, error) { | |
20 | var reply server.SumResponse | |
21 | if err := client.Call("addsvc.Sum", request.(server.SumRequest), &reply); err != nil { | |
22 | return server.SumResponse{}, err | |
23 | } | |
24 | return reply, nil | |
25 | }, client, nil | |
26 | } | |
27 | ||
28 | // ConcatEndpointFactory transforms host:port strings into Endpoints. | |
29 | func ConcatEndpointFactory(instance string) (endpoint.Endpoint, io.Closer, error) { | |
30 | client, err := rpc.DialHTTP("tcp", instance) | |
31 | if err != nil { | |
32 | return nil, nil, err | |
33 | } | |
34 | ||
35 | return func(ctx context.Context, request interface{}) (interface{}, error) { | |
36 | var reply server.ConcatResponse | |
37 | if err := client.Call("addsvc.Concat", request.(server.ConcatRequest), &reply); err != nil { | |
38 | return server.ConcatResponse{}, err | |
39 | } | |
40 | return reply, nil | |
41 | }, client, nil | |
42 | } |
0 | 0 | package thrift |
1 | 1 | |
2 | 2 | import ( |
3 | "io" | |
4 | ||
5 | "github.com/apache/thrift/lib/go/thrift" | |
6 | "github.com/go-kit/kit/endpoint" | |
3 | 7 | "github.com/go-kit/kit/examples/addsvc/server" |
4 | 8 | thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/add" |
5 | 9 | "github.com/go-kit/kit/log" |
10 | "golang.org/x/net/context" | |
6 | 11 | ) |
7 | 12 | |
8 | // New returns an AddService that's backed by the Thrift client. | |
9 | func New(cli *thriftadd.AddServiceClient, logger log.Logger) server.AddService { | |
10 | return &client{cli, logger} | |
13 | // New returns a stateful factory for Sum and Concat Endpoints | |
14 | func New(protocol string, bufferSize int, framed bool, logger log.Logger) client { | |
15 | var protocolFactory thrift.TProtocolFactory | |
16 | switch protocol { | |
17 | case "compact": | |
18 | protocolFactory = thrift.NewTCompactProtocolFactory() | |
19 | case "simplejson": | |
20 | protocolFactory = thrift.NewTSimpleJSONProtocolFactory() | |
21 | case "json": | |
22 | protocolFactory = thrift.NewTJSONProtocolFactory() | |
23 | case "binary", "": | |
24 | protocolFactory = thrift.NewTBinaryProtocolFactoryDefault() | |
25 | default: | |
26 | panic("invalid protocol") | |
27 | } | |
28 | ||
29 | var transportFactory thrift.TTransportFactory | |
30 | if bufferSize > 0 { | |
31 | transportFactory = thrift.NewTBufferedTransportFactory(bufferSize) | |
32 | } else { | |
33 | transportFactory = thrift.NewTTransportFactory() | |
34 | } | |
35 | if framed { | |
36 | transportFactory = thrift.NewTFramedTransportFactory(transportFactory) | |
37 | } | |
38 | ||
39 | return client{transportFactory, protocolFactory, logger} | |
11 | 40 | } |
12 | 41 | |
13 | 42 | type client struct { |
14 | *thriftadd.AddServiceClient | |
43 | thrift.TTransportFactory | |
44 | thrift.TProtocolFactory | |
15 | 45 | log.Logger |
16 | 46 | } |
17 | 47 | |
18 | func (c client) Sum(a, b int) int { | |
19 | reply, err := c.AddServiceClient.Sum(int64(a), int64(b)) | |
48 | // SumEndpointFactory transforms host:port strings into Endpoints. | |
49 | func (c client) SumEndpoint(instance string) (endpoint.Endpoint, io.Closer, error) { | |
50 | transportSocket, err := thrift.NewTSocket(instance) | |
20 | 51 | if err != nil { |
21 | c.Logger.Log("err", err) | |
22 | return 0 | |
52 | c.Logger.Log("during", "thrift.NewTSocket", "err", err) | |
53 | return nil, nil, err | |
23 | 54 | } |
24 | return int(reply.Value) | |
55 | trans := c.TTransportFactory.GetTransport(transportSocket) | |
56 | ||
57 | if err := trans.Open(); err != nil { | |
58 | c.Logger.Log("during", "thrift transport.Open", "err", err) | |
59 | return nil, nil, err | |
60 | } | |
61 | cli := thriftadd.NewAddServiceClientFactory(trans, c.TProtocolFactory) | |
62 | ||
63 | return func(ctx context.Context, request interface{}) (interface{}, error) { | |
64 | sumRequest := request.(server.SumRequest) | |
65 | reply, err := cli.Sum(int64(sumRequest.A), int64(sumRequest.B)) | |
66 | if err != nil { | |
67 | return server.SumResponse{}, err | |
68 | } | |
69 | return server.SumResponse{V: int(reply.Value)}, nil | |
70 | }, trans, nil | |
25 | 71 | } |
26 | 72 | |
27 | func (c client) Concat(a, b string) string { | |
28 | reply, err := c.AddServiceClient.Concat(a, b) | |
73 | // ConcatEndpointFactory transforms host:port strings into Endpoints. | |
74 | func (c client) ConcatEndpoint(instance string) (endpoint.Endpoint, io.Closer, error) { | |
75 | transportSocket, err := thrift.NewTSocket(instance) | |
29 | 76 | if err != nil { |
30 | c.Logger.Log("err", err) | |
31 | return "" | |
77 | c.Logger.Log("during", "thrift.NewTSocket", "err", err) | |
78 | return nil, nil, err | |
32 | 79 | } |
33 | return reply.Value | |
80 | trans := c.TTransportFactory.GetTransport(transportSocket) | |
81 | ||
82 | if err := trans.Open(); err != nil { | |
83 | c.Logger.Log("during", "thrift transport.Open", "err", err) | |
84 | return nil, nil, err | |
85 | } | |
86 | cli := thriftadd.NewAddServiceClientFactory(trans, c.TProtocolFactory) | |
87 | ||
88 | return func(ctx context.Context, request interface{}) (interface{}, error) { | |
89 | concatRequest := request.(server.ConcatRequest) | |
90 | reply, err := cli.Concat(concatRequest.A, concatRequest.B) | |
91 | if err != nil { | |
92 | return server.ConcatResponse{}, err | |
93 | } | |
94 | return server.ConcatResponse{V: reply.Value}, nil | |
95 | }, trans, nil | |
34 | 96 | } |
220 | 220 | errc <- err |
221 | 221 | return |
222 | 222 | } |
223 | transportLogger := log.NewContext(logger).With("transport", "net/rpc") | |
223 | transportLogger := log.NewContext(logger).With("transport", "thrift") | |
224 | 224 | transportLogger.Log("addr", *thriftAddr) |
225 | 225 | errc <- thrift.NewTSimpleServer4( |
226 | 226 | thriftadd.NewAddServiceProcessor(thriftBinding{svc}), |
12 | 12 | func DecodeSumRequest(r *http.Request) (interface{}, error) { |
13 | 13 | var request SumRequest |
14 | 14 | err := json.NewDecoder(r.Body).Decode(&request) |
15 | return request, err | |
15 | return &request, err | |
16 | 16 | } |
17 | 17 | |
18 | 18 | // EncodeSumResponse encodes the response to the provided HTTP response |
28 | 28 | func DecodeConcatRequest(r *http.Request) (interface{}, error) { |
29 | 29 | var request ConcatRequest |
30 | 30 | err := json.NewDecoder(r.Body).Decode(&request) |
31 | return request, err | |
31 | return &request, err | |
32 | 32 | } |
33 | 33 | |
34 | 34 | // EncodeConcatResponse encodes the response to the provided HTTP response |
17 | 17 | "github.com/gorilla/mux" |
18 | 18 | "github.com/hashicorp/consul/api" |
19 | 19 | "golang.org/x/net/context" |
20 | "google.golang.org/grpc" | |
21 | 20 | |
22 | 21 | "github.com/go-kit/kit/endpoint" |
23 | addsvc "github.com/go-kit/kit/examples/addsvc/client/grpc" | |
22 | "github.com/go-kit/kit/examples/addsvc/client/grpc" | |
24 | 23 | "github.com/go-kit/kit/examples/addsvc/server" |
25 | 24 | "github.com/go-kit/kit/loadbalancer" |
26 | 25 | "github.com/go-kit/kit/loadbalancer/consul" |
77 | 76 | factory loadbalancer.Factory |
78 | 77 | }{ |
79 | 78 | "addsvc": { |
80 | {path: "/api/addsvc/concat", factory: addsvcGRPCFactory(ctx, makeConcatEndpoint, logger)}, | |
81 | {path: "/api/addsvc/sum", factory: addsvcGRPCFactory(ctx, makeSumEndpoint, logger)}, | |
79 | {path: "/api/addsvc/concat", factory: grpc.ConcatEndpointFactory}, | |
80 | {path: "/api/addsvc/sum", factory: grpc.SumEndpointFactory}, | |
82 | 81 | }, |
83 | 82 | "stringsvc": { |
84 | 83 | {path: "/api/stringsvc/uppercase", factory: httpFactory(ctx, "GET", "uppercase/")}, |
129 | 128 | logger.Log("err", err) |
130 | 129 | return |
131 | 130 | } |
132 | } | |
133 | } | |
134 | ||
135 | func addsvcGRPCFactory(ctx context.Context, makeEndpoint func(server.AddService) endpoint.Endpoint, logger log.Logger) loadbalancer.Factory { | |
136 | return func(instance string) (endpoint.Endpoint, io.Closer, error) { | |
137 | var e endpoint.Endpoint | |
138 | conn, err := grpc.Dial(instance, grpc.WithInsecure()) | |
139 | if err != nil { | |
140 | return e, nil, err | |
141 | } | |
142 | svc := addsvc.New(ctx, conn, logger) | |
143 | return makeEndpoint(svc), nil, nil | |
144 | 131 | } |
145 | 132 | } |
146 | 133 |