Package list golang-github-go-kit-kit / fb57ea5
examples: update README + stringsvc: drop loadbalancer Peter Bourgon 5 years ago
4 changed file(s) with 131 addition(s) and 94 deletion(s). Raw diff Collapse all Expand all
7474
7575 type uppercaseResponse struct {
7676 V string `json:"v"`
77 Err string `json:"err,omitempty"` // errors don't define JSON marshaling
77 Err string `json:"err,omitempty"` // errors don't JSON-marshal, so we use a string
7878 }
7979
8080 type countRequest struct {
9797 An endpoint represents a single RPC.
9898 That is, a single method in our service interface.
9999 We'll write simple adapters to convert each of our service's methods into an endpoint.
100 Each adapter takes a StringService, and returns an endpoint that corresponds to one of the methods.
100101
101102 ```go
102103 import (
280281 which wraps an existing StringService, and performs the extra logging duties.
281282
282283 ```go
283 type loggingMiddleware struct{
284 type loggingMiddleware struct {
284285 logger log.Logger
285 StringService
286 next StringService
286287 }
287288
288289 func (mw loggingMiddleware) Uppercase(s string) (output string, err error) {
296297 )
297298 }(time.Now())
298299
299 output, err = mw.StringService.Uppercase(s)
300 output, err = mw.next.Uppercase(s)
300301 return
301302 }
302303
310311 )
311312 }(time.Now())
312313
313 n = mw.StringService.Count(s)
314 n = mw.next.Count(s)
314315 return
315316 }
316317 ```
328329 func main() {
329330 logger := log.NewLogfmtLogger(os.Stderr)
330331
331 svc := stringService{}
332 var svc StringService
333 svc = stringsvc{}
332334 svc = loggingMiddleware{logger, svc}
335
336 // ...
333337
334338 uppercaseHandler := httptransport.NewServer(
335339 // ...
363367 requestCount metrics.Counter
364368 requestLatency metrics.TimeHistogram
365369 countResult metrics.Histogram
366 StringService
370 next StringService
367371 }
368372
369373 func (mw instrumentingMiddleware) Uppercase(s string) (output string, err error) {
374378 mw.requestLatency.With(methodField).With(errorField).Observe(time.Since(begin))
375379 }(time.Now())
376380
377 output, err = mw.StringService.Uppercase(s)
381 output, err = mw.next.Uppercase(s)
378382 return
379383 }
380384
387391 mw.countResult.Observe(int64(n))
388392 }(time.Now())
389393
390 n = mw.StringService.Count(s)
394 n = mw.next.Count(s)
391395 return
392396 }
393397 ```
415419 // ...
416420 }, []string{}))
417421
418 svc := stringService{}
422 var svc StringService
423 svc = stringService{}
419424 svc = loggingMiddleware{logger, svc}
420425 svc = instrumentingMiddleware{requestCount, requestLatency, countResult, svc}
421426
422 uppercaseHandler := httptransport.NewServer(
423 // ...
424 makeUppercaseEndpoint(svc),
425 // ...
426 )
427
428 countHandler := httptransport.NewServer(
429 // ...
430 makeCountEndpoint(svc),
431 // ...
432 )
427 // ...
433428
434429 http.Handle("/metrics", stdprometheus.Handler())
435430 }
466461 **This is where Go kit shines**.
467462 We provide transport middlewares to solve many of the problems that come up.
468463
469 Let's implement the proxying middleware as a ServiceMiddleware.
470 We'll only proxy one method, Uppercase.
464 Let's say that we want to have our string service call out to a _different_ string service
465 to satisfy the Uppercase method.
466 In effect, proxying the request to another service.
467 Let's implement the proxying middleware as a ServiceMiddleware, same as a logging or instrumenting middleware.
471468
472469 ```go
473470 // proxymw implements StringService, forwarding Uppercase requests to the
474471 // provided endpoint, and serving all other (i.e. Count) requests via the
475 // embedded StringService.
472 // next StringService.
476473 type proxymw struct {
477 context.Context
478 StringService // Serve most requests via this embedded service...
479 UppercaseEndpoint endpoint.Endpoint // ...except Uppercase, which gets served by this endpoint
474 ctx context.Context
475 next StringService // Serve most requests via this service...
476 uppercase endpoint.Endpoint // ...except Uppercase, which gets served by this endpoint
480477 }
481478 ```
482479
488485
489486 ```go
490487 func (mw proxymw) Uppercase(s string) (string, error) {
491 response, err := mw.UppercaseEndpoint(mw.Context, uppercaseRequest{S: s})
488 response, err := mw.uppercase(mw.Context, uppercaseRequest{S: s})
492489 if err != nil {
493490 return "", err
494491 }
532529 And if any of those instances start to behave badly, we want to deal with that, without affecting our own service's reliability.
533530
534531 Go kit offers adapters to different service discovery systems, to get up-to-date sets of instances, exposed as individual endpoints.
535 Those adapters are called publishers.
536
537 ```go
538 type Publisher interface {
532 Those adapters are called subscribers.
533
534 ```go
535 type Subscriber interface {
539536 Endpoints() ([]endpoint.Endpoint, error)
540537 }
541538 ```
542539
543 Internally, publishers use a provided factory function to convert each discovered host:port string to a usable endpoint.
540 Internally, subscribers use a provided factory function to convert each discovered instance string (typically host:port) to a usable endpoint.
544541
545542 ```go
546543 type Factory func(instance string) (endpoint.Endpoint, error)
550547 But it's important to put some safety middleware, like circuit breakers and rate limiters, into your factory, too.
551548
552549 ```go
553 func factory(ctx context.Context, maxQPS int) loadbalancer.Factory {
554 return func(instance string) (endpoint.Endpoint, error) {
555 var e endpoint.Endpoint
556 e = makeUppercaseProxy(ctx, instance)
557 e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
558 e = kitratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(maxQPS), int64(maxQPS)))(e)
559 return e, nil
560 }
550 var e endpoint.Endpoint
551 e = makeUppercaseProxy(ctx, instance)
552 e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
553 e = kitratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(maxQPS), int64(maxQPS)))(e)
561554 }
562555 ```
563556
564557 Now that we've got a set of endpoints, we need to choose one.
565 Load balancers wrap publishers, and select one endpoint from many.
558 Load balancers wrap subscribers, and select one endpoint from many.
566559 Go kit provides a couple of basic load balancers, and it's easy to write your own if you want more advanced heuristics.
567560
568561 ```go
569 type LoadBalancer interface {
562 type Balancer interface {
570563 Endpoint() (endpoint.Endpoint, error)
571564 }
572565 ```
577570 The retry strategy will retry failed requests until either the max attempts or timeout has been reached.
578571
579572 ```go
580 func Retry(max int, timeout time.Duration, lb LoadBalancer) endpoint.Endpoint
573 func Retry(max int, timeout time.Duration, lb Balancer) endpoint.Endpoint
581574 ```
582575
583576 Let's wire up our final proxying middleware.
584577 For simplicity, we'll assume the user will specify multiple comma-separate instance endpoints with a flag.
585578
586579 ```go
587 func proxyingMiddleware(proxyList string, ctx context.Context, logger log.Logger) ServiceMiddleware {
580 func proxyingMiddleware(instances string, ctx context.Context, logger log.Logger) ServiceMiddleware {
581 // If instances is empty, don't proxy.
582 if instances == "" {
583 logger.Log("proxy_to", "none")
584 return func(next StringService) StringService { return next }
585 }
586
587 // Set some parameters for our client.
588 var (
589 qps = 100 // beyond which we will return an error
590 maxAttempts = 3 // per request, before giving up
591 maxTime = 250 * time.Millisecond // wallclock time, before giving up
592 )
593
594 // Otherwise, construct an endpoint for each instance in the list, and add
595 // it to a fixed set of endpoints. In a real service, rather than doing this
596 // by hand, you'd probably use package sd's support for your service
597 // discovery system.
598 var (
599 instanceList = split(instances)
600 subscriber sd.FixedSubscriber
601 )
602 logger.Log("proxy_to", fmt.Sprint(instanceList))
603 for _, instance := range instanceList {
604 var e endpoint.Endpoint
605 e = makeUppercaseProxy(ctx, instance)
606 e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
607 e = kitratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e)
608 subscriber = append(subscriber, e)
609 }
610
611 // Now, build a single, retrying, load-balancing endpoint out of all of
612 // those individual endpoints.
613 balancer := lb.NewRoundRobin(subscriber)
614 retry := lb.Retry(maxAttempts, maxTime, balancer)
615
616 // And finally, return the ServiceMiddleware, implemented by proxymw.
588617 return func(next StringService) StringService {
589 var (
590 qps = 100 // max to each instance
591 publisher = static.NewPublisher(split(proxyList), factory(ctx, qps), logger)
592 lb = loadbalancer.NewRoundRobin(publisher)
593 maxAttempts = 3
594 maxTime = 100 * time.Millisecond
595 endpoint = loadbalancer.Retry(maxAttempts, maxTime, lb)
596 )
597 return proxymw{ctx, endpoint, next}
618 return proxymw{ctx, next, retry}
598619 }
599620 }
600621 ```
666687
667688 It's possible to use Go kit to create a client package to your service, to make consuming your service easier from other Go programs.
668689 Effectively, your client package will provide an implementation of your service interface, which invokes a remote service instance using a specific transport.
669 See [package addsvc/client](https://github.com/go-kit/kit/tree/master/examples/addsvc/client) for an example.
690 See [package addsvc/client](https://github.com/go-kit/kit/tree/master/examples/addsvc/client)
691 or [package profilesvc/client](https://github.com/go-kit/kit/tree/master/examples/profilesvc/client)
692 for examples.
670693
671694 ## Other examples
672695
673696 ### addsvc
674697
675 [addsvc](https://github.com/go-kit/kit/blob/master/examples/addsvc) was the original example application.
698 [addsvc](https://github.com/go-kit/kit/blob/master/examples/addsvc) is the original example service.
676699 It exposes a set of operations over **all supported transports**.
677700 It's fully logged, instrumented, and uses Zipkin request tracing.
678701 It also demonstrates how to create and use client packages.
1010 requestCount metrics.Counter
1111 requestLatency metrics.TimeHistogram
1212 countResult metrics.Histogram
13 StringService
13 next StringService
1414 }
1515
1616 func (mw instrumentingMiddleware) Uppercase(s string) (output string, err error) {
2121 mw.requestLatency.With(methodField).With(errorField).Observe(time.Since(begin))
2222 }(time.Now())
2323
24 output, err = mw.StringService.Uppercase(s)
24 output, err = mw.next.Uppercase(s)
2525 return
2626 }
2727
3434 mw.countResult.Observe(int64(n))
3535 }(time.Now())
3636
37 n = mw.StringService.Count(s)
37 n = mw.next.Count(s)
3838 return
3939 }
77
88 type loggingMiddleware struct {
99 logger log.Logger
10 StringService
10 next StringService
1111 }
1212
1313 func (mw loggingMiddleware) Uppercase(s string) (output string, err error) {
2121 )
2222 }(time.Now())
2323
24 output, err = mw.StringService.Uppercase(s)
24 output, err = mw.next.Uppercase(s)
2525 return
2626 }
2727
3535 )
3636 }(time.Now())
3737
38 n = mw.StringService.Count(s)
38 n = mw.next.Count(s)
3939 return
4040 }
22 import (
33 "errors"
44 "fmt"
5 "io"
65 "net/url"
76 "strings"
87 "time"
1312
1413 "github.com/go-kit/kit/circuitbreaker"
1514 "github.com/go-kit/kit/endpoint"
16 "github.com/go-kit/kit/loadbalancer"
17 "github.com/go-kit/kit/loadbalancer/static"
1815 "github.com/go-kit/kit/log"
19 kitratelimit "github.com/go-kit/kit/ratelimit"
16 "github.com/go-kit/kit/ratelimit"
17 "github.com/go-kit/kit/sd"
18 "github.com/go-kit/kit/sd/lb"
2019 httptransport "github.com/go-kit/kit/transport/http"
2120 )
2221
23 func proxyingMiddleware(proxyList string, ctx context.Context, logger log.Logger) ServiceMiddleware {
24 if proxyList == "" {
22 func proxyingMiddleware(instances string, ctx context.Context, logger log.Logger) ServiceMiddleware {
23 // If instances is empty, don't proxy.
24 if instances == "" {
2525 logger.Log("proxy_to", "none")
2626 return func(next StringService) StringService { return next }
2727 }
28 proxies := split(proxyList)
29 logger.Log("proxy_to", fmt.Sprint(proxies))
3028
29 // Set some parameters for our client.
30 var (
31 qps = 100 // beyond which we will return an error
32 maxAttempts = 3 // per request, before giving up
33 maxTime = 250 * time.Millisecond // wallclock time, before giving up
34 )
35
36 // Otherwise, construct an endpoint for each instance in the list, and add
37 // it to a fixed set of endpoints. In a real service, rather than doing this
38 // by hand, you'd probably use package sd's support for your service
39 // discovery system.
40 var (
41 instanceList = split(instances)
42 subscriber sd.FixedSubscriber
43 )
44 logger.Log("proxy_to", fmt.Sprint(instanceList))
45 for _, instance := range instanceList {
46 var e endpoint.Endpoint
47 e = makeUppercaseProxy(ctx, instance)
48 e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
49 e = ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e)
50 subscriber = append(subscriber, e)
51 }
52
53 // Now, build a single, retrying, load-balancing endpoint out of all of
54 // those individual endpoints.
55 balancer := lb.NewRoundRobin(subscriber)
56 retry := lb.Retry(maxAttempts, maxTime, balancer)
57
58 // And finally, return the ServiceMiddleware, implemented by proxymw.
3159 return func(next StringService) StringService {
32 var (
33 qps = 100 // max to each instance
34 publisher = static.NewPublisher(proxies, factory(ctx, qps), logger)
35 lb = loadbalancer.NewRoundRobin(publisher)
36 maxAttempts = 3
37 maxTime = 100 * time.Millisecond
38 endpoint = loadbalancer.Retry(maxAttempts, maxTime, lb)
39 )
40 return proxymw{ctx, endpoint, next}
60 return proxymw{ctx, next, retry}
4161 }
4262 }
4363
4464 // proxymw implements StringService, forwarding Uppercase requests to the
4565 // provided endpoint, and serving all other (i.e. Count) requests via the
46 // embedded StringService.
66 // next StringService.
4767 type proxymw struct {
48 context.Context
49 UppercaseEndpoint endpoint.Endpoint
50 StringService
68 ctx context.Context
69 next StringService // Serve most requests via this service...
70 uppercase endpoint.Endpoint // ...except Uppercase, which gets served by this endpoint
71 }
72
73 func (mw proxymw) Count(s string) int {
74 return mw.next.Count(s)
5175 }
5276
5377 func (mw proxymw) Uppercase(s string) (string, error) {
54 response, err := mw.UppercaseEndpoint(mw.Context, uppercaseRequest{S: s})
78 response, err := mw.uppercase(mw.ctx, uppercaseRequest{S: s})
5579 if err != nil {
5680 return "", err
5781 }
6185 return resp.V, errors.New(resp.Err)
6286 }
6387 return resp.V, nil
64 }
65
66 func factory(ctx context.Context, qps int) loadbalancer.Factory {
67 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
68 var e endpoint.Endpoint
69 e = makeUppercaseProxy(ctx, instance)
70 e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
71 e = kitratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e)
72 return e, nil, nil
73 }
7488 }
7589
7690 func makeUppercaseProxy(ctx context.Context, instance string) endpoint.Endpoint {