package nats
import (
"context"
"encoding/json"
"github.com/go-kit/kit/endpoint"
"github.com/nats-io/nats.go"
"time"
)
// Publisher wraps a NATS connection and a subject, and provides a method that
// implements endpoint.Endpoint.
type Publisher struct {
// publisher is the NATS connection on which requests are issued.
publisher *nats.Conn
// subject is the initial Subject of every outgoing message.
subject string
// enc encodes the user-domain request into the outgoing message.
enc EncodeRequestFunc
// dec decodes the NATS reply into the user-domain response.
dec DecodeResponseFunc
// before holds hooks run after encoding and before the request is sent.
before []RequestFunc
// after holds hooks run on the reply before it is decoded.
after []PublisherResponseFunc
// timeout bounds each request through a derived context; NewPublisher
// defaults it to 10 seconds.
timeout time.Duration
}
// NewPublisher builds a Publisher that issues requests for a single remote
// method on the given subject over the supplied NATS connection.
//
// enc turns the caller's request into the outgoing message and dec turns the
// reply back into a response value. The request timeout starts at 10 seconds;
// the supplied options are then applied in the order given and may override
// any of the defaults.
func NewPublisher(
	publisher *nats.Conn,
	subject string,
	enc EncodeRequestFunc,
	dec DecodeResponseFunc,
	options ...PublisherOption,
) *Publisher {
	pub := new(Publisher)
	pub.publisher = publisher
	pub.subject = subject
	pub.enc = enc
	pub.dec = dec
	pub.timeout = 10 * time.Second

	for _, apply := range options {
		apply(pub)
	}
	return pub
}
// PublisherOption sets an optional parameter on a Publisher. It receives the
// Publisher being configured and mutates it in place.
type PublisherOption func(*Publisher)
// PublisherBefore registers RequestFuncs that run against the outgoing NATS
// message after it has been encoded and before the request is sent. The funcs
// are appended to any already registered, so the option may be used more than
// once.
func PublisherBefore(before ...RequestFunc) PublisherOption {
	return func(pub *Publisher) {
		pub.before = append(pub.before, before...)
	}
}
// PublisherAfter registers PublisherResponseFuncs that run against the incoming
// NATS response before it is decoded. This is useful for pulling anything off
// the response and adding it to the context prior to decoding. The funcs are
// appended to any already registered, so the option may be used more than once.
func PublisherAfter(after ...PublisherResponseFunc) PublisherOption {
	return func(pub *Publisher) {
		pub.after = append(pub.after, after...)
	}
}
// PublisherTimeout overrides the time allowed for a NATS request. The value is
// used as the timeout of the context under which each request runs.
func PublisherTimeout(timeout time.Duration) PublisherOption {
	return func(pub *Publisher) {
		pub.timeout = timeout
	}
}
// Endpoint returns a usable endpoint that invokes the remote endpoint.
//
// Each call runs these steps under a context limited by the publisher's
// timeout:
//  1. encode the request into a fresh message addressed to the subject;
//  2. run the before funcs, each of which may replace the context;
//  3. send the request and wait for the reply;
//  4. run the after funcs on the reply;
//  5. decode the reply into the response value.
//
// The first error from encoding, sending, or decoding is returned with a nil
// response.
func (p Publisher) Endpoint() endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		reqCtx, cancel := context.WithTimeout(ctx, p.timeout)
		defer cancel()

		outgoing := &nats.Msg{Subject: p.subject}
		if err := p.enc(reqCtx, outgoing, request); err != nil {
			return nil, err
		}
		for _, hook := range p.before {
			reqCtx = hook(reqCtx, outgoing)
		}

		// Only the message's Subject and Data are forwarded to the connection.
		reply, err := p.publisher.RequestWithContext(reqCtx, outgoing.Subject, outgoing.Data)
		if err != nil {
			return nil, err
		}
		for _, hook := range p.after {
			reqCtx = hook(reqCtx, reply)
		}

		decoded, err := p.dec(reqCtx, reply)
		if err != nil {
			// Drop whatever the decoder returned alongside its error.
			return nil, err
		}
		return decoded, nil
	}
}
// EncodeJSONRequest is an EncodeRequestFunc that marshals the request to JSON
// and stores the bytes in the Data field of the Msg. Many JSON-over-NATS
// services can use it as a sensible default.
//
// If marshalling fails, the error is returned and the Msg is left untouched.
func EncodeJSONRequest(_ context.Context, msg *nats.Msg, request interface{}) error {
	payload, err := json.Marshal(request)
	if err == nil {
		msg.Data = payload
	}
	return err
}