package nats
import (
"context"
"encoding/json"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/transport"
"github.com/go-kit/log"
"github.com/nats-io/nats.go"
)
// Subscriber wraps an endpoint and provides nats.MsgHandler.
type Subscriber struct {
e endpoint.Endpoint
dec DecodeRequestFunc
enc EncodeResponseFunc
before []RequestFunc
after []SubscriberResponseFunc
errorEncoder ErrorEncoder
finalizer []SubscriberFinalizerFunc
errorHandler transport.ErrorHandler
}
// NewSubscriber constructs a new subscriber, which provides nats.MsgHandler and wraps
// the provided endpoint.
func NewSubscriber(
e endpoint.Endpoint,
dec DecodeRequestFunc,
enc EncodeResponseFunc,
options ...SubscriberOption,
) *Subscriber {
s := &Subscriber{
e: e,
dec: dec,
enc: enc,
errorEncoder: DefaultErrorEncoder,
errorHandler: transport.NewLogErrorHandler(log.NewNopLogger()),
}
for _, option := range options {
option(s)
}
return s
}
// SubscriberOption sets an optional parameter for subscribers.
type SubscriberOption func(*Subscriber)
// SubscriberBefore functions are executed on the publisher request object before the
// request is decoded.
func SubscriberBefore(before ...RequestFunc) SubscriberOption {
return func(s *Subscriber) { s.before = append(s.before, before...) }
}
// SubscriberAfter functions are executed on the subscriber reply after the
// endpoint is invoked, but before anything is published to the reply.
func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption {
return func(s *Subscriber) { s.after = append(s.after, after...) }
}
// SubscriberErrorEncoder is used to encode errors to the subscriber reply
// whenever they're encountered in the processing of a request. Clients can
// use this to provide custom error formatting. By default,
// errors will be published with the DefaultErrorEncoder.
func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption {
return func(s *Subscriber) { s.errorEncoder = ee }
}
// SubscriberErrorLogger is used to log non-terminal errors. By default, no errors
// are logged. This is intended as a diagnostic measure. Finer-grained control
// of error handling, including logging in more detail, should be performed in a
// custom SubscriberErrorEncoder which has access to the context.
// Deprecated: Use SubscriberErrorHandler instead.
func SubscriberErrorLogger(logger log.Logger) SubscriberOption {
return func(s *Subscriber) { s.errorHandler = transport.NewLogErrorHandler(logger) }
}
// SubscriberErrorHandler is used to handle non-terminal errors. By default, non-terminal errors
// are ignored. This is intended as a diagnostic measure. Finer-grained control
// of error handling, including logging in more detail, should be performed in a
// custom SubscriberErrorEncoder which has access to the context.
func SubscriberErrorHandler(errorHandler transport.ErrorHandler) SubscriberOption {
return func(s *Subscriber) { s.errorHandler = errorHandler }
}
// SubscriberFinalizer is executed at the end of every request from a publisher through NATS.
// By default, no finalizer is registered.
func SubscriberFinalizer(f ...SubscriberFinalizerFunc) SubscriberOption {
return func(s *Subscriber) { s.finalizer = f }
}
// ServeMsg provides nats.MsgHandler.
func (s Subscriber) ServeMsg(nc *nats.Conn) func(msg *nats.Msg) {
return func(msg *nats.Msg) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if len(s.finalizer) > 0 {
defer func() {
for _, f := range s.finalizer {
f(ctx, msg)
}
}()
}
for _, f := range s.before {
ctx = f(ctx, msg)
}
request, err := s.dec(ctx, msg)
if err != nil {
s.errorHandler.Handle(ctx, err)
if msg.Reply == "" {
return
}
s.errorEncoder(ctx, err, msg.Reply, nc)
return
}
response, err := s.e(ctx, request)
if err != nil {
s.errorHandler.Handle(ctx, err)
if msg.Reply == "" {
return
}
s.errorEncoder(ctx, err, msg.Reply, nc)
return
}
for _, f := range s.after {
ctx = f(ctx, nc)
}
if msg.Reply == "" {
return
}
if err := s.enc(ctx, msg.Reply, nc, response); err != nil {
s.errorHandler.Handle(ctx, err)
s.errorEncoder(ctx, err, msg.Reply, nc)
return
}
}
}
// ErrorEncoder is responsible for encoding an error to the subscriber reply.
// Users are encouraged to use custom ErrorEncoders to encode errors to
// their replies, and will likely want to pass and check for their own error
// types.
type ErrorEncoder func(ctx context.Context, err error, reply string, nc *nats.Conn)
// ServerFinalizerFunc can be used to perform work at the end of an request
// from a publisher, after the response has been written to the publisher. The principal
// intended use is for request logging.
type SubscriberFinalizerFunc func(ctx context.Context, msg *nats.Msg)
// NopRequestDecoder is a DecodeRequestFunc that can be used for requests that do not
// need to be decoded, and simply returns nil, nil.
func NopRequestDecoder(_ context.Context, _ *nats.Msg) (interface{}, error) {
return nil, nil
}
// EncodeJSONResponse is a EncodeResponseFunc that serializes the response as a
// JSON object to the subscriber reply. Many JSON-over services can use it as
// a sensible default.
func EncodeJSONResponse(_ context.Context, reply string, nc *nats.Conn, response interface{}) error {
b, err := json.Marshal(response)
if err != nil {
return err
}
return nc.Publish(reply, b)
}
// DefaultErrorEncoder writes the error to the subscriber reply.
func DefaultErrorEncoder(_ context.Context, err error, reply string, nc *nats.Conn) {
logger := log.NewNopLogger()
type Response struct {
Error string `json:"err"`
}
var response Response
response.Error = err.Error()
b, err := json.Marshal(response)
if err != nil {
logger.Log("err", err)
return
}
if err := nc.Publish(reply, b); err != nil {
logger.Log("err", err)
}
}