Codebase list golang-github-go-kit-kit / e9965a6 transport / grpc / client.go
e9965a6

Tree @e9965a6 (Download .tar.gz)

client.go @e9965a6

30beee1
 
 
45cd825
30beee1
c095d92
111d8a1
30beee1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c095d92
30beee1
e9965a6
30beee1
 
18f8d91
574f3ab
3a993fa
18f8d91
 
 
 
 
 
 
 
 
111d8a1
 
 
30beee1
c095d92
111d8a1
c095d92
 
bf7c05f
 
 
 
c095d92
 
 
 
 
 
e9965a6
30beee1
 
 
 
 
 
 
 
 
 
fe0aa96
30beee1
fe0aa96
e9965a6
 
 
 
 
 
 
 
30beee1
 
 
 
 
 
 
 
 
 
 
b693d11
30beee1
 
 
 
 
 
 
 
e9965a6
c095d92
e9965a6
 
 
 
b693d11
30beee1
 
e9965a6
 
 
 
c095d92
30beee1
b693d11
30beee1
 
 
 
package grpc

import (
	"context"
	"fmt"
	"reflect"
	"strings"

	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"

	"github.com/go-kit/kit/endpoint"
)

// Client wraps a gRPC connection and provides a method that implements
// endpoint.Endpoint. A Client targets exactly one RPC method; build it with
// NewClient and obtain the callable endpoint from Endpoint.
type Client struct {
	// client is the connection every invocation is issued on.
	client      *grpc.ClientConn
	// serviceName is not assigned by NewClient nor read by Endpoint in this
	// file; the service name is folded into method instead.
	serviceName string
	// method is the full RPC path in the form "/<service>/<method>".
	method      string
	// enc converts the caller's request into the gRPC request message.
	enc         EncodeRequestFunc
	// dec converts the gRPC reply message into the caller-facing response.
	dec         DecodeResponseFunc
	// grpcReply is the (non-pointer) reply type; a fresh value of it is
	// allocated with reflect.New for every call.
	grpcReply   reflect.Type
	// before holds the RequestFuncs run, in order, after encoding and before
	// the RPC is invoked; each may replace the context and edit the metadata.
	before      []RequestFunc
	// after holds the ClientResponseFuncs run, in order, after a successful
	// invocation and before the reply is decoded.
	after       []ClientResponseFunc
}

// NewClient constructs a usable Client for a single remote endpoint.
//
// cc is the connection the RPC is issued on. serviceName and method are
// combined into the RPC path "/<serviceName>/<method>"; a serviceName that
// contains no '.' is treated as unqualified and is prefixed with "pb.".
// enc and dec translate between the caller's request/response values and the
// gRPC messages.
//
// Pass a zero-value protobuf message of the RPC response type as the
// grpcReply argument. Either the struct itself or a pointer to it is
// accepted, including a typed nil pointer such as (*pb.Reply)(nil).
// NewClient panics if grpcReply is an untyped nil, because no type can be
// derived from it.
//
// The options are applied, in order, to the Client before it is returned.
func NewClient(
	cc *grpc.ClientConn,
	serviceName string,
	method string,
	enc EncodeRequestFunc,
	dec DecodeResponseFunc,
	grpcReply interface{},
	options ...ClientOption,
) *Client {
	if strings.IndexByte(serviceName, '.') == -1 {
		serviceName = "pb." + serviceName
	}

	// Both reply structs and pointers to reply structs are accepted. New
	// consumers of the client should use structs directly, while existing
	// consumers will not break if they continue to use pointers to structs.
	var replyType reflect.Type
	if v := reflect.ValueOf(grpcReply); v.Kind() == reflect.Ptr && v.IsNil() {
		// A typed nil pointer has nothing to dereference: reflect.Indirect
		// would yield the zero Value and Interface() would panic. The pointer
		// type alone is enough to know what to allocate per call.
		replyType = v.Type().Elem()
	} else {
		replyType = reflect.TypeOf(reflect.Indirect(v).Interface())
	}

	c := &Client{
		client:    cc,
		method:    fmt.Sprintf("/%s/%s", serviceName, method),
		enc:       enc,
		dec:       dec,
		grpcReply: replyType,
		before:    []RequestFunc{},
		after:     []ClientResponseFunc{},
	}
	for _, option := range options {
		option(c)
	}
	return c
}

// ClientOption sets an optional parameter for clients. Options are passed to
// NewClient, which calls each one, in the order given, with the Client it has
// just constructed.
type ClientOption func(*Client)

// ClientBefore returns a ClientOption that appends the given RequestFuncs to
// the ones already registered on the Client. They are applied to the outgoing
// gRPC request before it's invoked.
func ClientBefore(before ...RequestFunc) ClientOption {
	register := func(target *Client) {
		target.before = append(target.before, before...)
	}
	return register
}

// ClientAfter returns a ClientOption that appends the given
// ClientResponseFuncs to the ones already registered on the Client. They are
// applied to the incoming gRPC response prior to it being decoded, which is
// useful for obtaining response metadata and adding onto the context before
// decoding.
func ClientAfter(after ...ClientResponseFunc) ClientOption {
	register := func(target *Client) {
		target.after = append(target.after, after...)
	}
	return register
}

// Endpoint returns a usable endpoint that will invoke the gRPC specified by the
// client.
//
// Each call of the returned endpoint runs these steps in order: encode the
// request, run the before functions against the context and the outgoing
// metadata, invoke the RPC, run the after functions against the received
// header and trailer, and decode the reply. An error from encoding, invoking
// or decoding is returned as-is with a nil response. The context handed to
// every step is a cancellable child of the caller's context and is cancelled
// when the endpoint returns.
func (c Client) Endpoint() endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		grpcRequest, err := c.enc(ctx, request)
		if err != nil {
			return nil, err
		}

		// Let every before function contribute to the outgoing metadata and
		// the context, then attach the collected metadata to the context.
		outgoing := &metadata.MD{}
		for i := range c.before {
			ctx = c.before[i](ctx, outgoing)
		}
		ctx = metadata.NewContext(ctx, *outgoing)

		var (
			header  metadata.MD
			trailer metadata.MD
		)
		// Allocate a fresh reply message of the configured type for this call.
		reply := reflect.New(c.grpcReply).Interface()
		err = grpc.Invoke(
			ctx, c.method, grpcRequest, reply, c.client,
			grpc.Header(&header), grpc.Trailer(&trailer),
		)
		if err != nil {
			return nil, err
		}

		for i := range c.after {
			ctx = c.after[i](ctx, &header, &trailer)
		}

		decoded, err := c.dec(ctx, reply)
		if err != nil {
			return nil, err
		}
		return decoded, nil
	}
}