Codebase list golang-github-go-kit-kit / 53b763c
Add amqp.Delivery to the request func in order to have the ability to change incoming message before decode function in case of zipkin context (#802) 0xdev authored 5 years ago Peter Bourgon committed 5 years ago
5 changed file(s) with 36 addition(s) and 35 deletion(s). Raw diff Collapse all Expand all
+0
-23
transport/amqp/encode-decode.go less more
0 package amqp
1
2 import (
3 "context"
4
5 "github.com/streadway/amqp"
6 )
7
8 // DecodeRequestFunc extracts a user-domain request object from
9 // an AMQP Delivery object. It is designed to be used in AMQP Subscribers.
10 type DecodeRequestFunc func(context.Context, *amqp.Delivery) (request interface{}, err error)
11
12 // EncodeRequestFunc encodes the passed request object into
13 // an AMQP Publishing object. It is designed to be used in AMQP Publishers.
14 type EncodeRequestFunc func(context.Context, *amqp.Publishing, interface{}) error
15
16 // EncodeResponseFunc encodes the passed response object to
17 // an AMQP Publishing object. It is designed to be used in AMQP Subscribers.
18 type EncodeResponseFunc func(context.Context, *amqp.Publishing, interface{}) error
19
20 // DecodeResponseFunc extracts a user-domain response object from
21 // an AMQP Delivery object. It is designed to be used in AMQP Publishers.
22 type DecodeResponseFunc func(context.Context, *amqp.Delivery) (response interface{}, err error)
0 package amqp
1
2 import (
3 "context"
4
5 "github.com/streadway/amqp"
6 )
7
8 // DecodeRequestFunc extracts a user-domain request object from
9 // an AMQP Delivery object. It is designed to be used in AMQP Subscribers.
10 type DecodeRequestFunc func(context.Context, *amqp.Delivery) (request interface{}, err error)
11
12 // EncodeRequestFunc encodes the passed request object into
13 // an AMQP Publishing object. It is designed to be used in AMQP Publishers.
14 type EncodeRequestFunc func(context.Context, *amqp.Publishing, interface{}) error
15
16 // EncodeResponseFunc encodes the passed response object to
17 // an AMQP Publishing object. It is designed to be used in AMQP Subscribers.
18 type EncodeResponseFunc func(context.Context, *amqp.Publishing, interface{}) error
19
20 // DecodeResponseFunc extracts a user-domain response object from
21 // an AMQP Delivery object. It is designed to be used in AMQP Publishers.
22 type DecodeResponseFunc func(context.Context, *amqp.Delivery) (response interface{}, err error)
8181 }
8282
8383 for _, f := range p.before {
84 ctx = f(ctx, &pub)
84 // Affect only amqp.Publishing
85 ctx = f(ctx, &pub, nil)
8586 }
8687
8788 deliv, err := p.publishAndConsumeFirstMatchingResponse(ctx, &pub)
99 // RequestFunc may take information from a publisher request and put it into a
1010 // request context. In Subscribers, RequestFuncs are executed prior to invoking
1111 // the endpoint.
12 type RequestFunc func(context.Context, *amqp.Publishing) context.Context
12 type RequestFunc func(context.Context, *amqp.Publishing, *amqp.Delivery) context.Context
1313
1414 // SubscriberResponseFunc may take information from a request context and use it to
1515 // manipulate a Publisher. SubscriberResponseFuncs are only executed in
2828 // SetPublishExchange returns a RequestFunc that sets the Exchange field
2929 // of an AMQP Publish call.
3030 func SetPublishExchange(publishExchange string) RequestFunc {
31 return func(ctx context.Context, pub *amqp.Publishing) context.Context {
31 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
3232 return context.WithValue(ctx, ContextKeyExchange, publishExchange)
3333 }
3434 }
3636 // SetPublishKey returns a RequestFunc that sets the Key field
3737 // of an AMQP Publish call.
3838 func SetPublishKey(publishKey string) RequestFunc {
39 return func(ctx context.Context, pub *amqp.Publishing) context.Context {
39 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
4040 return context.WithValue(ctx, ContextKeyPublishKey, publishKey)
4141 }
4242 }
4444 // SetPublishDeliveryMode sets the delivery mode of a Publishing.
4545 // Please refer to AMQP delivery mode constants in the AMQP package.
4646 func SetPublishDeliveryMode(dmode uint8) RequestFunc {
47 return func(ctx context.Context, pub *amqp.Publishing) context.Context {
47 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
4848 pub.DeliveryMode = dmode
4949 return ctx
5050 }
5656 // One example is the SingleNackRequeueErrorEncoder.
5757 // It is designed to be used by Subscribers.
5858 func SetNackSleepDuration(duration time.Duration) RequestFunc {
59 return func(ctx context.Context, pub *amqp.Publishing) context.Context {
59 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
6060 return context.WithValue(ctx, ContextKeyNackSleepDuration, duration)
6161 }
6262 }
6767 // a matching correlationId.
6868 // It is designed to be used by Publishers.
6969 func SetConsumeAutoAck(autoAck bool) RequestFunc {
70 return func(ctx context.Context, pub *amqp.Publishing) context.Context {
70 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
7171 return context.WithValue(ctx, ContextKeyAutoAck, autoAck)
7272 }
7373 }
7676 // function.
7777 // It is designed to be used by Publishers.
7878 func SetConsumeArgs(args amqp.Table) RequestFunc {
79 return func(ctx context.Context, pub *amqp.Publishing) context.Context {
79 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
8080 return context.WithValue(ctx, ContextKeyConsumeArgs, args)
8181 }
8282 }
8484 // SetContentType returns a RequestFunc that sets the ContentType field of
8585 // an AMQP Publishing.
8686 func SetContentType(contentType string) RequestFunc {
87 return func(ctx context.Context, pub *amqp.Publishing) context.Context {
87 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
8888 pub.ContentType = contentType
8989 return ctx
9090 }
9393 // SetContentEncoding returns a RequestFunc that sets the ContentEncoding field
9494 // of an AMQP Publishing.
9595 func SetContentEncoding(contentEncoding string) RequestFunc {
96 return func(ctx context.Context, pub *amqp.Publishing) context.Context {
96 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
9797 pub.ContentEncoding = contentEncoding
9898 return ctx
9999 }
102102 // SetCorrelationID returns a RequestFunc that sets the CorrelationId field
103103 // of an AMQP Publishing.
104104 func SetCorrelationID(cid string) RequestFunc {
105 return func(ctx context.Context, pub *amqp.Publishing) context.Context {
105 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
106106 pub.CorrelationId = cid
107107 return ctx
108108 }
8383 pub := amqp.Publishing{}
8484
8585 for _, f := range s.before {
86 ctx = f(ctx, &pub)
86 ctx = f(ctx, &pub, deliv)
8787 }
8888
8989 request, err := s.dec(ctx, deliv)