package grpc
import (
"context"
"fmt"
"reflect"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"github.com/go-kit/kit/endpoint"
)
// Client wraps a gRPC connection and provides a method that implements
// endpoint.Endpoint. A Client targets exactly one remote method; build it
// with NewClient and obtain the callable endpoint from Endpoint.
type Client struct {
// client is the gRPC connection on which the call is invoked.
client *grpc.ClientConn
// serviceName is neither assigned nor read by the code in this file.
// NOTE(review): looks vestigial; confirm before relying on it.
serviceName string
// method is the fully-qualified gRPC method path, "/<service>/<method>".
method string
// enc converts the endpoint request into the gRPC request value.
enc EncodeRequestFunc
// dec converts the gRPC reply into the endpoint response value.
dec DecodeResponseFunc
// grpcReply is the (non-pointer) reply type; a fresh value of it is
// allocated with reflect.New for every call.
grpcReply reflect.Type
// before holds the functions run, in order, prior to invoking the RPC.
before []ClientRequestFunc
// after holds the functions run, in order, after a successful RPC and
// before the reply is decoded.
after []ClientResponseFunc
// finalizer holds the functions run, in order, when the endpoint returns.
finalizer []ClientFinalizerFunc
}
// NewClient constructs a usable Client for a single remote endpoint.
//
// serviceName and method are joined into the fully-qualified gRPC method path
// "/serviceName/method". enc and dec translate between endpoint-level values
// and the gRPC request and reply. options are applied, in order, after the
// defaults have been set.
//
// Pass a zero-value protobuf message of the RPC response type as the
// grpcReply argument. Either the struct itself or a pointer to it is
// accepted; because only the type is inspected, a typed nil pointer works as
// well. NewClient panics if grpcReply is an untyped nil, since no reply type
// can be derived from it.
func NewClient(
	cc *grpc.ClientConn,
	serviceName string,
	method string,
	enc EncodeRequestFunc,
	dec DecodeResponseFunc,
	grpcReply interface{},
	options ...ClientOption,
) *Client {
	// Work on the type rather than the value: this accepts both reply structs
	// and pointers to them (existing callers that pass pointers keep working),
	// never copies the message, and does not trip over a nil pointer.
	replyType := reflect.TypeOf(grpcReply)
	if replyType == nil {
		panic("grpc: NewClient called with a nil grpcReply")
	}
	if replyType.Kind() == reflect.Ptr {
		replyType = replyType.Elem()
	}

	c := &Client{
		client:    cc,
		method:    fmt.Sprintf("/%s/%s", serviceName, method),
		enc:       enc,
		dec:       dec,
		grpcReply: replyType,
		before:    []ClientRequestFunc{},
		after:     []ClientResponseFunc{},
	}
	for _, option := range options {
		option(c)
	}
	return c
}
// ClientOption sets an optional parameter for clients. Options are passed to
// NewClient, which calls each one in order on the Client under construction.
type ClientOption func(*Client)
// ClientBefore returns an option that appends the given ClientRequestFuncs to
// the client's list of functions applied to the outgoing gRPC request before
// it is invoked. Functions run in the order they were added.
func ClientBefore(before ...ClientRequestFunc) ClientOption {
	return func(c *Client) {
		c.before = append(c.before, before...)
	}
}
// ClientAfter returns an option that appends the given ClientResponseFuncs to
// the client's list of functions applied to the incoming gRPC response before
// it is decoded. This is the place to read response metadata and add values
// to the context ahead of decoding.
func ClientAfter(after ...ClientResponseFunc) ClientOption {
	return func(c *Client) {
		c.after = append(c.after, after...)
	}
}
// ClientFinalizer returns an option that appends the given finalizers to the
// client. Finalizers are executed at the end of every gRPC request.
// By default, no finalizer is registered.
func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption {
	return func(c *Client) {
		c.finalizer = append(c.finalizer, f...)
	}
}
// Endpoint returns a usable endpoint that will invoke the gRPC specified by the
// client.
//
// Each call encodes the request, runs the before functions to build the
// context and outgoing metadata, invokes the RPC, runs the after functions
// with the received header and trailer, and finally decodes the reply. An
// error from encoding, invoking, or decoding is returned as is, with a nil
// response. Registered finalizers always run on return and observe the final
// error.
func (c Client) Endpoint() endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (response interface{}, err error) {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		// Registered after cancel, so the finalizers run first and see a
		// context that has not yet been canceled by this function. The
		// closure reads ctx and err at return time, not at defer time.
		if len(c.finalizer) > 0 {
			defer func() {
				for i := range c.finalizer {
					c.finalizer[i](ctx, err)
				}
			}()
		}

		ctx = context.WithValue(ctx, ContextKeyRequestMethod, c.method)

		grpcReq, err := c.enc(ctx, request)
		if err != nil {
			return nil, err
		}

		// Let every before function contribute to the context and to the
		// metadata that is attached to the outgoing call.
		outgoing := metadata.MD{}
		for _, beforeFn := range c.before {
			ctx = beforeFn(ctx, &outgoing)
		}
		ctx = metadata.NewOutgoingContext(ctx, outgoing)

		var (
			header  metadata.MD
			trailer metadata.MD
		)
		reply := reflect.New(c.grpcReply).Interface()
		callOpts := []grpc.CallOption{grpc.Header(&header), grpc.Trailer(&trailer)}
		err = c.client.Invoke(ctx, c.method, grpcReq, reply, callOpts...)
		if err != nil {
			return nil, err
		}

		for _, afterFn := range c.after {
			ctx = afterFn(ctx, header, trailer)
		}

		decoded, decodeErr := c.dec(ctx, reply)
		if decodeErr != nil {
			return nil, decodeErr
		}
		return decoded, nil
	}
}
// ClientFinalizerFunc can be used to perform work at the end of a client gRPC
// request, after the response is returned. The principal intended use is for
// error logging. Additional response parameters, when present, are provided
// in the context under keys with the ContextKeyResponse prefix.
// Note: err may be nil. There may also be no additional response parameters,
// depending on when an error occurs.
type ClientFinalizerFunc func(ctx context.Context, err error)