diff --git a/examples/README.md b/examples/README.md index c005e3e..b43b696 100644 --- a/examples/README.md +++ b/examples/README.md @@ -75,7 +75,7 @@ type uppercaseResponse struct { V string `json:"v"` - Err string `json:"err,omitempty"` // errors don't define JSON marshaling + Err string `json:"err,omitempty"` // errors don't JSON-marshal, so we use a string } type countRequest struct { @@ -98,6 +98,7 @@ An endpoint represents a single RPC. That is, a single method in our service interface. We'll write simple adapters to convert each of our service's methods into an endpoint. +Each adapter takes a StringService, and returns an endpoint that corresponds to one of the methods. ```go import ( @@ -281,9 +282,9 @@ which wraps an existing StringService, and performs the extra logging duties. ```go -type loggingMiddleware struct{ +type loggingMiddleware struct { logger log.Logger - StringService + next StringService } func (mw loggingMiddleware) Uppercase(s string) (output string, err error) { @@ -297,7 +298,7 @@ ) }(time.Now()) - output, err = mw.StringService.Uppercase(s) + output, err = mw.next.Uppercase(s) return } @@ -311,7 +312,7 @@ ) }(time.Now()) - n = mw.StringService.Count(s) + n = mw.next.Count(s) return } ``` @@ -329,8 +330,11 @@ func main() { logger := log.NewLogfmtLogger(os.Stderr) - svc := stringService{} + var svc StringService + svc = stringsvc{} svc = loggingMiddleware{logger, svc} + + // ... uppercaseHandler := httptransport.NewServer( // ... @@ -364,7 +368,7 @@ requestCount metrics.Counter requestLatency metrics.TimeHistogram countResult metrics.Histogram - StringService + next StringService } func (mw instrumentingMiddleware) Uppercase(s string) (output string, err error) { @@ -375,7 +379,7 @@ mw.requestLatency.With(methodField).With(errorField).Observe(time.Since(begin)) }(time.Now()) - output, err = mw.StringService.Uppercase(s) + output, err = mw.next.Uppercase(s) return } @@ -388,7 +392,7 @@ mw.countResult.Observe(int64(n)) }(time.Now()) - n = mw.StringService.Count(s) + n = mw.next.Count(s) return } ``` @@ -416,21 +420,12 @@ // ... }, []string{})) - svc := stringService{} + var svc StringService + svc = stringService{} svc = loggingMiddleware{logger, svc} svc = instrumentingMiddleware{requestCount, requestLatency, countResult, svc} - uppercaseHandler := httptransport.NewServer( - // ... - makeUppercaseEndpoint(svc), - // ... - ) - - countHandler := httptransport.NewServer( - // ... - makeCountEndpoint(svc), - // ... - ) + // ... http.Handle("/metrics", stdprometheus.Handler()) } @@ -467,17 +462,19 @@ **This is where Go kit shines**. We provide transport middlewares to solve many of the problems that come up. -Let's implement the proxying middleware as a ServiceMiddleware. -We'll only proxy one method, Uppercase. +Let's say that we want to have our string service call out to a _different_ string service + to satisfy the Uppercase method. +In effect, proxying the request to another service. +Let's implement the proxying middleware as a ServiceMiddleware, same as a logging or instrumenting middleware. ```go // proxymw implements StringService, forwarding Uppercase requests to the // provided endpoint, and serving all other (i.e. Count) requests via the -// embedded StringService. +// next StringService. type proxymw struct { - context.Context - StringService // Serve most requests via this embedded service... - UppercaseEndpoint endpoint.Endpoint // ...except Uppercase, which gets served by this endpoint + ctx context.Context + next StringService // Serve most requests via this service... + uppercase endpoint.Endpoint // ...except Uppercase, which gets served by this endpoint } ``` @@ -489,7 +486,7 @@ ```go func (mw proxymw) Uppercase(s string) (string, error) { - response, err := mw.UppercaseEndpoint(mw.Context, uppercaseRequest{S: s}) + response, err := mw.uppercase(mw.Context, uppercaseRequest{S: s}) if err != nil { return "", err } @@ -533,15 +530,15 @@ And if any of those instances start to behave badly, we want to deal with that, without affecting our own service's reliability. Go kit offers adapters to different service discovery systems, to get up-to-date sets of instances, exposed as individual endpoints. -Those adapters are called publishers. - -```go -type Publisher interface { +Those adapters are called subscribers. + +```go +type Subscriber interface { Endpoints() ([]endpoint.Endpoint, error) } ``` -Internally, publishers use a provided factory function to convert each discovered host:port string to a usable endpoint. +Internally, subscribers use a provided factory function to convert each discovered instance string (typically host:port) to a usable endpoint. ```go type Factory func(instance string) (endpoint.Endpoint, error) @@ -551,23 +548,19 @@ But it's important to put some safety middleware, like circuit breakers and rate limiters, into your factory, too. ```go -func factory(ctx context.Context, maxQPS int) loadbalancer.Factory { - return func(instance string) (endpoint.Endpoint, error) { - var e endpoint.Endpoint - e = makeUppercaseProxy(ctx, instance) - e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e) - e = kitratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(maxQPS), int64(maxQPS)))(e) - return e, nil - } +var e endpoint.Endpoint +e = makeUppercaseProxy(ctx, instance) +e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e) +e = kitratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(maxQPS), int64(maxQPS)))(e) } ``` Now that we've got a set of endpoints, we need to choose one. -Load balancers wrap publishers, and select one endpoint from many. +Load balancers wrap subscribers, and select one endpoint from many. Go kit provides a couple of basic load balancers, and it's easy to write your own if you want more advanced heuristics. ```go -type LoadBalancer interface { +type Balancer interface { Endpoint() (endpoint.Endpoint, error) } ``` @@ -578,24 +571,52 @@ The retry strategy will retry failed requests until either the max attempts or timeout has been reached. ```go -func Retry(max int, timeout time.Duration, lb LoadBalancer) endpoint.Endpoint +func Retry(max int, timeout time.Duration, lb Balancer) endpoint.Endpoint ``` Let's wire up our final proxying middleware. For simplicity, we'll assume the user will specify multiple comma-separate instance endpoints with a flag. ```go -func proxyingMiddleware(proxyList string, ctx context.Context, logger log.Logger) ServiceMiddleware { +func proxyingMiddleware(instances string, ctx context.Context, logger log.Logger) ServiceMiddleware { + // If instances is empty, don't proxy. + if instances == "" { + logger.Log("proxy_to", "none") + return func(next StringService) StringService { return next } + } + + // Set some parameters for our client. + var ( + qps = 100 // beyond which we will return an error + maxAttempts = 3 // per request, before giving up + maxTime = 250 * time.Millisecond // wallclock time, before giving up + ) + + // Otherwise, construct an endpoint for each instance in the list, and add + // it to a fixed set of endpoints. In a real service, rather than doing this + // by hand, you'd probably use package sd's support for your service + // discovery system. + var ( + instanceList = split(instances) + subscriber sd.FixedSubscriber + ) + logger.Log("proxy_to", fmt.Sprint(instanceList)) + for _, instance := range instanceList { + var e endpoint.Endpoint + e = makeUppercaseProxy(ctx, instance) + e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e) + e = kitratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e) + subscriber = append(subscriber, e) + } + + // Now, build a single, retrying, load-balancing endpoint out of all of + // those individual endpoints. + balancer := lb.NewRoundRobin(subscriber) + retry := lb.Retry(maxAttempts, maxTime, balancer) + + // And finally, return the ServiceMiddleware, implemented by proxymw. return func(next StringService) StringService { - var ( - qps = 100 // max to each instance - publisher = static.NewPublisher(split(proxyList), factory(ctx, qps), logger) - lb = loadbalancer.NewRoundRobin(publisher) - maxAttempts = 3 - maxTime = 100 * time.Millisecond - endpoint = loadbalancer.Retry(maxAttempts, maxTime, lb) - ) - return proxymw{ctx, endpoint, next} + return proxymw{ctx, next, retry} } } ``` @@ -667,13 +688,15 @@ It's possible to use Go kit to create a client package to your service, to make consuming your service easier from other Go programs. Effectively, your client package will provide an implementation of your service interface, which invokes a remote service instance using a specific transport. -See [package addsvc/client](https://github.com/go-kit/kit/tree/master/examples/addsvc/client) for an example. +See [package addsvc/client](https://github.com/go-kit/kit/tree/master/examples/addsvc/client) + or [package profilesvc/client](https://github.com/go-kit/kit/tree/master/examples/profilesvc/client) + for examples. ## Other examples ### addsvc -[addsvc](https://github.com/go-kit/kit/blob/master/examples/addsvc) was the original example application. +[addsvc](https://github.com/go-kit/kit/blob/master/examples/addsvc) is the original example service. It exposes a set of operations over **all supported transports**. It's fully logged, instrumented, and uses Zipkin request tracing. It also demonstrates how to create and use client packages. diff --git a/examples/stringsvc2/instrumenting.go b/examples/stringsvc2/instrumenting.go index c3da27c..f461845 100644 --- a/examples/stringsvc2/instrumenting.go +++ b/examples/stringsvc2/instrumenting.go @@ -11,7 +11,7 @@ requestCount metrics.Counter requestLatency metrics.TimeHistogram countResult metrics.Histogram - StringService + next StringService } func (mw instrumentingMiddleware) Uppercase(s string) (output string, err error) { @@ -22,7 +22,7 @@ mw.requestLatency.With(methodField).With(errorField).Observe(time.Since(begin)) }(time.Now()) - output, err = mw.StringService.Uppercase(s) + output, err = mw.next.Uppercase(s) return } @@ -35,6 +35,6 @@ mw.countResult.Observe(int64(n)) }(time.Now()) - n = mw.StringService.Count(s) + n = mw.next.Count(s) return } diff --git a/examples/stringsvc2/logging.go b/examples/stringsvc2/logging.go index 67fec5d..b958f3b 100644 --- a/examples/stringsvc2/logging.go +++ b/examples/stringsvc2/logging.go @@ -8,7 +8,7 @@ type loggingMiddleware struct { logger log.Logger - StringService + next StringService } func (mw loggingMiddleware) Uppercase(s string) (output string, err error) { @@ -22,7 +22,7 @@ ) }(time.Now()) - output, err = mw.StringService.Uppercase(s) + output, err = mw.next.Uppercase(s) return } @@ -36,6 +36,6 @@ ) }(time.Now()) - n = mw.StringService.Count(s) + n = mw.next.Count(s) return } diff --git a/examples/stringsvc3/proxying.go b/examples/stringsvc3/proxying.go index 78b7550..33bc156 100644 --- a/examples/stringsvc3/proxying.go +++ b/examples/stringsvc3/proxying.go @@ -3,7 +3,6 @@ import ( "errors" "fmt" - "io" "net/url" "strings" "time" @@ -14,45 +13,70 @@ "github.com/go-kit/kit/circuitbreaker" "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/loadbalancer" - "github.com/go-kit/kit/loadbalancer/static" "github.com/go-kit/kit/log" - kitratelimit "github.com/go-kit/kit/ratelimit" + "github.com/go-kit/kit/ratelimit" + "github.com/go-kit/kit/sd" + "github.com/go-kit/kit/sd/lb" httptransport "github.com/go-kit/kit/transport/http" ) -func proxyingMiddleware(proxyList string, ctx context.Context, logger log.Logger) ServiceMiddleware { - if proxyList == "" { +func proxyingMiddleware(instances string, ctx context.Context, logger log.Logger) ServiceMiddleware { + // If instances is empty, don't proxy. + if instances == "" { logger.Log("proxy_to", "none") return func(next StringService) StringService { return next } } - proxies := split(proxyList) - logger.Log("proxy_to", fmt.Sprint(proxies)) + // Set some parameters for our client. + var ( + qps = 100 // beyond which we will return an error + maxAttempts = 3 // per request, before giving up + maxTime = 250 * time.Millisecond // wallclock time, before giving up + ) + + // Otherwise, construct an endpoint for each instance in the list, and add + // it to a fixed set of endpoints. In a real service, rather than doing this + // by hand, you'd probably use package sd's support for your service + // discovery system. + var ( + instanceList = split(instances) + subscriber sd.FixedSubscriber + ) + logger.Log("proxy_to", fmt.Sprint(instanceList)) + for _, instance := range instanceList { + var e endpoint.Endpoint + e = makeUppercaseProxy(ctx, instance) + e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e) + e = ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e) + subscriber = append(subscriber, e) + } + + // Now, build a single, retrying, load-balancing endpoint out of all of + // those individual endpoints. + balancer := lb.NewRoundRobin(subscriber) + retry := lb.Retry(maxAttempts, maxTime, balancer) + + // And finally, return the ServiceMiddleware, implemented by proxymw. return func(next StringService) StringService { - var ( - qps = 100 // max to each instance - publisher = static.NewPublisher(proxies, factory(ctx, qps), logger) - lb = loadbalancer.NewRoundRobin(publisher) - maxAttempts = 3 - maxTime = 100 * time.Millisecond - endpoint = loadbalancer.Retry(maxAttempts, maxTime, lb) - ) - return proxymw{ctx, endpoint, next} + return proxymw{ctx, next, retry} } } // proxymw implements StringService, forwarding Uppercase requests to the // provided endpoint, and serving all other (i.e. Count) requests via the -// embedded StringService. +// next StringService. type proxymw struct { - context.Context - UppercaseEndpoint endpoint.Endpoint - StringService + ctx context.Context + next StringService // Serve most requests via this service... + uppercase endpoint.Endpoint // ...except Uppercase, which gets served by this endpoint +} + +func (mw proxymw) Count(s string) int { + return mw.next.Count(s) } func (mw proxymw) Uppercase(s string) (string, error) { - response, err := mw.UppercaseEndpoint(mw.Context, uppercaseRequest{S: s}) + response, err := mw.uppercase(mw.ctx, uppercaseRequest{S: s}) if err != nil { return "", err } @@ -62,16 +86,6 @@ return resp.V, errors.New(resp.Err) } return resp.V, nil -} - -func factory(ctx context.Context, qps int) loadbalancer.Factory { - return func(instance string) (endpoint.Endpoint, io.Closer, error) { - var e endpoint.Endpoint - e = makeUppercaseProxy(ctx, instance) - e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e) - e = kitratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e) - return e, nil, nil - } } func makeUppercaseProxy(ctx context.Context, instance string) endpoint.Endpoint {