package addtransport
import (
"bytes"
"context"
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
jujuratelimit "github.com/juju/ratelimit"
stdopentracing "github.com/opentracing/opentracing-go"
"github.com/sony/gobreaker"
"github.com/go-kit/kit/circuitbreaker"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/ratelimit"
"github.com/go-kit/kit/tracing/opentracing"
httptransport "github.com/go-kit/kit/transport/http"
"github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
"github.com/go-kit/kit/examples/addsvc/pkg/addservice"
)
// NewHTTPHandler builds the server-side HTTP transport for the given endpoint
// set: SumEndpoint is served on "/sum" and ConcatEndpoint on "/concat". Both
// routes share the same error encoder and error logger, and each one installs
// a before-hook that lifts tracing information from the request into the
// context under its own operation name.
func NewHTTPHandler(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) http.Handler {
	// Options common to every route.
	shared := []httptransport.ServerOption{
		httptransport.ServerErrorEncoder(errorEncoder),
		httptransport.ServerErrorLogger(logger),
	}

	mux := http.NewServeMux()

	// shared is a literal whose length equals its capacity, so each append
	// below yields a fresh slice and the routes never see each other's options.
	sumOptions := append(shared, httptransport.ServerBefore(opentracing.HTTPToContext(tracer, "Sum", logger)))
	sumServer := httptransport.NewServer(
		endpoints.SumEndpoint,
		decodeHTTPSumRequest,
		encodeHTTPGenericResponse,
		sumOptions...,
	)
	mux.Handle("/sum", sumServer)

	concatOptions := append(shared, httptransport.ServerBefore(opentracing.HTTPToContext(tracer, "Concat", logger)))
	concatServer := httptransport.NewServer(
		endpoints.ConcatEndpoint,
		decodeHTTPConcatRequest,
		encodeHTTPGenericResponse,
		concatOptions...,
	)
	mux.Handle("/concat", concatServer)

	return mux
}
// NewHTTPClient returns an AddService backed by an HTTP server living at the
// remote instance. We expect instance to come from a service discovery system,
// so likely of the form "host:port"; an instance that does not already begin
// with "http://" or "https://" is given an "http://" prefix before parsing.
// We bake-in certain middlewares, implementing the client library pattern.
//
// An error is returned only when the (possibly prefixed) instance string
// cannot be parsed as a URL.
func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addservice.Service, error) {
	// Quickly sanitize the instance string. Only an explicit HTTP scheme counts
	// as "already a URL": testing for the bare prefix "http" would skip host
	// names that merely start with those letters (e.g. "httpd-1:8080"), and
	// url.Parse would then take the host name for the scheme.
	if !strings.HasPrefix(instance, "http://") && !strings.HasPrefix(instance, "https://") {
		instance = "http://" + instance
	}
	u, err := url.Parse(instance)
	if err != nil {
		return nil, err
	}

	// We construct a single ratelimiter middleware, to limit the total outgoing
	// QPS from this client to all methods on the remote instance. We also
	// construct per-endpoint circuitbreaker middlewares to demonstrate how
	// that's done, although they could easily be combined into a single breaker
	// for the entire remote instance, too.
	limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100))

	// Each individual endpoint is an http/transport.Client (which implements
	// endpoint.Endpoint) that gets wrapped with various middlewares. If you
	// made your own client library, you'd do this work there, so your server
	// could rely on a consistent set of client behavior.
	var sumEndpoint endpoint.Endpoint
	{
		sumEndpoint = httptransport.NewClient(
			"POST",
			copyURL(u, "/sum"),
			encodeHTTPGenericRequest,
			decodeHTTPSumResponse,
			httptransport.ClientBefore(opentracing.ContextToHTTP(tracer, logger)),
		).Endpoint()
		sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
		sumEndpoint = limiter(sumEndpoint)
		sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
			Name:    "Sum",
			Timeout: 30 * time.Second,
		}))(sumEndpoint)
	}

	// The Concat endpoint is the same thing, with slightly different
	// middlewares to demonstrate how to specialize per-endpoint.
	var concatEndpoint endpoint.Endpoint
	{
		concatEndpoint = httptransport.NewClient(
			"POST",
			copyURL(u, "/concat"),
			encodeHTTPGenericRequest,
			decodeHTTPConcatResponse,
			httptransport.ClientBefore(opentracing.ContextToHTTP(tracer, logger)),
		).Endpoint()
		concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
		concatEndpoint = limiter(concatEndpoint)
		concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
			Name:    "Concat",
			Timeout: 10 * time.Second,
		}))(concatEndpoint)
	}

	// Returning the endpoint.Set as a service.Service relies on the
	// endpoint.Set implementing the Service methods. That's just a simple bit
	// of glue code.
	return addendpoint.Set{
		SumEndpoint:    sumEndpoint,
		ConcatEndpoint: concatEndpoint,
	}, nil
}
// copyURL returns a new URL that is a shallow copy of base with its Path
// replaced by path. The base URL itself is left unmodified.
func copyURL(base *url.URL, path string) *url.URL {
	clone := new(url.URL)
	*clone = *base
	clone.Path = path
	return clone
}
// errorEncoder writes err to the client as a JSON document of the form
// {"error": "<message>"}. The HTTP status code is chosen by err2code. The
// context argument is unused.
func errorEncoder(_ context.Context, err error, w http.ResponseWriter) {
	// Headers have to be set before WriteHeader; this labels the body the same
	// way encodeHTTPGenericResponse labels successful responses.
	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	w.WriteHeader(err2code(err))
	// Best effort: the status line has already been written, so a failure to
	// write the body cannot be reported to the client anymore.
	_ = json.NewEncoder(w).Encode(errorWrapper{Error: err.Error()})
}
// err2code maps a service error to an HTTP status code. The input-validation
// errors ErrTwoZeroes, ErrMaxSizeExceeded and ErrIntOverflow become
// 400 Bad Request; every other error becomes 500 Internal Server Error.
func err2code(err error) int {
	badRequest := err == addservice.ErrTwoZeroes ||
		err == addservice.ErrMaxSizeExceeded ||
		err == addservice.ErrIntOverflow
	if badRequest {
		return http.StatusBadRequest
	}
	return http.StatusInternalServerError
}
// errorDecoder reads a JSON error document ({"error": "<message>"}) from the
// response body and returns an error carrying that message. If the body
// cannot be decoded, the decoding error is returned instead.
func errorDecoder(r *http.Response) error {
	payload := errorWrapper{}
	decodeErr := json.NewDecoder(r.Body).Decode(&payload)
	if decodeErr != nil {
		return decodeErr
	}
	return errors.New(payload.Error)
}
// errorWrapper is the JSON envelope for error bodies on the wire: errorEncoder
// writes it and errorDecoder reads it back.
type errorWrapper struct {
// Error is the error message, serialized under the "error" key.
Error string `json:"error"`
}
// decodeHTTPSumRequest is a transport/http.DecodeRequestFunc for the sum
// endpoint. It reads a JSON document from the HTTP request body into an
// addendpoint.SumRequest and returns it together with any decoding error.
// The context argument is unused. Primarily useful in a server.
func decodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) {
	decoder := json.NewDecoder(r.Body)
	sumReq := addendpoint.SumRequest{}
	decodeErr := decoder.Decode(&sumReq)
	return sumReq, decodeErr
}
// decodeHTTPConcatRequest is a transport/http.DecodeRequestFunc for the concat
// endpoint. It reads a JSON document from the HTTP request body into an
// addendpoint.ConcatRequest and returns it together with any decoding error.
// The context argument is unused. Primarily useful in a server.
func decodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) {
	decoder := json.NewDecoder(r.Body)
	concatReq := addendpoint.ConcatRequest{}
	decodeErr := decoder.Decode(&concatReq)
	return concatReq, decodeErr
}
// decodeHTTPSumResponse is a transport/http.DecodeResponseFunc that decodes a
// JSON-encoded sum response from the HTTP response body. If the response has a
// non-200 status code, we will interpret that as an error and attempt to decode
// the specific error message from the response body. Primarily useful in a
// client.
func decodeHTTPSumResponse(_ context.Context, r *http.Response) (interface{}, error) {
if r.StatusCode != http.StatusOK {
return nil, errors.New(r.Status)
}
var resp addendpoint.SumResponse
err := json.NewDecoder(r.Body).Decode(&resp)
return resp, err
}
// decodeHTTPConcatResponse is a transport/http.DecodeResponseFunc that decodes
// a JSON-encoded concat response from the HTTP response body. If the response
// has a non-200 status code, we will interpret that as an error and attempt to
// decode the specific error message from the response body. Primarily useful in
// a client.
func decodeHTTPConcatResponse(_ context.Context, r *http.Response) (interface{}, error) {
if r.StatusCode != http.StatusOK {
return nil, errors.New(r.Status)
}
var resp addendpoint.ConcatResponse
err := json.NewDecoder(r.Body).Decode(&resp)
return resp, err
}
// encodeHTTPGenericRequest is a transport/http.EncodeRequestFunc that
// JSON-encodes any request to the request body. It also sets the Content-Type
// header to "application/json; charset=utf-8" and records the body size in
// ContentLength. If the value cannot be encoded, the encoding error is
// returned and the request is left untouched. The context argument is unused.
// Primarily useful in a client.
func encodeHTTPGenericRequest(_ context.Context, r *http.Request, request interface{}) error {
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(request); err != nil {
		return err
	}
	// A request built by hand (rather than via http.NewRequest) may have a nil
	// header map; writing to it would panic.
	if r.Header == nil {
		r.Header = make(http.Header)
	}
	r.Header.Set("Content-Type", "application/json; charset=utf-8")
	// The whole payload is already buffered, so its length is known.
	r.ContentLength = int64(buf.Len())
	r.Body = ioutil.NopCloser(&buf)
	return nil
}
// encodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that writes
// the response to the response writer as JSON. When the response implements
// addendpoint.Failer and reports a non-nil failure, that failure is handed to
// errorEncoder instead and nil is returned. Primarily useful in a server.
func encodeHTTPGenericResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
	failer, isFailer := response.(addendpoint.Failer)
	if isFailer && failer.Failed() != nil {
		errorEncoder(ctx, failer.Failed(), w)
		return nil
	}

	header := w.Header()
	header.Set("Content-Type", "application/json; charset=utf-8")

	encoder := json.NewEncoder(w)
	return encoder.Encode(response)
}