Codebase list golang-github-go-kit-kit / 2c5ef80
examples/apigateway: refactor Peter Bourgon 7 years ago
5 changed file(s) with 249 addition(s) and 168 deletion(s). Raw diff Collapse all Expand all
4646 )
4747 flag.Parse()
4848
49 // Logging domain
49 // Logging domain.
5050 var logger log.Logger
5151 {
5252 logger = log.NewLogfmtLogger(os.Stdout)
5656 logger.Log("msg", "hello")
5757 defer logger.Log("msg", "goodbye")
5858
59 // Metrics domain
59 // Metrics domain.
6060 var ints, chars metrics.Counter
6161 {
62 // Business level metrics
62 // Business level metrics.
6363 ints = prometheus.NewCounter(stdprometheus.CounterOpts{
6464 Namespace: "addsvc",
6565 Name: "integers_summed",
7373 }
7474 var duration metrics.TimeHistogram
7575 {
76 // Transport level metrics
76 // Transport level metrics.
7777 duration = metrics.NewTimeHistogram(time.Nanosecond, prometheus.NewSummary(stdprometheus.SummaryOpts{
7878 Namespace: "addsvc",
7979 Name: "request_duration_ns",
8181 }, []string{"method", "success"}))
8282 }
8383
84 // Tracing domain
84 // Tracing domain.
8585 var tracer stdopentracing.Tracer
8686 {
8787 if *zipkinAddr != "" {
120120 }
121121 }
122122
123 // Business domain
123 // Business domain.
124124 var service addsvc.Service
125125 {
126126 service = addsvc.NewBasicService()
128128 service = addsvc.ServiceInstrumentingMiddleware(ints, chars)(service)
129129 }
130130
131 // Endpoint domain
131 // Endpoint domain.
132132 var sumEndpoint endpoint.Endpoint
133133 {
134134 sumDuration := duration.With(metrics.Field{Key: "method", Value: "Sum"})
154154 ConcatEndpoint: concatEndpoint,
155155 }
156156
157 // Mechanical domain
157 // Mechanical domain.
158158 errc := make(chan error)
159159 ctx := context.Background()
160160
161 // Interrupt handler
161 // Interrupt handler.
162162 go func() {
163163 c := make(chan os.Signal, 1)
164164 signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
165165 errc <- fmt.Errorf("%s", <-c)
166166 }()
167167
168 // Debug listener
168 // Debug listener.
169169 go func() {
170170 logger := log.NewContext(logger).With("transport", "debug")
171171
181181 errc <- http.ListenAndServe(*debugAddr, m)
182182 }()
183183
184 // HTTP transport
184 // HTTP transport.
185185 go func() {
186186 logger := log.NewContext(logger).With("transport", "HTTP")
187187 h := addsvc.MakeHTTPHandler(ctx, endpoints, tracer, logger)
189189 errc <- http.ListenAndServe(*httpAddr, h)
190190 }()
191191
192 // gRPC transport
192 // gRPC transport.
193193 go func() {
194194 logger := log.NewContext(logger).With("transport", "gRPC")
195195
207207 errc <- s.Serve(ln)
208208 }()
209209
210 // Thrift transport
210 // Thrift transport.
211211 go func() {
212212 logger := log.NewContext(logger).With("transport", "Thrift")
213213
251251 ).Serve()
252252 }()
253253
254 // Run
254 // Run!
255255 logger.Log("exit", <-errc)
256256 }
00 package main
11
22 import (
3 "bytes"
34 "encoding/json"
45 "flag"
56 "fmt"
67 "io"
78 "io/ioutil"
8 stdlog "log"
99 "net/http"
1010 "net/url"
1111 "os"
1616
1717 "github.com/gorilla/mux"
1818 "github.com/hashicorp/consul/api"
19 "github.com/opentracing/opentracing-go"
19 stdopentracing "github.com/opentracing/opentracing-go"
2020 "golang.org/x/net/context"
2121
2222 "github.com/go-kit/kit/endpoint"
23 "github.com/go-kit/kit/examples/addsvc/client/grpc"
24 "github.com/go-kit/kit/examples/addsvc/server"
25 "github.com/go-kit/kit/loadbalancer"
26 "github.com/go-kit/kit/loadbalancer/consul"
23 "github.com/go-kit/kit/examples/addsvc"
24 addsvcgrpcclient "github.com/go-kit/kit/examples/addsvc/client/grpc"
2725 "github.com/go-kit/kit/log"
26 "github.com/go-kit/kit/sd"
27 consulsd "github.com/go-kit/kit/sd/consul"
28 "github.com/go-kit/kit/sd/lb"
2829 httptransport "github.com/go-kit/kit/transport/http"
30 "google.golang.org/grpc"
2931 )
3032
3133 func main() {
3739 )
3840 flag.Parse()
3941
40 // Log domain
41 logger := log.NewLogfmtLogger(os.Stderr)
42 logger = log.NewContext(logger).With("ts", log.DefaultTimestampUTC).With("caller", log.DefaultCaller)
43 stdlog.SetFlags(0) // flags are handled by Go kit's logger
44 stdlog.SetOutput(log.NewStdlibAdapter(logger)) // redirect anything using stdlib log to us
42 // Logging domain.
43 var logger log.Logger
44 {
45 logger = log.NewLogfmtLogger(os.Stderr)
46 logger = log.NewContext(logger).With("ts", log.DefaultTimestampUTC)
47 logger = log.NewContext(logger).With("caller", log.DefaultCaller)
48 }
4549
4650 // Service discovery domain. In this example we use Consul.
47 consulConfig := api.DefaultConfig()
48 if len(*consulAddr) > 0 {
49 consulConfig.Address = *consulAddr
50 }
51 consulClient, err := api.NewClient(consulConfig)
52 if err != nil {
53 logger.Log("err", err)
54 os.Exit(1)
55 }
56 discoveryClient := consul.NewClient(consulClient)
57
58 // Context domain.
51 var client consulsd.Client
52 {
53 consulConfig := api.DefaultConfig()
54 if len(*consulAddr) > 0 {
55 consulConfig.Address = *consulAddr
56 }
57 consulClient, err := api.NewClient(consulConfig)
58 if err != nil {
59 logger.Log("err", err)
60 os.Exit(1)
61 }
62 client = consulsd.NewClient(consulClient)
63 }
64
65 // Transport domain.
66 tracer := stdopentracing.GlobalTracer() // no-op
5967 ctx := context.Background()
60
61 // Set up our routes.
62 //
63 // Each Consul service name maps to multiple instances of that service. We
64 // connect to each instance according to its pre-determined transport: in this
65 // case, we choose to access addsvc via its gRPC client, and stringsvc over
66 // plain transport/http (it has no client package).
67 //
68 // Each service instance implements multiple methods, and we want to map each
69 // method to a unique path on the API gateway. So, we define that path and its
70 // corresponding factory function, which takes an instance string and returns an
71 // endpoint.Endpoint for the specific method.
72 //
73 // Finally, we mount that path + endpoint handler into the router.
7468 r := mux.NewRouter()
75 for consulName, methods := range map[string][]struct {
76 path string
77 factory loadbalancer.Factory
78 }{
79 "addsvc": {
80 {path: "/api/addsvc/concat", factory: grpc.MakeConcatEndpointFactory(opentracing.GlobalTracer(), nil)},
81 {path: "/api/addsvc/sum", factory: grpc.MakeSumEndpointFactory(opentracing.GlobalTracer(), nil)},
82 },
83 "stringsvc": {
84 {path: "/api/stringsvc/uppercase", factory: httpFactory(ctx, "GET", "uppercase/")},
85 {path: "/api/stringsvc/concat", factory: httpFactory(ctx, "GET", "concat/")},
86 },
87 } {
88 for _, method := range methods {
89 publisher, err := consul.NewPublisher(discoveryClient, method.factory, logger, consulName)
90 if err != nil {
91 logger.Log("service", consulName, "path", method.path, "err", err)
92 continue
93 }
94 lb := loadbalancer.NewRoundRobin(publisher)
95 e := loadbalancer.Retry(*retryMax, *retryTimeout, lb)
96 h := makeHandler(ctx, e, logger)
97 r.HandleFunc(method.path, h)
98 }
99 }
100
101 // Mechanical stuff.
69
70 // Now we begin installing the routes. Each route corresponds to a single
71 // method: sum, concat, uppercase, and count.
72
73 // addsvc routes.
74 {
75 // Each method gets constructed with a factory. Factories take an
76 // instance string, and return a specific endpoint. In the factory we
77 // dial the instance string we get from Consul, and then leverage an
78 // addsvc client package to construct a complete service. We can then
79 // leverage the addsvc.Make{Sum,Concat}Endpoint constructors to convert
80 // the complete service to specific endpoint.
81
82 var (
83 tags = []string{}
84 passingOnly = true
85 endpoints = addsvc.Endpoints{}
86 )
87 {
88 factory := addsvcFactory(addsvc.MakeSumEndpoint, tracer, logger)
89 subscriber := consulsd.NewSubscriber(client, factory, logger, "addsvc", tags, passingOnly)
90 balancer := lb.NewRoundRobin(subscriber)
91 retry := lb.Retry(*retryMax, *retryTimeout, balancer)
92 endpoints.SumEndpoint = retry
93 }
94 {
95 factory := addsvcFactory(addsvc.MakeConcatEndpoint, tracer, logger)
96 subscriber := consulsd.NewSubscriber(client, factory, logger, "addsvc", tags, passingOnly)
97 balancer := lb.NewRoundRobin(subscriber)
98 retry := lb.Retry(*retryMax, *retryTimeout, balancer)
99 endpoints.ConcatEndpoint = retry
100 }
101
102 // Here we leverage the fact that addsvc comes with a constructor for an
103 // HTTP handler, and just install it under a particular path prefix in
104 // our router.
105
106 r.PathPrefix("addsvc/").Handler(addsvc.MakeHTTPHandler(ctx, endpoints, tracer, logger))
107 }
108
109 // stringsvc routes.
110 {
111 // addsvc had lots of nice importable Go packages we could leverage.
112 // With stringsvc we are not so fortunate, it just has some endpoints
113 // that we assume will exist. So we have to write that logic here. This
114 // is by design, so you can see two totally different methods of
115 // proxying to a remote service.
116
117 var (
118 tags = []string{}
119 passingOnly = true
120 uppercase endpoint.Endpoint
121 count endpoint.Endpoint
122 )
123 {
124 factory := stringsvcFactory(ctx, "GET", "/uppercase")
125 subscriber := consulsd.NewSubscriber(client, factory, logger, "stringsvc", tags, passingOnly)
126 balancer := lb.NewRoundRobin(subscriber)
127 retry := lb.Retry(*retryMax, *retryTimeout, balancer)
128 uppercase = retry
129 }
130 {
131 factory := stringsvcFactory(ctx, "GET", "/count")
132 subscriber := consulsd.NewSubscriber(client, factory, logger, "stringsvc", tags, passingOnly)
133 balancer := lb.NewRoundRobin(subscriber)
134 retry := lb.Retry(*retryMax, *retryTimeout, balancer)
135 count = retry
136 }
137
138 // We can use the transport/http.Server to act as our handler, all we
139 // have to do provide it with the encode and decode functions for our
140 // stringsvc methods.
141
142 r.Handle("/stringsvc/uppercase", httptransport.NewServer(ctx, uppercase, decodeUppercaseRequest, encodeJSONResponse))
143 r.Handle("/stringsvc/count", httptransport.NewServer(ctx, count, decodeCountRequest, encodeJSONResponse))
144 }
145
146 // Interrupt handler.
102147 errc := make(chan error)
103148 go func() {
104 errc <- interrupt()
149 c := make(chan os.Signal)
150 signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
151 errc <- fmt.Errorf("%s", <-c)
105152 }()
153
154 // HTTP transport.
106155 go func() {
107 logger.Log("transport", "http", "addr", *httpAddr)
156 logger.Log("transport", "HTTP", "addr", *httpAddr)
108157 errc <- http.ListenAndServe(*httpAddr, r)
109158 }()
110 logger.Log("err", <-errc)
111 }
112
113 func makeHandler(ctx context.Context, e endpoint.Endpoint, logger log.Logger) http.HandlerFunc {
114 return func(w http.ResponseWriter, r *http.Request) {
115 resp, err := e(ctx, r.Body)
159
160 // Run!
161 logger.Log("exit", <-errc)
162 }
163
164 func addsvcFactory(makeEndpoint func(addsvc.Service) endpoint.Endpoint, tracer stdopentracing.Tracer, logger log.Logger) sd.Factory {
165 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
166 // We could just as easily use the HTTP or Thrift client package to make
167 // the connection to addsvc. We've chosen gRPC arbitrarily. Note that
168 // the transport is an implementation detail: it doesn't leak out of
169 // this function. Nice!
170
171 conn, err := grpc.Dial(instance, grpc.WithInsecure())
116172 if err != nil {
117 logger.Log("err", err)
118 http.Error(w, err.Error(), http.StatusInternalServerError)
119 return
120 }
121 b, ok := resp.([]byte)
122 if !ok {
123 logger.Log("err", "endpoint response is not of type []byte")
124 http.Error(w, err.Error(), http.StatusInternalServerError)
125 return
126 }
127 _, err = w.Write(b)
128 if err != nil {
129 logger.Log("err", err)
130 return
131 }
132 }
133 }
134
135 func makeSumEndpoint(svc server.AddService) endpoint.Endpoint {
136 return func(ctx context.Context, request interface{}) (interface{}, error) {
137 r := request.(io.Reader)
138 var req server.SumRequest
139 if err := json.NewDecoder(r).Decode(&req); err != nil {
140 return nil, err
141 }
142 v := svc.Sum(req.A, req.B)
143 return json.Marshal(v)
144 }
145 }
146
147 func makeConcatEndpoint(svc server.AddService) endpoint.Endpoint {
148 return func(ctx context.Context, request interface{}) (interface{}, error) {
149 r := request.(io.Reader)
150 var req server.ConcatRequest
151 if err := json.NewDecoder(r).Decode(&req); err != nil {
152 return nil, err
153 }
154 v := svc.Concat(req.A, req.B)
155 return json.Marshal(v)
156 }
157 }
158
159 func httpFactory(ctx context.Context, method, path string) loadbalancer.Factory {
173 return nil, nil, err
174 }
175 service := addsvcgrpcclient.New(conn, tracer, logger)
176 endpoint := makeEndpoint(service)
177
178 // Notice that the addsvc gRPC client converts the connection to a
179 // complete addsvc, and we just throw away everything except the method
180 // we're interested in. A smarter factory would mux multiple methods
181 // over the same connection. But that would require more work to manage
182 // the returned io.Closer, e.g. reference counting. Since this is for
183 // the purposes of demonstration, we'll just keep it simple.
184
185 return endpoint, conn, nil
186 }
187 }
188
189 func stringsvcFactory(ctx context.Context, method, path string) sd.Factory {
160190 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
161 var e endpoint.Endpoint
162191 if !strings.HasPrefix(instance, "http") {
163192 instance = "http://" + instance
164193 }
165 u, err := url.Parse(instance)
194 tgt, err := url.Parse(instance)
166195 if err != nil {
167196 return nil, nil, err
168197 }
169 u.Path = path
170
171 e = httptransport.NewClient(method, u, passEncode, passDecode).Endpoint()
172 return e, nil, nil
173 }
174 }
175
176 func passEncode(_ context.Context, r *http.Request, request interface{}) error {
177 r.Body = request.(io.ReadCloser)
198 tgt.Path = path
199
200 // Since stringsvc doesn't have any kind of package we can import, or
201 // any formal spec, we are forced to just assert where the endpoints
202 // live, and write our own code to encode and decode requests and
203 // responses. Ideally, if you write the service, you will want to
204 // provide stronger guarantees to your clients.
205
206 var (
207 enc httptransport.EncodeRequestFunc
208 dec httptransport.DecodeResponseFunc
209 )
210 switch path {
211 case "/uppercase":
212 enc, dec = encodeJSONRequest, decodeUppercaseResponse
213 case "/count":
214 enc, dec = encodeJSONRequest, decodeCountResponse
215 default:
216 return nil, nil, fmt.Errorf("unknown stringsvc path %q", path)
217 }
218
219 return httptransport.NewClient(method, tgt, enc, dec).Endpoint(), nil, nil
220 }
221 }
222
223 func encodeJSONRequest(_ context.Context, req *http.Request, request interface{}) error {
224 // Both uppercase and count requests are encoded in the same way:
225 // simple JSON serialization to the request body.
226 var buf bytes.Buffer
227 if err := json.NewEncoder(&buf).Encode(request); err != nil {
228 return err
229 }
230 req.Body = ioutil.NopCloser(&buf)
178231 return nil
179232 }
180233
181 func passDecode(_ context.Context, r *http.Response) (interface{}, error) {
182 return ioutil.ReadAll(r.Body)
183 }
184
185 func interrupt() error {
186 c := make(chan os.Signal)
187 signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
188 return fmt.Errorf("%s", <-c)
189 }
234 func encodeJSONResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
235 w.Header().Set("Content-Type", "application/json; charset=utf-8")
236 return json.NewEncoder(w).Encode(response)
237 }
238
239 // I've just copied these functions from stringsvc3/transport.go, inlining the
240 // struct definitions.
241
242 func decodeUppercaseResponse(ctx context.Context, resp *http.Response) (interface{}, error) {
243 var response struct {
244 V string `json:"v"`
245 Err string `json:"err,omitempty"`
246 }
247 if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
248 return nil, err
249 }
250 return response, nil
251 }
252
253 func decodeCountResponse(ctx context.Context, resp *http.Response) (interface{}, error) {
254 var response struct {
255 V int `json:"v"`
256 }
257 if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
258 return nil, err
259 }
260 return response, nil
261 }
262
263 func decodeUppercaseRequest(ctx context.Context, req *http.Request) (interface{}, error) {
264 var request struct {
265 S string `json:"s"`
266 }
267 if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
268 return nil, err
269 }
270 return request, nil
271 }
272
273 func decodeCountRequest(ctx context.Context, req *http.Request) (interface{}, error) {
274 var request struct {
275 S string `json:"s"`
276 }
277 if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
278 return nil, err
279 }
280 return request, nil
281 }
3131 // NewSubscriber returns a Consul subscriber which returns endpoints for the
3232 // requested service. It only returns instances for which all of the passed tags
3333 // are present.
34 func NewSubscriber(client Client, factory sd.Factory, logger log.Logger, service string, tags []string, passingOnly bool) (*Subscriber, error) {
34 func NewSubscriber(client Client, factory sd.Factory, logger log.Logger, service string, tags []string, passingOnly bool) *Subscriber {
3535 s := &Subscriber{
3636 cache: cache.New(factory, logger),
3737 client: client,
5151
5252 s.cache.Update(instances)
5353 go s.loop(index)
54 return s, nil
54 return s
5555 }
5656
5757 // Endpoints implements the Subscriber interface.
6262 client = newTestClient(consulState)
6363 )
6464
65 s, err := NewSubscriber(client, testFactory, logger, "search", []string{"api"}, true)
66 if err != nil {
67 t.Fatal(err)
68 }
65 s := NewSubscriber(client, testFactory, logger, "search", []string{"api"}, true)
6966 defer s.Stop()
7067
7168 endpoints, err := s.Endpoints()
8481 client = newTestClient(consulState)
8582 )
8683
87 s, err := NewSubscriber(client, testFactory, logger, "feed", []string{}, true)
88 if err != nil {
89 t.Fatal(err)
90 }
84 s := NewSubscriber(client, testFactory, logger, "feed", []string{}, true)
9185 defer s.Stop()
9286
9387 endpoints, err := s.Endpoints()
106100 client = newTestClient(consulState)
107101 )
108102
109 s, err := NewSubscriber(client, testFactory, logger, "search", []string{"api", "v2"}, true)
110 if err != nil {
111 t.Fatal(err)
112 }
103 s := NewSubscriber(client, testFactory, logger, "search", []string{"api", "v2"}, true)
113104 defer s.Stop()
114105
115106 endpoints, err := s.Endpoints()
123114 }
124115
125116 func TestSubscriberAddressOverride(t *testing.T) {
126 s, err := NewSubscriber(newTestClient(consulState), testFactory, log.NewNopLogger(), "search", []string{"db"}, true)
127 if err != nil {
128 t.Fatal(err)
129 }
117 s := NewSubscriber(newTestClient(consulState), testFactory, log.NewNopLogger(), "search", []string{"db"}, true)
130118 defer s.Stop()
131119
132120 endpoints, err := s.Endpoints()
88 // Factory is a function that converts an instance string (e.g. host:port) to a
99 // specific endpoint. Instances that provide multiple endpoints require multiple
1010 // factories. A factory also returns an io.Closer that's invoked when the
11 // instance goes away and needs to be cleaned up.
11 // instance goes away and needs to be cleaned up. Factories may return nil
12 // closers.
1213 //
1314 // Users are expected to provide their own factory functions that assume
1415 // specific transports, or can deduce transports by parsing the instance string.