Codebase list golang-github-go-kit-kit / 790176c
Merge pull request #125 from go-kit/transport-http-constructor transport/http: Constructors Peter Bourgon 8 years ago
11 changed file(s) with 265 addition(s) and 233 deletion(s). Raw diff Collapse all Expand all
150150 ctx := context.Background()
151151 svc := stringService{}
152152
153 uppercaseHandler := httptransport.Server{
154 Context: ctx,
155 Endpoint: makeUppercaseEndpoint(svc),
156 DecodeRequestFunc: decodeUppercaseRequest,
157 EncodeResponseFunc: encodeResponse,
158 }
159
160 countHandler := httptransport.Server{
161 Context: ctx,
162 Endpoint: makeCountEndpoint(svc),
163 DecodeRequestFunc: decodeCountRequest,
164 EncodeResponseFunc: encodeResponse,
165 }
153 uppercaseHandler := httptransport.NewServer(
154 ctx,
155 makeUppercaseEndpoint(svc),
156 decodeUppercaseRequest,
157 encodeResponse,
158 )
159
160 countHandler := httptransport.NewServer(
161 ctx,
162 makeCountEndpoint(svc),
163 decodeCountRequest,
164 encodeResponse,
165 )
166166
167167 http.Handle("/uppercase", uppercaseHandler)
168168 http.Handle("/count", countHandler)
256256 count = makeCountEndpoint(svc)
257257 count = loggingMiddleware(log.NewContext(logger).With("method", "count"))(count)
258258
259 uppercaseHandler := httptransport.Server{
260 Endpoint: uppercase,
259 uppercaseHandler := httptransport.Server(
261260 // ...
262 }
263
264 countHandler := httptransport.Server{
265 Endpoint: count,
261 uppercase,
266262 // ...
267 }
263 )
264
265 countHandler := httptransport.Server(
266 // ...
267 count,
268 // ...
269 )
268270 ```
269271
270272 It turns out that this technique is useful for a lot more than just logging.
329331 svc := stringService{}
330332 svc = loggingMiddleware{logger, svc}
331333
332 uppercaseHandler := httptransport.Server{
333 Endpoint: makeUppercaseEndpoint(svc),
334 // ...
335 }
336
337 countHandler := httptransport.Server{
338 Endpoint: makeCountEndpoint(svc),
339 // ...
340 }
334 uppercaseHandler := httptransport.NewServer(
335 // ...
336 makeUppercaseEndpoint(svc),
337 // ...
338 )
339
340 countHandler := httptransport.NewServer(
341 // ...
342 makeCountEndpoint(svc),
343 // ...
344 )
341345 }
342346 ```
343347
415419 svc = loggingMiddleware{logger, svc}
416420 svc = instrumentingMiddleware{requestCount, requestLatency, countResult, svc}
417421
418 uppercaseHandler := httptransport.Server{
419 Endpoint: makeUppercaseEndpoint(svc),
420 // ...
421 }
422
423 countHandler := httptransport.Server{
424 Endpoint: makeCountEndpoint(svc),
425 // ...
426 }
422 uppercaseHandler := httptransport.NewServer(
423 // ...
424 makeUppercaseEndpoint(svc),
425 // ...
426 )
427
428 countHandler := httptransport.NewServer(
429 // ...
430 makeCountEndpoint(svc),
431 // ...
432 )
427433
428434 http.Handle("/metrics", stdprometheus.Handler())
429435 }
509515 }
510516
511517 func makeUppercaseEndpoint(ctx context.Context, proxyURL string) endpoint.Endpoint {
512 return (httptransport.Client{
513 Client: http.DefaultClient,
514 Method: "GET",
515 URL: mustParseURL(proxyURL),
516 Context: ctx,
517 EncodeFunc: encodeUppercaseRequest,
518 DecodeFunc: decodeUppercaseResponse,
519 }).Endpoint()
518 return httptransport.NewClient(
519 "GET",
520 mustParseURL(proxyURL),
521 encodeUppercaseRequest,
522 decodeUppercaseResponse,
523 ).Endpoint()
520524 }
521525 ```
522526
2828 return client{
2929 Context: ctx,
3030 Logger: logger,
31 sum: (httptransport.Client{
32 Client: c,
33 Method: "GET",
34 URL: sumURL,
35 EncodeRequestFunc: server.EncodeSumRequest,
36 DecodeResponseFunc: server.DecodeSumResponse,
37 }).Endpoint(),
38 concat: (httptransport.Client{
39 Client: c,
40 Method: "GET",
41 URL: concatURL,
42 EncodeRequestFunc: server.EncodeConcatRequest,
43 DecodeResponseFunc: server.DecodeConcatResponse,
44 }).Endpoint(),
31 sum: httptransport.NewClient(
32 "GET",
33 sumURL,
34 server.EncodeSumRequest,
35 server.DecodeSumResponse,
36 ).Endpoint(),
37 concat: httptransport.NewClient(
38 "GET",
39 concatURL,
40 server.EncodeConcatRequest,
41 server.DecodeConcatResponse,
42 ).Endpoint(),
4543 }
4644 }
4745
138138
139139 sum = makeSumEndpoint(svc)
140140 sum = zipkin.AnnotateServer(newSumSpan, collector)(sum)
141 mux.Handle("/sum", httptransport.Server{
142 Context: root,
143 Endpoint: sum,
144 DecodeRequestFunc: server.DecodeSumRequest,
145 EncodeResponseFunc: server.EncodeSumResponse,
146 Before: []httptransport.RequestFunc{traceSum},
147 After: []httptransport.ResponseFunc{},
148 Logger: transportLogger,
149 })
141 mux.Handle("/sum", httptransport.NewServer(
142 root,
143 sum,
144 server.DecodeSumRequest,
145 server.EncodeSumResponse,
146 httptransport.ServerBefore(traceSum),
147 httptransport.ServerErrorLogger(transportLogger),
148 ))
150149
151150 concat = makeConcatEndpoint(svc)
152151 concat = zipkin.AnnotateServer(newConcatSpan, collector)(concat)
153 mux.Handle("/concat", httptransport.Server{
154 Context: root,
155 Endpoint: concat,
156 DecodeRequestFunc: server.DecodeConcatRequest,
157 EncodeResponseFunc: server.EncodeConcatResponse,
158 Before: []httptransport.RequestFunc{traceConcat},
159 After: []httptransport.ResponseFunc{},
160 Logger: transportLogger,
161 })
152 mux.Handle("/concat", httptransport.NewServer(
153 root,
154 concat,
155 server.DecodeConcatRequest,
156 server.EncodeConcatResponse,
157 httptransport.ServerBefore(traceConcat),
158 httptransport.ServerErrorLogger(transportLogger),
159 ))
162160
163161 _ = transportLogger.Log("addr", *httpAddr)
164162 errc <- http.ListenAndServe(*httpAddr, mux)
3535 ctx := context.Background()
3636 svc := stringService{}
3737
38 uppercaseHandler := httptransport.Server{
39 Context: ctx,
40 Endpoint: makeUppercaseEndpoint(svc),
41 DecodeRequestFunc: decodeUppercaseRequest,
42 EncodeResponseFunc: encodeResponse,
43 }
38 uppercaseHandler := httptransport.NewServer(
39 ctx,
40 makeUppercaseEndpoint(svc),
41 decodeUppercaseRequest,
42 encodeResponse,
43 )
4444
45 countHandler := httptransport.Server{
46 Context: ctx,
47 Endpoint: makeCountEndpoint(svc),
48 DecodeRequestFunc: decodeCountRequest,
49 EncodeResponseFunc: encodeResponse,
50 }
45 countHandler := httptransport.NewServer(
46 ctx,
47 makeCountEndpoint(svc),
48 decodeCountRequest,
49 encodeResponse,
50 )
5151
5252 http.Handle("/uppercase", uppercaseHandler)
5353 http.Handle("/count", countHandler)
4242 svc = loggingMiddleware{logger, svc}
4343 svc = instrumentingMiddleware{requestCount, requestLatency, countResult, svc}
4444
45 uppercaseHandler := httptransport.Server{
46 Context: ctx,
47 Endpoint: makeUppercaseEndpoint(svc),
48 DecodeRequestFunc: decodeUppercaseRequest,
49 EncodeResponseFunc: encodeResponse,
50 }
45 uppercaseHandler := httptransport.NewServer(
46 ctx,
47 makeUppercaseEndpoint(svc),
48 decodeUppercaseRequest,
49 encodeResponse,
50 )
5151
52 countHandler := httptransport.Server{
53 Context: ctx,
54 Endpoint: makeCountEndpoint(svc),
55 DecodeRequestFunc: decodeCountRequest,
56 EncodeResponseFunc: encodeResponse,
57 }
52 countHandler := httptransport.NewServer(
53 ctx,
54 makeCountEndpoint(svc),
55 decodeCountRequest,
56 encodeResponse,
57 )
5858
5959 http.Handle("/uppercase", uppercaseHandler)
6060 http.Handle("/count", countHandler)
5353 svc = loggingMiddleware(logger)(svc)
5454 svc = instrumentingMiddleware(requestCount, requestLatency, countResult)(svc)
5555
56 uppercaseHandler := httptransport.Server{
57 Context: ctx,
58 Endpoint: makeUppercaseEndpoint(svc),
59 DecodeRequestFunc: decodeUppercaseRequest,
60 EncodeResponseFunc: encodeResponse,
61 }
62 countHandler := httptransport.Server{
63 Context: ctx,
64 Endpoint: makeCountEndpoint(svc),
65 DecodeRequestFunc: decodeCountRequest,
66 EncodeResponseFunc: encodeResponse,
67 }
56 uppercaseHandler := httptransport.NewServer(
57 ctx,
58 makeUppercaseEndpoint(svc),
59 decodeUppercaseRequest,
60 encodeResponse,
61 )
62 countHandler := httptransport.NewServer(
63 ctx,
64 makeCountEndpoint(svc),
65 decodeCountRequest,
66 encodeResponse,
67 )
6868
6969 http.Handle("/uppercase", uppercaseHandler)
7070 http.Handle("/count", countHandler)
22 import (
33 "errors"
44 "fmt"
5 "net/http"
65 "net/url"
76 "strings"
87 "time"
8483 if u.Path == "" {
8584 u.Path = "/uppercase"
8685 }
87 return (httptransport.Client{
88 Client: http.DefaultClient,
89 Method: "GET",
90 URL: u,
91 DecodeResponseFunc: decodeUppercaseResponse,
92 EncodeRequestFunc: encodeRequest,
93 }).Endpoint()
86 return httptransport.NewClient(
87 "GET",
88 u,
89 encodeRequest,
90 decodeUppercaseResponse,
91 ).Endpoint()
9492 }
9593
9694 func split(s string) []string {
1111
1212 // Client wraps a URL and provides a method that implements endpoint.Endpoint.
1313 type Client struct {
14 // If client is nil, http.DefaultClient will be used.
15 *http.Client
14 client *http.Client
15 method string
16 tgt *url.URL
17 enc EncodeRequestFunc
18 dec DecodeResponseFunc
19 before []RequestFunc
20 }
1621
17 // Method must be provided.
18 Method string
22 // NewClient returns a
23 func NewClient(method string, tgt *url.URL, enc EncodeRequestFunc, dec DecodeResponseFunc, options ...ClientOption) *Client {
24 c := &Client{
25 client: http.DefaultClient,
26 method: method,
27 tgt: tgt,
28 enc: enc,
29 dec: dec,
30 before: []RequestFunc{},
31 }
32 for _, option := range options {
33 option(c)
34 }
35 return c
36 }
1937
20 // URL must be provided.
21 URL *url.URL
38 // ClientOption sets an optional parameter for clients.
39 type ClientOption func(*Client)
2240
23 // EncodeRequestFunc must be provided. The HTTP request passed to the
24 // EncodeRequestFunc will have a nil body.
25 EncodeRequestFunc
41 // SetClient sets the underlying HTTP client used for requests.
42 // By default, http.DefaultClient is used.
43 func SetClient(client *http.Client) ClientOption {
44 return func(c *Client) { c.client = client }
45 }
2646
27 // DecodeResponseFunc must be provided.
28 DecodeResponseFunc
29
30 // Before functions are executed on the outgoing request after it is
31 // created, but before it's sent to the HTTP client. Clients have no After
32 // ResponseFuncs, as they don't work with ResponseWriters.
33 Before []RequestFunc
47 // SetClientBefore sets the RequestFuncs that are applied to the outgoing HTTP
48 // request before it's invoked.
49 func SetClientBefore(before ...RequestFunc) ClientOption {
50 return func(c *Client) { c.before = before }
3451 }
3552
3653 // Endpoint returns a usable endpoint that will invoke the RPC specified by
4057 ctx, cancel := context.WithCancel(ctx)
4158 defer cancel()
4259
43 req, err := http.NewRequest(c.Method, c.URL.String(), nil)
60 req, err := http.NewRequest(c.method, c.tgt.String(), nil)
4461 if err != nil {
4562 return nil, fmt.Errorf("NewRequest: %v", err)
4663 }
4764
48 if err = c.EncodeRequestFunc(req, request); err != nil {
65 if err = c.enc(req, request); err != nil {
4966 return nil, fmt.Errorf("Encode: %v", err)
5067 }
5168
52 for _, f := range c.Before {
69 for _, f := range c.before {
5370 ctx = f(ctx, req)
5471 }
5572
56 var resp *http.Response
57 if c.Client != nil {
58 resp, err = c.Client.Do(req)
59 } else {
60 resp, err = http.DefaultClient.Do(req)
61 }
73 resp, err := c.client.Do(req)
6274 if err != nil {
6375 return nil, fmt.Errorf("Do: %v", err)
6476 }
6577 defer func() { _ = resp.Body.Close() }()
6678
67 response, err := c.DecodeResponseFunc(resp)
79 response, err := c.dec(resp)
6880 if err != nil {
6981 return nil, fmt.Errorf("Decode: %v", err)
7082 }
2525 w.WriteHeader(http.StatusOK)
2626 }))
2727
28 client := httptransport.Client{
29 Method: "GET",
30 URL: mustParse(server.URL),
31 EncodeRequestFunc: encode,
32 DecodeResponseFunc: decode,
33 Before: []httptransport.RequestFunc{httptransport.SetRequestHeader(headerKey, headerVal)},
34 }
28 client := httptransport.NewClient(
29 "GET",
30 mustParse(server.URL),
31 encode,
32 decode,
33 httptransport.SetClientBefore(httptransport.SetRequestHeader(headerKey, headerVal)),
34 )
3535
3636 _, err := client.Endpoint()(context.Background(), struct{}{})
3737 if err != nil {
1010
1111 // Server wraps an endpoint and implements http.Handler.
1212 type Server struct {
13 // A background context must be provided.
14 context.Context
13 ctx context.Context
14 e endpoint.Endpoint
15 dec DecodeRequestFunc
16 enc EncodeResponseFunc
17 before []RequestFunc
18 after []ResponseFunc
19 errorEncoder func(w http.ResponseWriter, err error)
20 logger log.Logger
21 }
1522
16 // The endpoint that will be invoked.
17 endpoint.Endpoint
23 // NewServer constructs a new server, which implements http.Server and wraps
24 // the provided endpoint.
25 func NewServer(
26 ctx context.Context,
27 e endpoint.Endpoint,
28 dec DecodeRequestFunc,
29 enc EncodeResponseFunc,
30 options ...ServerOption,
31 ) *Server {
32 s := &Server{
33 ctx: ctx,
34 e: e,
35 dec: dec,
36 enc: enc,
37 errorEncoder: defaultErrorEncoder,
38 logger: log.NewNopLogger(),
39 }
40 for _, option := range options {
41 option(s)
42 }
43 return s
44 }
1845
19 // DecodeRequestFunc must be provided.
20 DecodeRequestFunc
46 // ServerOption sets an optional parameter for servers.
47 type ServerOption func(*Server)
2148
22 // EncodeResponseFunc must be provided.
23 EncodeResponseFunc
49 // ServerBefore functions are executed on the HTTP request object before the
50 // request is decoded.
51 func ServerBefore(before ...RequestFunc) ServerOption {
52 return func(s *Server) { s.before = before }
53 }
2454
25 // Before functions are executed on the HTTP request object before the
26 // request is decoded.
27 Before []RequestFunc
55 // ServerAfter functions are executed on the HTTP response writer after the
56 // endpoint is invoked, but before anything is written to the client.
57 func ServerAfter(after ...ResponseFunc) ServerOption {
58 return func(s *Server) { s.after = after }
59 }
2860
29 // After functions are executed on the HTTP response writer after the
30 // endpoint is invoked, but before anything is written to the client.
31 After []ResponseFunc
61 // ServerErrorEncoder is used to encode errors to the http.ResponseWriter
62 // whenever they're encountered in the processing of a request. Clients can
63 // use this to provide custom error formatting and response codes. By default,
64 // errors will be written as plain text with an appropriate, if generic,
65 // status code.
66 func ServerErrorEncoder(f func(w http.ResponseWriter, err error)) ServerOption {
67 return func(s *Server) { s.errorEncoder = f }
68 }
3269
33 // ErrorEncoder is used to encode errors to the http.ResponseWriter
34 // whenever they're encountered in the processing of a request. Clients
35 // can use this to provide custom error formatting and response codes. If
36 // ErrorEncoder is nil, the error will be written as plain text with
37 // an appropriate, if generic, status code.
38 ErrorEncoder func(w http.ResponseWriter, err error)
39
40 // Logger is used to log errors.
41 Logger log.Logger
70 // ServerErrorLogger is used to log non-terminal errors. By default, no errors
71 // are logged.
72 func ServerErrorLogger(logger log.Logger) ServerOption {
73 return func(s *Server) { s.logger = logger }
4274 }
4375
4476 // ServeHTTP implements http.Handler.
4577 func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
46 if s.ErrorEncoder == nil {
47 s.ErrorEncoder = defaultErrorEncoder
48 }
49
50 ctx, cancel := context.WithCancel(s.Context)
78 ctx, cancel := context.WithCancel(s.ctx)
5179 defer cancel()
5280
53 for _, f := range s.Before {
81 for _, f := range s.before {
5482 ctx = f(ctx, r)
5583 }
5684
57 request, err := s.DecodeRequestFunc(r)
85 request, err := s.dec(r)
5886 if err != nil {
59 _ = s.Logger.Log("err", err)
60 s.ErrorEncoder(w, badRequestError{err})
87 _ = s.logger.Log("err", err)
88 s.errorEncoder(w, badRequestError{err})
6189 return
6290 }
6391
64 response, err := s.Endpoint(ctx, request)
92 response, err := s.e(ctx, request)
6593 if err != nil {
66 _ = s.Logger.Log("err", err)
67 s.ErrorEncoder(w, err)
94 _ = s.logger.Log("err", err)
95 s.errorEncoder(w, err)
6896 return
6997 }
7098
71 for _, f := range s.After {
99 for _, f := range s.after {
72100 f(ctx, w)
73101 }
74102
75 if err := s.EncodeResponseFunc(w, response); err != nil {
76 _ = s.Logger.Log("err", err)
77 s.ErrorEncoder(w, err)
103 if err := s.enc(w, response); err != nil {
104 _ = s.logger.Log("err", err)
105 s.errorEncoder(w, err)
78106 return
79107 }
80108 }
88
99 "golang.org/x/net/context"
1010
11 "github.com/go-kit/kit/log"
1211 httptransport "github.com/go-kit/kit/transport/http"
1312 )
1413
1514 func TestServerBadDecode(t *testing.T) {
16 handler := httptransport.Server{
17 Context: context.Background(),
18 Endpoint: func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
19 DecodeRequestFunc: func(*http.Request) (interface{}, error) { return struct{}{}, errors.New("dang") },
20 EncodeResponseFunc: func(http.ResponseWriter, interface{}) error { return nil },
21 Logger: log.NewNopLogger(),
22 }
15 handler := httptransport.NewServer(
16 context.Background(),
17 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
18 func(*http.Request) (interface{}, error) { return struct{}{}, errors.New("dang") },
19 func(http.ResponseWriter, interface{}) error { return nil },
20 )
2321 server := httptest.NewServer(handler)
2422 defer server.Close()
2523 resp, _ := http.Get(server.URL)
2927 }
3028
3129 func TestServerBadEndpoint(t *testing.T) {
32 handler := httptransport.Server{
33 Context: context.Background(),
34 Endpoint: func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errors.New("dang") },
35 DecodeRequestFunc: func(*http.Request) (interface{}, error) { return struct{}{}, nil },
36 EncodeResponseFunc: func(http.ResponseWriter, interface{}) error { return nil },
37 Logger: log.NewNopLogger(),
38 }
30 handler := httptransport.NewServer(
31 context.Background(),
32 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errors.New("dang") },
33 func(*http.Request) (interface{}, error) { return struct{}{}, nil },
34 func(http.ResponseWriter, interface{}) error { return nil },
35 )
3936 server := httptest.NewServer(handler)
4037 defer server.Close()
4138 resp, _ := http.Get(server.URL)
4542 }
4643
4744 func TestServerBadEncode(t *testing.T) {
48 handler := httptransport.Server{
49 Context: context.Background(),
50 Endpoint: func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
51 DecodeRequestFunc: func(*http.Request) (interface{}, error) { return struct{}{}, nil },
52 EncodeResponseFunc: func(http.ResponseWriter, interface{}) error { return errors.New("dang") },
53 Logger: log.NewNopLogger(),
54 }
45 handler := httptransport.NewServer(
46 context.Background(),
47 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil },
48 func(*http.Request) (interface{}, error) { return struct{}{}, nil },
49 func(http.ResponseWriter, interface{}) error { return errors.New("dang") },
50 )
5551 server := httptest.NewServer(handler)
5652 defer server.Close()
5753 resp, _ := http.Get(server.URL)
6864 }
6965 return http.StatusInternalServerError
7066 }
71 handler := httptransport.Server{
72 Context: context.Background(),
73 Endpoint: func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errTeapot },
74 DecodeRequestFunc: func(*http.Request) (interface{}, error) { return struct{}{}, nil },
75 EncodeResponseFunc: func(http.ResponseWriter, interface{}) error { return nil },
76 ErrorEncoder: func(w http.ResponseWriter, err error) { w.WriteHeader(code(err)) },
77 Logger: log.NewNopLogger(),
78 }
67 handler := httptransport.NewServer(
68 context.Background(),
69 func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errTeapot },
70 func(*http.Request) (interface{}, error) { return struct{}{}, nil },
71 func(http.ResponseWriter, interface{}) error { return nil },
72 httptransport.ServerErrorEncoder(func(w http.ResponseWriter, err error) { w.WriteHeader(code(err)) }),
73 )
7974 server := httptest.NewServer(handler)
8075 defer server.Close()
8176 resp, _ := http.Get(server.URL)
10196 stepch = make(chan bool)
10297 endpoint = func(context.Context, interface{}) (interface{}, error) { <-stepch; return struct{}{}, nil }
10398 response = make(chan *http.Response)
104 handler = httptransport.Server{
105 Context: ctx,
106 Endpoint: endpoint,
107 DecodeRequestFunc: func(*http.Request) (interface{}, error) { return struct{}{}, nil },
108 EncodeResponseFunc: func(http.ResponseWriter, interface{}) error { return nil },
109 Before: []httptransport.RequestFunc{func(ctx context.Context, r *http.Request) context.Context { return ctx }},
110 After: []httptransport.ResponseFunc{func(ctx context.Context, w http.ResponseWriter) { return }},
111 Logger: log.NewNopLogger(),
112 }
99 handler = httptransport.NewServer(
100 ctx,
101 endpoint,
102 func(*http.Request) (interface{}, error) { return struct{}{}, nil },
103 func(http.ResponseWriter, interface{}) error { return nil },
104 httptransport.ServerBefore(func(ctx context.Context, r *http.Request) context.Context { return ctx }),
105 httptransport.ServerAfter(func(ctx context.Context, w http.ResponseWriter) { return }),
106 )
113107 )
114108 go func() {
115109 server := httptest.NewServer(handler)