package amqp
import (
"context"
"time"
"github.com/streadway/amqp"
)
// RequestFunc is a hook that may take information from a publisher request
// (the amqp.Publishing it is handed) and put it into the request context,
// returning the context to use from then on. In Subscribers, RequestFuncs are
// executed prior to invoking the endpoint.
type RequestFunc func(ctx context.Context, pub *amqp.Publishing) context.Context
// SubscriberResponseFunc is a hook that may take information from a request
// context and use it to manipulate a Publisher. It receives the incoming
// delivery, the channel, and the reply publishing, and returns the context to
// use from then on. SubscriberResponseFuncs are only executed in subscribers,
// after invoking the endpoint but prior to publishing a reply.
type SubscriberResponseFunc func(
	ctx context.Context,
	deliv *amqp.Delivery,
	ch Channel,
	pub *amqp.Publishing,
) context.Context
// PublisherResponseFunc is a hook that may take information from an AMQP
// response delivery and make it available for consumption through the returned
// context. PublisherResponseFuncs are only executed in publishers, after a
// request has been made, but prior to it being decoded.
type PublisherResponseFunc func(ctx context.Context, deliv *amqp.Delivery) context.Context
// SetPublishExchange returns a RequestFunc that records publishExchange as the
// Exchange to use for an AMQP Publish call. The value is stored in the returned
// context under ContextKeyExchange; the Publishing itself is not modified.
func SetPublishExchange(publishExchange string) RequestFunc {
	storeExchange := func(ctx context.Context, _ *amqp.Publishing) context.Context {
		return context.WithValue(ctx, ContextKeyExchange, publishExchange)
	}
	return storeExchange
}
// SetPublishKey returns a RequestFunc that records publishKey as the Key to
// use for an AMQP Publish call. The value is stored in the returned context
// under ContextKeyPublishKey; the Publishing itself is not modified.
func SetPublishKey(publishKey string) RequestFunc {
	storeKey := func(ctx context.Context, _ *amqp.Publishing) context.Context {
		return context.WithValue(ctx, ContextKeyPublishKey, publishKey)
	}
	return storeKey
}
// SetPublishDeliveryMode returns a RequestFunc that sets the DeliveryMode
// field of the Publishing to dmode and leaves the context unchanged.
// Please refer to the AMQP delivery mode constants in the AMQP package for
// the accepted values.
func SetPublishDeliveryMode(dmode uint8) RequestFunc {
	applyMode := func(ctx context.Context, pub *amqp.Publishing) context.Context {
		pub.DeliveryMode = dmode
		return ctx
	}
	return applyMode
}
// SetNackSleepDuration returns a RequestFunc that records the amount of time
// to sleep in the event of a Nack. The duration is stored in the returned
// context under ContextKeyNackSleepDuration.
// It only has an effect when used together with an error encoder that Nacks
// and sleeps; one example is the SingleNackRequeueErrorEncoder.
// It is designed to be used by Subscribers.
func SetNackSleepDuration(duration time.Duration) RequestFunc {
	storeDuration := func(ctx context.Context, _ *amqp.Publishing) context.Context {
		return context.WithValue(ctx, ContextKeyNackSleepDuration, duration)
	}
	return storeDuration
}
// SetConsumeAutoAck returns a RequestFunc that records whether or not to
// autoAck messages when consuming. The flag is stored in the returned context
// under ContextKeyAutoAck.
// When set to false, the publisher will Ack the first message it receives with
// a matching correlationId.
// It is designed to be used by Publishers.
func SetConsumeAutoAck(autoAck bool) RequestFunc {
	storeAutoAck := func(ctx context.Context, _ *amqp.Publishing) context.Context {
		return context.WithValue(ctx, ContextKeyAutoAck, autoAck)
	}
	return storeAutoAck
}
// SetConsumeArgs returns a RequestFunc that records the arguments for the
// amqp Consume function. The table is stored as-is (not copied) in the
// returned context under ContextKeyConsumeArgs.
// It is designed to be used by Publishers.
func SetConsumeArgs(args amqp.Table) RequestFunc {
	storeArgs := func(ctx context.Context, _ *amqp.Publishing) context.Context {
		return context.WithValue(ctx, ContextKeyConsumeArgs, args)
	}
	return storeArgs
}
// SetContentType returns a RequestFunc that assigns contentType to the
// ContentType field of an AMQP Publishing and leaves the context unchanged.
func SetContentType(contentType string) RequestFunc {
	applyContentType := func(ctx context.Context, pub *amqp.Publishing) context.Context {
		pub.ContentType = contentType
		return ctx
	}
	return applyContentType
}
// SetContentEncoding returns a RequestFunc that assigns contentEncoding to the
// ContentEncoding field of an AMQP Publishing and leaves the context
// unchanged.
func SetContentEncoding(contentEncoding string) RequestFunc {
	applyEncoding := func(ctx context.Context, pub *amqp.Publishing) context.Context {
		pub.ContentEncoding = contentEncoding
		return ctx
	}
	return applyEncoding
}
// SetCorrelationID returns a RequestFunc that assigns cid to the CorrelationId
// field of an AMQP Publishing and leaves the context unchanged.
func SetCorrelationID(cid string) RequestFunc {
	applyCorrelationID := func(ctx context.Context, pub *amqp.Publishing) context.Context {
		pub.CorrelationId = cid
		return ctx
	}
	return applyCorrelationID
}
// SetAckAfterEndpoint returns a SubscriberResponseFunc that Acks the Delivery
// object, passing multiple through to Ack. As a SubscriberResponseFunc it is
// meant to run after the endpoint has been successfully evaluated and before
// the response is encoded.
// It is designed to be used by Subscribers.
func SetAckAfterEndpoint(multiple bool) SubscriberResponseFunc {
	ackDelivery := func(
		ctx context.Context,
		deliv *amqp.Delivery,
		_ Channel,
		_ *amqp.Publishing,
	) context.Context {
		// The hook can only return a context, so the result of Ack is
		// discarded here.
		_ = deliv.Ack(multiple)
		return ctx
	}
	return ackDelivery
}
// getPublishExchange returns the exchange name stored in ctx under
// ContextKeyExchange. It returns the empty string when nothing is stored under
// that key or when the stored value is not a string.
func getPublishExchange(ctx context.Context) string {
	// ContextKeyExchange is exported, so code outside this package can store a
	// value of any type under it. A checked assertion keeps such a value from
	// panicking here; it is treated as unset instead.
	exchange, ok := ctx.Value(ContextKeyExchange).(string)
	if !ok {
		return ""
	}
	return exchange
}
// getPublishKey returns the publish key stored in ctx under
// ContextKeyPublishKey. It returns the empty string when nothing is stored
// under that key or when the stored value is not a string.
func getPublishKey(ctx context.Context) string {
	// ContextKeyPublishKey is exported, so code outside this package can store
	// a value of any type under it. A checked assertion keeps such a value
	// from panicking here; it is treated as unset instead.
	publishKey, ok := ctx.Value(ContextKeyPublishKey).(string)
	if !ok {
		return ""
	}
	return publishKey
}
// getNackSleepDuration returns the duration stored in ctx under
// ContextKeyNackSleepDuration. It returns 0 when nothing is stored under that
// key or when the stored value is not a time.Duration.
func getNackSleepDuration(ctx context.Context) time.Duration {
	// ContextKeyNackSleepDuration is exported, so code outside this package
	// can store a value of any type under it (for example a plain integer). A
	// checked assertion keeps such a value from panicking here; it is treated
	// as unset instead.
	duration, ok := ctx.Value(ContextKeyNackSleepDuration).(time.Duration)
	if !ok {
		return 0
	}
	return duration
}
// getConsumeAutoAck returns the autoAck flag stored in ctx under
// ContextKeyAutoAck. It returns false when nothing is stored under that key or
// when the stored value is not a bool.
func getConsumeAutoAck(ctx context.Context) bool {
	// ContextKeyAutoAck is exported, so code outside this package can store a
	// value of any type under it. A checked assertion keeps such a value from
	// panicking here; it is treated as unset instead.
	autoAck, ok := ctx.Value(ContextKeyAutoAck).(bool)
	if !ok {
		return false
	}
	return autoAck
}
// getConsumeArgs returns the Consume arguments stored in ctx under
// ContextKeyConsumeArgs. It returns nil when nothing is stored under that key
// or when the stored value is not an amqp.Table.
func getConsumeArgs(ctx context.Context) amqp.Table {
	// ContextKeyConsumeArgs is exported, so code outside this package can
	// store a value of any type under it (for example a plain
	// map[string]interface{} rather than an amqp.Table). A checked assertion
	// keeps such a value from panicking here; it is treated as unset instead.
	args, ok := ctx.Value(ContextKeyConsumeArgs).(amqp.Table)
	if !ok {
		return nil
	}
	return args
}
// contextKey is the type of the keys this file uses to store values in a
// context.Context. Using a dedicated type keeps these keys distinct from keys
// of any other type, even when the underlying integer values are equal.
type contextKey int
// The keys below are numbered with iota, so their declaration order determines
// each key's value.
const (
// ContextKeyExchange is the key under which SetPublishExchange stores the
// exchange name (a string) and from which getPublishExchange reads it.
// It is the value of the reply Exchange in amqp.Publish.
ContextKeyExchange contextKey = iota
// ContextKeyPublishKey is the key under which SetPublishKey stores the
// publish key (a string) and from which getPublishKey reads it.
// NOTE(review): the previous comment described this as the ReplyTo field in
// amqp.Publish, while SetPublishKey describes it as the Key of an AMQP
// Publish call — confirm which one the caller actually uses it for.
ContextKeyPublishKey
// ContextKeyNackSleepDuration is the key under which SetNackSleepDuration
// stores the duration (a time.Duration) to sleep for if the service Nacks
// and requeues a message.
// This is to prevent sporadic send-resending of a message
// when it is constantly Nack'd and requeued.
ContextKeyNackSleepDuration
// ContextKeyAutoAck is the key under which SetConsumeAutoAck stores the
// autoAck flag (a bool) to use when calling amqp.Channel.Consume.
ContextKeyAutoAck
// ContextKeyConsumeArgs is the key under which SetConsumeArgs stores the
// arguments (an amqp.Table) to use when calling amqp.Channel.Consume.
ContextKeyConsumeArgs
)