Codebase list golang-github-go-kit-kit / f307e26 examples / stringsvc3 / proxying.go
f307e26

Tree @f307e26 (Download .tar.gz)

proxying.go @f307e26raw · history · blame

package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"strings"
	"time"

	jujuratelimit "github.com/juju/ratelimit"
	"github.com/sony/gobreaker"
	"golang.org/x/net/context"

	"github.com/go-kit/kit/circuitbreaker"
	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/loadbalancer"
	"github.com/go-kit/kit/loadbalancer/static"
	"github.com/go-kit/kit/log"
	kitratelimit "github.com/go-kit/kit/ratelimit"
	httptransport "github.com/go-kit/kit/transport/http"
)

func proxyingMiddleware(proxyList string, ctx context.Context, logger log.Logger) ServiceMiddleware {
	if proxyList == "" {
		_ = logger.Log("proxy_to", "none")
		return func(next StringService) StringService { return next }
	}
	proxies := split(proxyList)
	_ = logger.Log("proxy_to", fmt.Sprint(proxies))

	return func(next StringService) StringService {
		var (
			qps         = 100 // max to each instance
			publisher   = static.NewPublisher(proxies, factory(ctx, qps), logger)
			lb          = loadbalancer.NewRoundRobin(publisher)
			maxAttempts = 3
			maxTime     = 100 * time.Millisecond
			endpoint    = loadbalancer.Retry(maxAttempts, maxTime, lb)
		)
		return proxymw{ctx, endpoint, next}
	}
}

// proxymw implements StringService, forwarding Uppercase requests to the
// provided endpoint, and serving all other (i.e. Count) requests via the
// embedded StringService.
type proxymw struct {
	context.Context
	UppercaseEndpoint endpoint.Endpoint
	StringService
}

func (mw proxymw) Uppercase(s string) (string, error) {
	response, err := mw.UppercaseEndpoint(mw.Context, uppercaseRequest{S: s})
	if err != nil {
		return "", err
	}

	resp := response.(uppercaseResponse)
	if resp.Err != "" {
		return resp.V, errors.New(resp.Err)
	}
	return resp.V, nil
}

func factory(ctx context.Context, qps int) loadbalancer.Factory {
	return func(instance string) (endpoint.Endpoint, error) {
		var e endpoint.Endpoint
		e = makeUppercaseProxy(ctx, instance)
		e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
		e = kitratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e)
		return e, nil
	}
}

func makeUppercaseProxy(ctx context.Context, instance string) endpoint.Endpoint {
	if !strings.HasPrefix(instance, "http") {
		instance = "http://" + instance
	}
	u, err := url.Parse(instance)
	if err != nil {
		panic(err)
	}
	if u.Path == "" {
		u.Path = "/uppercase"
	}
	return (httptransport.Client{
		Client:             http.DefaultClient,
		Method:             "GET",
		URL:                u,
		DecodeResponseFunc: decodeUppercaseResponse,
		EncodeRequestFunc:  encodeRequest,
	}).Endpoint()
}

func split(s string) []string {
	a := strings.Split(s, ",")
	for i := range a {
		a[i] = strings.TrimSpace(a[i])
	}
	return a
}