package amqp
import (
"context"
"encoding/json"
"time"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/streadway/amqp"
)
// Subscriber wraps an endpoint and provides a handler for AMQP Delivery messages.
type Subscriber struct {
e endpoint.Endpoint
dec DecodeRequestFunc
enc EncodeResponseFunc
before []RequestFunc
after []SubscriberResponseFunc
errorEncoder ErrorEncoder
logger log.Logger
}
// NewSubscriber constructs a new subscriber, which provides a handler
// for AMQP Delivery messages.
func NewSubscriber(
e endpoint.Endpoint,
dec DecodeRequestFunc,
enc EncodeResponseFunc,
options ...SubscriberOption,
) *Subscriber {
s := &Subscriber{
e: e,
dec: dec,
enc: enc,
errorEncoder: DefaultErrorEncoder,
logger: log.NewNopLogger(),
}
for _, option := range options {
option(s)
}
return s
}
// SubscriberOption sets an optional parameter for subscribers.
type SubscriberOption func(*Subscriber)
// SubscriberBefore functions are executed on the publisher delivery object
// before the request is decoded.
func SubscriberBefore(before ...RequestFunc) SubscriberOption {
return func(s *Subscriber) { s.before = append(s.before, before...) }
}
// SubscriberAfter functions are executed on the subscriber reply after the
// endpoint is invoked, but before anything is published to the reply.
func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption {
return func(s *Subscriber) { s.after = append(s.after, after...) }
}
// SubscriberErrorEncoder is used to encode errors to the subscriber reply
// whenever they're encountered in the processing of a request. Clients can
// use this to provide custom error formatting. By default,
// errors will be published with the DefaultErrorEncoder.
func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption {
return func(s *Subscriber) { s.errorEncoder = ee }
}
// SubscriberErrorLogger is used to log non-terminal errors. By default, no errors
// are logged. This is intended as a diagnostic measure. Finer-grained control
// of error handling, including logging in more detail, should be performed in a
// custom SubscriberErrorEncoder which has access to the context.
func SubscriberErrorLogger(logger log.Logger) SubscriberOption {
return func(s *Subscriber) { s.logger = logger }
}
// ServeDelivery handles AMQP Delivery messages
// It is strongly recommended to use *amqp.Channel as the
// Channel interface implementation.
func (s Subscriber) ServeDelivery(ch Channel) func(deliv *amqp.Delivery) {
return func(deliv *amqp.Delivery) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pub := amqp.Publishing{}
for _, f := range s.before {
ctx = f(ctx, &pub)
}
request, err := s.dec(ctx, deliv)
if err != nil {
s.logger.Log("err", err)
s.errorEncoder(ctx, err, deliv, ch, &pub)
return
}
response, err := s.e(ctx, request)
if err != nil {
s.logger.Log("err", err)
s.errorEncoder(ctx, err, deliv, ch, &pub)
return
}
for _, f := range s.after {
ctx = f(ctx, deliv, ch, &pub)
}
if err := s.enc(ctx, &pub, response); err != nil {
s.logger.Log("err", err)
s.errorEncoder(ctx, err, deliv, ch, &pub)
return
}
if err := s.publishResponse(ctx, deliv, ch, &pub); err != nil {
s.logger.Log("err", err)
s.errorEncoder(ctx, err, deliv, ch, &pub)
return
}
}
}
func (s Subscriber) publishResponse(
ctx context.Context,
deliv *amqp.Delivery,
ch Channel,
pub *amqp.Publishing,
) error {
if pub.CorrelationId == "" {
pub.CorrelationId = deliv.CorrelationId
}
replyExchange := getPublishExchange(ctx)
replyTo := getPublishKey(ctx)
if replyTo == "" {
replyTo = deliv.ReplyTo
}
return ch.Publish(
replyExchange,
replyTo,
false, // mandatory
false, // immediate
*pub,
)
}
// EncodeJSONResponse marshals the response as JSON as part of the
// payload of the AMQP Publishing object.
func EncodeJSONResponse(
ctx context.Context,
pub *amqp.Publishing,
response interface{},
) error {
b, err := json.Marshal(response)
if err != nil {
return err
}
pub.Body = b
return nil
}
// EncodeNopResponse is a response function that does nothing.
func EncodeNopResponse(
ctx context.Context,
pub *amqp.Publishing,
response interface{},
) error {
return nil
}
// ErrorEncoder is responsible for encoding an error to the subscriber reply.
// Users are encouraged to use custom ErrorEncoders to encode errors to
// their replies, and will likely want to pass and check for their own error
// types.
type ErrorEncoder func(ctx context.Context,
err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing)
// DefaultErrorEncoder simply ignores the message. It does not reply
// nor Ack/Nack the message.
func DefaultErrorEncoder(ctx context.Context,
err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing) {
}
// SingleNackRequeueErrorEncoder issues a Nack to the delivery with multiple flag set as false
// and requeue flag set as true. It does not reply the message.
func SingleNackRequeueErrorEncoder(ctx context.Context,
err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing) {
deliv.Nack(
false, //multiple
true, //requeue
)
duration := getNackSleepDuration(ctx)
time.Sleep(duration)
}
// ReplyErrorEncoder serializes the error message as a DefaultErrorResponse
// JSON and sends the message to the ReplyTo address.
func ReplyErrorEncoder(
ctx context.Context,
err error,
deliv *amqp.Delivery,
ch Channel,
pub *amqp.Publishing,
) {
if pub.CorrelationId == "" {
pub.CorrelationId = deliv.CorrelationId
}
replyExchange := getPublishExchange(ctx)
replyTo := getPublishKey(ctx)
if replyTo == "" {
replyTo = deliv.ReplyTo
}
response := DefaultErrorResponse{err.Error()}
b, err := json.Marshal(response)
if err != nil {
return
}
pub.Body = b
ch.Publish(
replyExchange,
replyTo,
false, // mandatory
false, // immediate
*pub,
)
}
// ReplyAndAckErrorEncoder serializes the error message as a DefaultErrorResponse
// JSON and sends the message to the ReplyTo address then Acks the original
// message.
func ReplyAndAckErrorEncoder(ctx context.Context, err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing) {
ReplyErrorEncoder(ctx, err, deliv, ch, pub)
deliv.Ack(false)
}
// DefaultErrorResponse is the default structure of responses in the event
// of an error.
type DefaultErrorResponse struct {
Error string `json:"err"`
}
// Channel is a channel interface to make testing possible.
// It is highly recommended to use *amqp.Channel as the interface implementation.
type Channel interface {
Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
Consume(queue, consumer string, autoAck, exclusive, noLocal, noWail bool, args amqp.Table) (<-chan amqp.Delivery, error)
}