diff --git a/transport/amqp/encode-decode.go b/transport/amqp/encode-decode.go deleted file mode 100644 index cfa1446..0000000 --- a/transport/amqp/encode-decode.go +++ /dev/null @@ -1,23 +0,0 @@ -package amqp - -import ( - "context" - - "github.com/streadway/amqp" -) - -// DecodeRequestFunc extracts a user-domain request object from -// an AMQP Delivery object. It is designed to be used in AMQP Subscribers. -type DecodeRequestFunc func(context.Context, *amqp.Delivery) (request interface{}, err error) - -// EncodeRequestFunc encodes the passed request object into -// an AMQP Publishing object. It is designed to be used in AMQP Publishers. -type EncodeRequestFunc func(context.Context, *amqp.Publishing, interface{}) error - -// EncodeResponseFunc encodes the passed response object to -// an AMQP Publishing object. It is designed to be used in AMQP Subscribers. -type EncodeResponseFunc func(context.Context, *amqp.Publishing, interface{}) error - -// DecodeResponseFunc extracts a user-domain response object from -// an AMQP Delivery object. It is designed to be used in AMQP Publishers. -type DecodeResponseFunc func(context.Context, *amqp.Delivery) (response interface{}, err error) diff --git a/transport/amqp/encode_decode.go b/transport/amqp/encode_decode.go new file mode 100644 index 0000000..cfa1446 --- /dev/null +++ b/transport/amqp/encode_decode.go @@ -0,0 +1,23 @@ +package amqp + +import ( + "context" + + "github.com/streadway/amqp" +) + +// DecodeRequestFunc extracts a user-domain request object from +// an AMQP Delivery object. It is designed to be used in AMQP Subscribers. +type DecodeRequestFunc func(context.Context, *amqp.Delivery) (request interface{}, err error) + +// EncodeRequestFunc encodes the passed request object into +// an AMQP Publishing object. It is designed to be used in AMQP Publishers. +type EncodeRequestFunc func(context.Context, *amqp.Publishing, interface{}) error + +// EncodeResponseFunc encodes the passed response object to +// an AMQP Publishing object. It is designed to be used in AMQP Subscribers. +type EncodeResponseFunc func(context.Context, *amqp.Publishing, interface{}) error + +// DecodeResponseFunc extracts a user-domain response object from +// an AMQP Delivery object. It is designed to be used in AMQP Publishers. +type DecodeResponseFunc func(context.Context, *amqp.Delivery) (response interface{}, err error) diff --git a/transport/amqp/publisher.go b/transport/amqp/publisher.go index a28ee94..f2d62e1 100644 --- a/transport/amqp/publisher.go +++ b/transport/amqp/publisher.go @@ -82,7 +82,8 @@ } for _, f := range p.before { - ctx = f(ctx, &pub) + // Affect only amqp.Publishing + ctx = f(ctx, &pub, nil) } deliv, err := p.publishAndConsumeFirstMatchingResponse(ctx, &pub) diff --git a/transport/amqp/request_response_func.go b/transport/amqp/request_response_func.go index a6f730f..1409240 100644 --- a/transport/amqp/request_response_func.go +++ b/transport/amqp/request_response_func.go @@ -10,7 +10,7 @@ // RequestFunc may take information from a publisher request and put it into a // request context. In Subscribers, RequestFuncs are executed prior to invoking // the endpoint. -type RequestFunc func(context.Context, *amqp.Publishing) context.Context +type RequestFunc func(context.Context, *amqp.Publishing, *amqp.Delivery) context.Context // SubscriberResponseFunc may take information from a request context and use it to // manipulate a Publisher. SubscriberResponseFuncs are only executed in @@ -29,7 +29,7 @@ // SetPublishExchange returns a RequestFunc that sets the Exchange field // of an AMQP Publish call. func SetPublishExchange(publishExchange string) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { return context.WithValue(ctx, ContextKeyExchange, publishExchange) } } @@ -37,7 +37,7 @@ // SetPublishKey returns a RequestFunc that sets the Key field // of an AMQP Publish call. func SetPublishKey(publishKey string) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { return context.WithValue(ctx, ContextKeyPublishKey, publishKey) } } @@ -45,7 +45,7 @@ // SetPublishDeliveryMode sets the delivery mode of a Publishing. // Please refer to AMQP delivery mode constants in the AMQP package. func SetPublishDeliveryMode(dmode uint8) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { pub.DeliveryMode = dmode return ctx } @@ -57,7 +57,7 @@ // One example is the SingleNackRequeueErrorEncoder. // It is designed to be used by Subscribers. func SetNackSleepDuration(duration time.Duration) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { return context.WithValue(ctx, ContextKeyNackSleepDuration, duration) } } @@ -68,7 +68,7 @@ // a matching correlationId. // It is designed to be used by Publishers. func SetConsumeAutoAck(autoAck bool) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { return context.WithValue(ctx, ContextKeyAutoAck, autoAck) } } @@ -77,7 +77,7 @@ // function. // It is designed to be used by Publishers. func SetConsumeArgs(args amqp.Table) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { return context.WithValue(ctx, ContextKeyConsumeArgs, args) } } @@ -85,7 +85,7 @@ // SetContentType returns a RequestFunc that sets the ContentType field of // an AMQP Publishing. func SetContentType(contentType string) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { pub.ContentType = contentType return ctx } @@ -94,7 +94,7 @@ // SetContentEncoding returns a RequestFunc that sets the ContentEncoding field // of an AMQP Publishing. func SetContentEncoding(contentEncoding string) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { pub.ContentEncoding = contentEncoding return ctx } @@ -103,7 +103,7 @@ // SetCorrelationID returns a RequestFunc that sets the CorrelationId field // of an AMQP Publishing. func SetCorrelationID(cid string) RequestFunc { - return func(ctx context.Context, pub *amqp.Publishing) context.Context { + return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { pub.CorrelationId = cid return ctx } diff --git a/transport/amqp/subscriber.go b/transport/amqp/subscriber.go index 17e1b0f..d728aa4 100644 --- a/transport/amqp/subscriber.go +++ b/transport/amqp/subscriber.go @@ -84,7 +84,7 @@ pub := amqp.Publishing{} for _, f := range s.before { - ctx = f(ctx, &pub) + ctx = f(ctx, &pub, deliv) } request, err := s.dec(ctx, deliv)