Codebase list golang-github-go-kit-kit / 30beee1
gRPC Server and Client helpers similar to the existing JSON over HTTP helpers. Bas van Beek 8 years ago
5 changed file(s) with 371 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
55 "strconv"
66
77 "golang.org/x/net/context"
8 "google.golang.org/grpc/metadata"
89
910 "github.com/go-kit/kit/endpoint"
1011 "github.com/go-kit/kit/log"
12 "github.com/go-kit/kit/transport/grpc"
1113 )
1214
1315 // In Zipkin, "spans are considered to start and stop with the client." The
2628 traceIDHTTPHeader = "X-B3-TraceId"
2729 spanIDHTTPHeader = "X-B3-SpanId"
2830 parentSpanIDHTTPHeader = "X-B3-ParentSpanId"
31 // gRPC keys are always lowercase
32 traceIDGRPCKey = "x-b3-traceid"
33 spanIDGRPCKey = "x-b3-spanid"
34 parentSpanIDGRPCKey = "x-b3-parentspanid"
2935
3036 // ClientSend is the annotation value used to mark a client sending a
3137 // request to a server.
97103 }
98104 }
99105
106 // ToGRPCContext returns a function that satisfies transport/grpc.BeforeFunc. It
107 // takes a Zipkin span from the incoming GRPC request, and saves it in the
108 // request context. It's designed to be wired into a server's GRPC transport
109 // Before stack. The logger is used to report errors.
110 func ToGRPCContext(newSpan NewSpanFunc, logger log.Logger) func(ctx context.Context, md *metadata.MD) context.Context {
111 return func(ctx context.Context, md *metadata.MD) context.Context {
112 return context.WithValue(ctx, SpanContextKey, fromGRPC(newSpan, *md, logger))
113 }
114 }
115
100116 // ToRequest returns a function that satisfies transport/http.BeforeFunc. It
101117 // takes a Zipkin span from the context, and injects it into the HTTP request.
102118 // It's designed to be wired into a client's HTTP transport Before stack. It's
121137 }
122138 }
123139
140 // ToGRPCRequest returns a function that satisfies transport/grpc.BeforeFunc. It
141 // takes a Zipkin span from the context, and injects it into the GRPC context.
142 // It's designed to be wired into a client's GRPC transport Before stack. It's
143 // expected that AnnotateClient has already ensured the span in the context is
144 // a child/client span.
145 func ToGRPCRequest(newSpan NewSpanFunc) func(ctx context.Context, md *metadata.MD) context.Context {
146 return func(ctx context.Context, md *metadata.MD) context.Context {
147 span, ok := fromContext(ctx)
148 if !ok {
149 span = newSpan(newID(), newID(), 0)
150 }
151 if id := span.TraceID(); id > 0 {
152 key, value := grpc.EncodeKeyValue(traceIDGRPCKey, strconv.FormatInt(id, 16))
153 (*md)[key] = append((*md)[key], value)
154 }
155 if id := span.SpanID(); id > 0 {
156 key, value := grpc.EncodeKeyValue(spanIDGRPCKey, strconv.FormatInt(id, 16))
157 (*md)[key] = append((*md)[key], value)
158 }
159 if id := span.ParentSpanID(); id > 0 {
160 key, value := grpc.EncodeKeyValue(parentSpanIDGRPCKey, strconv.FormatInt(id, 16))
161 (*md)[key] = append((*md)[key], value)
162 }
163 return ctx
164 }
165 }
166
124167 func fromHTTP(newSpan NewSpanFunc, r *http.Request, logger log.Logger) *Span {
125168 traceIDStr := r.Header.Get(traceIDHTTPHeader)
126169 if traceIDStr == "" {
148191 parentSpanID, err := strconv.ParseInt(parentSpanIDStr, 16, 64)
149192 if err != nil {
150193 logger.Log(parentSpanIDHTTPHeader, parentSpanIDStr, "err", err) // abnormal
194 parentSpanID = 0 // the only way to deal with it
195 }
196 return newSpan(traceID, spanID, parentSpanID)
197 }
198
199 func fromGRPC(newSpan NewSpanFunc, md metadata.MD, logger log.Logger) *Span {
200 traceIDSlc := md[traceIDGRPCKey]
201 pos := len(traceIDSlc) - 1
202 if pos < 0 {
203 return newSpan(newID(), newID(), 0) // normal; just make a new one
204 }
205 traceID, err := strconv.ParseInt(traceIDSlc[pos], 16, 64)
206 if err != nil {
207 logger.Log(traceIDHTTPHeader, traceIDSlc, "err", err)
208 return newSpan(newID(), newID(), 0)
209 }
210 spanIDSlc := md[spanIDGRPCKey]
211 pos = len(spanIDSlc) - 1
212 if pos < 0 {
213 spanIDSlc = make([]string, 1)
214 pos = 0
215 }
216 if spanIDSlc[pos] == "" {
217 logger.Log("msg", "trace ID without span ID") // abnormal
218 spanIDSlc[pos] = strconv.FormatInt(newID(), 64) // deal with it
219 }
220 spanID, err := strconv.ParseInt(spanIDSlc[len(spanIDSlc)-1], 16, 64)
221 if err != nil {
222 logger.Log(spanIDHTTPHeader, spanIDSlc, "err", err) // abnormal
223 spanID = newID() // deal with it
224 }
225 parentSpanIDSlc := md[parentSpanIDGRPCKey]
226 pos = len(parentSpanIDSlc) - 1
227 if pos < 0 {
228 parentSpanIDSlc = make([]string, 1)
229 pos = 0
230 }
231 if parentSpanIDSlc[pos] == "" {
232 parentSpanIDSlc[pos] = "0" // normal
233 }
234 parentSpanID, err := strconv.ParseInt(parentSpanIDSlc[pos], 16, 64)
235 if err != nil {
236 logger.Log(parentSpanIDHTTPHeader, parentSpanIDSlc, "err", err) // abnormal
151237 parentSpanID = 0 // the only way to deal with it
152238 }
153239 return newSpan(traceID, spanID, parentSpanID)
0 package grpc
1
2 import (
3 "fmt"
4
5 "golang.org/x/net/context"
6 "google.golang.org/grpc"
7 "google.golang.org/grpc/metadata"
8
9 "github.com/go-kit/kit/endpoint"
10 )
11
12 // Client wraps a gRPC connection and provides a method that implements
13 // endpoint.Endpoint.
14 type Client struct {
15 client *grpc.ClientConn
16 serviceName string
17 method string
18 enc EncodeRequestFunc
19 dec DecodeResponseFunc
20 grpcReply interface{}
21 before []RequestFunc
22 }
23
24 // NewClient returns a
25 func NewClient(cc *grpc.ClientConn, serviceName string, method string, enc EncodeRequestFunc, dec DecodeResponseFunc, grpcReply interface{}, options ...ClientOption) *Client {
26 c := &Client{
27 client: cc,
28 method: fmt.Sprintf("/pb.%s/%s", serviceName, method),
29 enc: enc,
30 dec: dec,
31 grpcReply: grpcReply,
32 before: []RequestFunc{},
33 }
34 for _, option := range options {
35 option(c)
36 }
37 return c
38 }
39
40 // ClientOption sets an optional parameter for clients.
41 type ClientOption func(*Client)
42
43 // SetClientBefore sets the RequestFuncs that are applied to the outgoing gRPC
44 // request before it's invoked.
45 func SetClientBefore(before ...RequestFunc) ClientOption {
46 return func(c *Client) { c.before = before }
47 }
48
49 // Endpoint returns a usable endpoint that will invoke the gRPC specified by the
50 // client.
51 func (c Client) Endpoint() endpoint.Endpoint {
52 return func(ctx context.Context, request interface{}) (interface{}, error) {
53 ctx, cancel := context.WithCancel(ctx)
54 defer cancel()
55
56 req, err := c.enc(ctx, request)
57 if err != nil {
58 return nil, fmt.Errorf("Encode: %v", err)
59 }
60
61 md := &metadata.MD{}
62 for _, f := range c.before {
63 ctx = f(ctx, md)
64 }
65 ctx = metadata.NewContext(ctx, *md)
66
67 err = grpc.Invoke(ctx, c.method, req, c.grpcReply, c.client)
68 if err != nil {
69 return nil, fmt.Errorf("Invoke: %v", err)
70 }
71
72 response, err := c.dec(ctx, c.grpcReply)
73 if err != nil {
74 return nil, fmt.Errorf("Decode: %v", err)
75 }
76
77 return response, nil
78 }
79 }
0 package grpc
1
2 import "golang.org/x/net/context"
3
4 // DecodeRequestFunc extracts a user-domain request object from a gRPC request.
5 // It's designed to be used in gRPC servers, for server-side endpoints. One
6 // straightforward DecodeRequestFunc could be something that
7 // decodes from the gRPC request message to the concrete request type.
8 type DecodeRequestFunc func(context.Context, interface{}) (request interface{}, err error)
9
10 // EncodeRequestFunc encodes the passed request object into the gRPC request
11 // object. It's designed to be used in gRPC clients, for client-side
12 // endpoints. One straightforward EncodeRequestFunc could something that
13 // encodes the object directly to the gRPC request message.
14 type EncodeRequestFunc func(context.Context, interface{}) (request interface{}, err error)
15
16 // EncodeResponseFunc encodes the passed response object to the gRPC response
17 // message. It's designed to be used in gRPC servers, for server-side
18 // endpoints. One straightforward EncodeResponseFunc could be something that
19 // encodes the object directly to the gRPC response message.
20 type EncodeResponseFunc func(context.Context, interface{}) (response interface{}, err error)
21
22 // DecodeResponseFunc extracts a user-domain response object from a gRPC
23 // response object. It's designed to be used in gRPC clients, for client-side
24 // endpoints. One straightforward DecodeResponseFunc could be something that
25 // decodes from the gRPC response message to the concrete response type.
26 type DecodeResponseFunc func(context.Context, interface{}) (response interface{}, err error)
0 package grpc
1
2 import (
3 "encoding/base64"
4 "strings"
5
6 "golang.org/x/net/context"
7 "google.golang.org/grpc/metadata"
8 )
9
10 const (
11 binHdrSuffix = "-bin"
12 )
13
14 // RequestFunc may take information from an gRPC request and put it into a
15 // request context. In Servers, BeforeFuncs are executed prior to invoking the
16 // endpoint. In Clients, BeforeFuncs are executed after creating the request
17 // but prior to invoking the gRPC client.
18 type RequestFunc func(context.Context, *metadata.MD) context.Context
19
20 // ResponseFunc may take information from a request context and use it to
21 // manipulate the gRPC metadata header. ResponseFuncs are only executed in
22 // servers, after invoking the endpoint but prior to writing a response.
23 type ResponseFunc func(context.Context, *metadata.MD)
24
25 // SetResponseHeader returns a ResponseFunc that sets the specified metadata
26 // key-value pair.
27 func SetResponseHeader(key, val string) ResponseFunc {
28 return func(_ context.Context, md *metadata.MD) {
29 key, val := EncodeKeyValue(key, val)
30 (*md)[key] = append((*md)[key], val)
31 }
32 }
33
34 // SetRequestHeader returns a RequestFunc that sets the specified metadata
35 // key-value pair..
36 func SetRequestHeader(key, val string) RequestFunc {
37 return func(ctx context.Context, md *metadata.MD) context.Context {
38 key, val := EncodeKeyValue(key, val)
39 (*md)[key] = append((*md)[key], val)
40 return ctx
41 }
42 }
43
44 // EncodeKeyValue sanitizes a key-value pair for use in gRPC metadata headers.
45 func EncodeKeyValue(key, val string) (string, string) {
46 key = strings.ToLower(key)
47 if strings.HasSuffix(key, binHdrSuffix) {
48 v := base64.StdEncoding.EncodeToString([]byte(val))
49 val = string(v)
50 }
51 return key, val
52 }
0 package grpc
1
2 import (
3 "golang.org/x/net/context"
4 "google.golang.org/grpc/metadata"
5
6 "github.com/go-kit/kit/endpoint"
7 "github.com/go-kit/kit/log"
8 )
9
10 // Handler which should be called from the grpc binding of the service
11 // implementation.
12 type Handler interface {
13 ServeGRPC(context.Context, interface{}) (context.Context, interface{}, error)
14 }
15
16 // Server wraps an endpoint and implements grpc.Handler.
17 type Server struct {
18 ctx context.Context
19 e endpoint.Endpoint
20 dec DecodeRequestFunc
21 enc EncodeResponseFunc
22 before []RequestFunc
23 after []ResponseFunc
24 logger log.Logger
25 }
26
27 // NewServer constructs a new server, which implements grpc.Server and wraps
28 // the provided endpoint.
29 func NewServer(
30 ctx context.Context,
31 e endpoint.Endpoint,
32 dec DecodeRequestFunc,
33 enc EncodeResponseFunc,
34 options ...ServerOption,
35 ) *Server {
36 s := &Server{
37 ctx: ctx,
38 e: e,
39 dec: dec,
40 enc: enc,
41 // errorEncoder: defaultErrorEncoder,
42 logger: log.NewNopLogger(),
43 }
44 for _, option := range options {
45 option(s)
46 }
47 return s
48 }
49
50 // ServerOption sets an optional parameter for servers.
51 type ServerOption func(*Server)
52
53 // ServerBefore functions are executed on the HTTP request object before the
54 // request is decoded.
55 func ServerBefore(before ...RequestFunc) ServerOption {
56 return func(s *Server) { s.before = before }
57 }
58
59 // ServerAfter functions are executed on the HTTP response writer after the
60 // endpoint is invoked, but before anything is written to the client.
61 func ServerAfter(after ...ResponseFunc) ServerOption {
62 return func(s *Server) { s.after = after }
63 }
64
65 // ServerErrorLogger is used to log non-terminal errors. By default, no errors
66 // are logged.
67 func ServerErrorLogger(logger log.Logger) ServerOption {
68 return func(s *Server) { s.logger = logger }
69 }
70
71 // ServeGRPC implements grpc.Handler
72 func (s Server) ServeGRPC(grpcCtx context.Context, r interface{}) (context.Context, interface{}, error) {
73 ctx, cancel := context.WithCancel(s.ctx)
74 defer cancel()
75
76 // retrieve gRPC metadata
77 md, ok := metadata.FromContext(grpcCtx)
78 if !ok {
79 md = metadata.MD{}
80 }
81
82 for _, f := range s.before {
83 ctx = f(ctx, &md)
84 }
85
86 // store potentially updated metadata in the gRPC context
87 grpcCtx = metadata.NewContext(grpcCtx, md)
88
89 request, err := s.dec(grpcCtx, r)
90 if err != nil {
91 s.logger.Log("err", err)
92 return grpcCtx, nil, BadRequestError{err}
93 }
94
95 response, err := s.e(ctx, request)
96 if err != nil {
97 s.logger.Log("err", err)
98 return grpcCtx, nil, err
99 }
100
101 for _, f := range s.after {
102 f(ctx, &md)
103 }
104
105 // store potentially updated metadata in the gRPC context
106 grpcCtx = metadata.NewContext(grpcCtx, md)
107
108 grpcResp, err := s.enc(grpcCtx, response)
109 if err != nil {
110 s.logger.Log("err", err)
111 return grpcCtx, nil, err
112 }
113 return grpcCtx, grpcResp, nil
114 }
115
116 // BadRequestError is an error in decoding the request.
117 type BadRequestError struct {
118 Err error
119 }
120
121 // Error implements the error interface.
122 func (err BadRequestError) Error() string {
123 return err.Err.Error()
124 }