remove server wide context from HTTP Transport in favor of per request context provided by net/http
Bas van Beek
6 years ago
206 | 206 | // HTTP transport. |
207 | 207 | go func() { |
208 | 208 | logger := log.NewContext(logger).With("transport", "HTTP") |
209 | h := addsvc.MakeHTTPHandler(ctx, endpoints, tracer, logger) | |
209 | h := addsvc.MakeHTTPHandler(endpoints, tracer, logger) | |
210 | 210 | logger.Log("addr", *httpAddr) |
211 | 211 | errc <- http.ListenAndServe(*httpAddr, h) |
212 | 212 | }() |
19 | 19 | |
20 | 20 | // MakeHTTPHandler returns a handler that makes a set of endpoints available |
21 | 21 | // on predefined paths. |
22 | func MakeHTTPHandler(ctx context.Context, endpoints Endpoints, tracer stdopentracing.Tracer, logger log.Logger) http.Handler { | |
22 | func MakeHTTPHandler(endpoints Endpoints, tracer stdopentracing.Tracer, logger log.Logger) http.Handler { | |
23 | 23 | options := []httptransport.ServerOption{ |
24 | 24 | httptransport.ServerErrorEncoder(errorEncoder), |
25 | 25 | httptransport.ServerErrorLogger(logger), |
26 | 26 | } |
27 | 27 | m := http.NewServeMux() |
28 | 28 | m.Handle("/sum", httptransport.NewServer( |
29 | ctx, | |
30 | 29 | endpoints.SumEndpoint, |
31 | 30 | DecodeHTTPSumRequest, |
32 | 31 | EncodeHTTPGenericResponse, |
33 | 32 | append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Sum", logger)))..., |
34 | 33 | )) |
35 | 34 | m.Handle("/concat", httptransport.NewServer( |
36 | ctx, | |
37 | 35 | endpoints.ConcatEndpoint, |
38 | 36 | DecodeHTTPConcatRequest, |
39 | 37 | EncodeHTTPGenericResponse, |
103 | 103 | // HTTP handler, and just install it under a particular path prefix in |
104 | 104 | // our router. |
105 | 105 | |
106 | r.PathPrefix("/addsvc").Handler(http.StripPrefix("/addsvc", addsvc.MakeHTTPHandler(ctx, endpoints, tracer, logger))) | |
106 | r.PathPrefix("/addsvc").Handler(http.StripPrefix("/addsvc", addsvc.MakeHTTPHandler(endpoints, tracer, logger))) | |
107 | 107 | } |
108 | 108 | |
109 | 109 | // stringsvc routes. |
139 | 139 | // have to do provide it with the encode and decode functions for our |
140 | 140 | // stringsvc methods. |
141 | 141 | |
142 | r.Handle("/stringsvc/uppercase", httptransport.NewServer(ctx, uppercase, decodeUppercaseRequest, encodeJSONResponse)) | |
143 | r.Handle("/stringsvc/count", httptransport.NewServer(ctx, count, decodeCountRequest, encodeJSONResponse)) | |
142 | r.Handle("/stringsvc/uppercase", httptransport.NewServer(uppercase, decodeUppercaseRequest, encodeJSONResponse)) | |
143 | r.Handle("/stringsvc/count", httptransport.NewServer(count, decodeCountRequest, encodeJSONResponse)) | |
144 | 144 | } |
145 | 145 | |
146 | 146 | // Interrupt handler. |
0 | 0 | package main |
1 | 1 | |
2 | 2 | import ( |
3 | "context" | |
4 | 3 | "flag" |
5 | 4 | "fmt" |
6 | 5 | "net/http" |
25 | 24 | logger = log.NewContext(logger).With("caller", log.DefaultCaller) |
26 | 25 | } |
27 | 26 | |
28 | var ctx context.Context | |
29 | { | |
30 | ctx = context.Background() | |
31 | } | |
32 | ||
33 | 27 | var s profilesvc.Service |
34 | 28 | { |
35 | 29 | s = profilesvc.NewInmemService() |
38 | 32 | |
39 | 33 | var h http.Handler |
40 | 34 | { |
41 | h = profilesvc.MakeHTTPHandler(ctx, s, log.NewContext(logger).With("component", "HTTP")) | |
35 | h = profilesvc.MakeHTTPHandler(s, log.NewContext(logger).With("component", "HTTP")) | |
42 | 36 | } |
43 | 37 | |
44 | 38 | errs := make(chan error) |
24 | 24 | |
25 | 25 | // MakeHTTPHandler mounts all of the service endpoints into an http.Handler. |
26 | 26 | // Useful in a profilesvc server. |
27 | func MakeHTTPHandler(ctx context.Context, s Service, logger log.Logger) http.Handler { | |
27 | func MakeHTTPHandler(s Service, logger log.Logger) http.Handler { | |
28 | 28 | r := mux.NewRouter() |
29 | 29 | e := MakeServerEndpoints(s) |
30 | 30 | options := []httptransport.ServerOption{ |
43 | 43 | // DELETE /profiles/:id/addresses/:addressID remove an address |
44 | 44 | |
45 | 45 | r.Methods("POST").Path("/profiles/").Handler(httptransport.NewServer( |
46 | ctx, | |
47 | 46 | e.PostProfileEndpoint, |
48 | 47 | decodePostProfileRequest, |
49 | 48 | encodeResponse, |
50 | 49 | options..., |
51 | 50 | )) |
52 | 51 | r.Methods("GET").Path("/profiles/{id}").Handler(httptransport.NewServer( |
53 | ctx, | |
54 | 52 | e.GetProfileEndpoint, |
55 | 53 | decodeGetProfileRequest, |
56 | 54 | encodeResponse, |
57 | 55 | options..., |
58 | 56 | )) |
59 | 57 | r.Methods("PUT").Path("/profiles/{id}").Handler(httptransport.NewServer( |
60 | ctx, | |
61 | 58 | e.PutProfileEndpoint, |
62 | 59 | decodePutProfileRequest, |
63 | 60 | encodeResponse, |
64 | 61 | options..., |
65 | 62 | )) |
66 | 63 | r.Methods("PATCH").Path("/profiles/{id}").Handler(httptransport.NewServer( |
67 | ctx, | |
68 | 64 | e.PatchProfileEndpoint, |
69 | 65 | decodePatchProfileRequest, |
70 | 66 | encodeResponse, |
71 | 67 | options..., |
72 | 68 | )) |
73 | 69 | r.Methods("DELETE").Path("/profiles/{id}").Handler(httptransport.NewServer( |
74 | ctx, | |
75 | 70 | e.DeleteProfileEndpoint, |
76 | 71 | decodeDeleteProfileRequest, |
77 | 72 | encodeResponse, |
78 | 73 | options..., |
79 | 74 | )) |
80 | 75 | r.Methods("GET").Path("/profiles/{id}/addresses/").Handler(httptransport.NewServer( |
81 | ctx, | |
82 | 76 | e.GetAddressesEndpoint, |
83 | 77 | decodeGetAddressesRequest, |
84 | 78 | encodeResponse, |
85 | 79 | options..., |
86 | 80 | )) |
87 | 81 | r.Methods("GET").Path("/profiles/{id}/addresses/{addressID}").Handler(httptransport.NewServer( |
88 | ctx, | |
89 | 82 | e.GetAddressEndpoint, |
90 | 83 | decodeGetAddressRequest, |
91 | 84 | encodeResponse, |
92 | 85 | options..., |
93 | 86 | )) |
94 | 87 | r.Methods("POST").Path("/profiles/{id}/addresses/").Handler(httptransport.NewServer( |
95 | ctx, | |
96 | 88 | e.PostAddressEndpoint, |
97 | 89 | decodePostAddressRequest, |
98 | 90 | encodeResponse, |
99 | 91 | options..., |
100 | 92 | )) |
101 | 93 | r.Methods("DELETE").Path("/profiles/{id}/addresses/{addressID}").Handler(httptransport.NewServer( |
102 | ctx, | |
103 | 94 | e.DeleteAddressEndpoint, |
104 | 95 | decodeDeleteAddressRequest, |
105 | 96 | encodeResponse, |
16 | 16 | ) |
17 | 17 | |
18 | 18 | // MakeHandler returns a handler for the booking service. |
19 | func MakeHandler(ctx context.Context, bs Service, logger kitlog.Logger) http.Handler { | |
19 | func MakeHandler(bs Service, logger kitlog.Logger) http.Handler { | |
20 | 20 | opts := []kithttp.ServerOption{ |
21 | 21 | kithttp.ServerErrorLogger(logger), |
22 | 22 | kithttp.ServerErrorEncoder(encodeError), |
23 | 23 | } |
24 | 24 | |
25 | 25 | bookCargoHandler := kithttp.NewServer( |
26 | ctx, | |
27 | 26 | makeBookCargoEndpoint(bs), |
28 | 27 | decodeBookCargoRequest, |
29 | 28 | encodeResponse, |
30 | 29 | opts..., |
31 | 30 | ) |
32 | 31 | loadCargoHandler := kithttp.NewServer( |
33 | ctx, | |
34 | 32 | makeLoadCargoEndpoint(bs), |
35 | 33 | decodeLoadCargoRequest, |
36 | 34 | encodeResponse, |
37 | 35 | opts..., |
38 | 36 | ) |
39 | 37 | requestRoutesHandler := kithttp.NewServer( |
40 | ctx, | |
41 | 38 | makeRequestRoutesEndpoint(bs), |
42 | 39 | decodeRequestRoutesRequest, |
43 | 40 | encodeResponse, |
44 | 41 | opts..., |
45 | 42 | ) |
46 | 43 | assignToRouteHandler := kithttp.NewServer( |
47 | ctx, | |
48 | 44 | makeAssignToRouteEndpoint(bs), |
49 | 45 | decodeAssignToRouteRequest, |
50 | 46 | encodeResponse, |
51 | 47 | opts..., |
52 | 48 | ) |
53 | 49 | changeDestinationHandler := kithttp.NewServer( |
54 | ctx, | |
55 | 50 | makeChangeDestinationEndpoint(bs), |
56 | 51 | decodeChangeDestinationRequest, |
57 | 52 | encodeResponse, |
58 | 53 | opts..., |
59 | 54 | ) |
60 | 55 | listCargosHandler := kithttp.NewServer( |
61 | ctx, | |
62 | 56 | makeListCargosEndpoint(bs), |
63 | 57 | decodeListCargosRequest, |
64 | 58 | encodeResponse, |
65 | 59 | opts..., |
66 | 60 | ) |
67 | 61 | listLocationsHandler := kithttp.NewServer( |
68 | ctx, | |
69 | 62 | makeListLocationsEndpoint(bs), |
70 | 63 | decodeListLocationsRequest, |
71 | 64 | encodeResponse, |
16 | 16 | ) |
17 | 17 | |
18 | 18 | // MakeHandler returns a handler for the handling service. |
19 | func MakeHandler(ctx context.Context, hs Service, logger kitlog.Logger) http.Handler { | |
19 | func MakeHandler(hs Service, logger kitlog.Logger) http.Handler { | |
20 | 20 | r := mux.NewRouter() |
21 | 21 | |
22 | 22 | opts := []kithttp.ServerOption{ |
25 | 25 | } |
26 | 26 | |
27 | 27 | registerIncidentHandler := kithttp.NewServer( |
28 | ctx, | |
29 | 28 | makeRegisterIncidentEndpoint(hs), |
30 | 29 | decodeRegisterIncidentRequest, |
31 | 30 | encodeResponse, |
136 | 136 | |
137 | 137 | mux := http.NewServeMux() |
138 | 138 | |
139 | mux.Handle("/booking/v1/", booking.MakeHandler(ctx, bs, httpLogger)) | |
140 | mux.Handle("/tracking/v1/", tracking.MakeHandler(ctx, ts, httpLogger)) | |
141 | mux.Handle("/handling/v1/", handling.MakeHandler(ctx, hs, httpLogger)) | |
139 | mux.Handle("/booking/v1/", booking.MakeHandler(bs, httpLogger)) | |
140 | mux.Handle("/tracking/v1/", tracking.MakeHandler(ts, httpLogger)) | |
141 | mux.Handle("/handling/v1/", handling.MakeHandler(hs, httpLogger)) | |
142 | 142 | |
143 | 143 | http.Handle("/", accessControl(mux)) |
144 | 144 | http.Handle("/metrics", stdprometheus.Handler()) |
14 | 14 | ) |
15 | 15 | |
16 | 16 | // MakeHandler returns a handler for the tracking service. |
17 | func MakeHandler(ctx context.Context, ts Service, logger kitlog.Logger) http.Handler { | |
17 | func MakeHandler(ts Service, logger kitlog.Logger) http.Handler { | |
18 | 18 | r := mux.NewRouter() |
19 | 19 | |
20 | 20 | opts := []kithttp.ServerOption{ |
23 | 23 | } |
24 | 24 | |
25 | 25 | trackCargoHandler := kithttp.NewServer( |
26 | ctx, | |
27 | 26 | makeTrackCargoEndpoint(ts), |
28 | 27 | decodeTrackCargoRequest, |
29 | 28 | encodeResponse, |
31 | 31 | } |
32 | 32 | |
33 | 33 | func main() { |
34 | ctx := context.Background() | |
35 | 34 | svc := stringService{} |
36 | 35 | |
37 | 36 | uppercaseHandler := httptransport.NewServer( |
38 | ctx, | |
39 | 37 | makeUppercaseEndpoint(svc), |
40 | 38 | decodeUppercaseRequest, |
41 | 39 | encodeResponse, |
42 | 40 | ) |
43 | 41 | |
44 | 42 | countHandler := httptransport.NewServer( |
45 | ctx, | |
46 | 43 | makeCountEndpoint(svc), |
47 | 44 | decodeCountRequest, |
48 | 45 | encodeResponse, |
0 | 0 | package main |
1 | 1 | |
2 | 2 | import ( |
3 | "context" | |
4 | 3 | "net/http" |
5 | 4 | "os" |
6 | 5 | |
12 | 11 | ) |
13 | 12 | |
14 | 13 | func main() { |
15 | ctx := context.Background() | |
16 | 14 | logger := log.NewLogfmtLogger(os.Stderr) |
17 | 15 | |
18 | 16 | fieldKeys := []string{"method", "error"} |
41 | 39 | svc = instrumentingMiddleware{requestCount, requestLatency, countResult, svc} |
42 | 40 | |
43 | 41 | uppercaseHandler := httptransport.NewServer( |
44 | ctx, | |
45 | 42 | makeUppercaseEndpoint(svc), |
46 | 43 | decodeUppercaseRequest, |
47 | 44 | encodeResponse, |
48 | 45 | ) |
49 | 46 | |
50 | 47 | countHandler := httptransport.NewServer( |
51 | ctx, | |
52 | 48 | makeCountEndpoint(svc), |
53 | 49 | decodeCountRequest, |
54 | 50 | encodeResponse, |
23 | 23 | logger = log.NewLogfmtLogger(os.Stderr) |
24 | 24 | logger = log.NewContext(logger).With("listen", *listen).With("caller", log.DefaultCaller) |
25 | 25 | |
26 | ctx := context.Background() | |
27 | ||
28 | 26 | fieldKeys := []string{"method", "error"} |
29 | 27 | requestCount := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{ |
30 | 28 | Namespace: "my_group", |
47 | 45 | |
48 | 46 | var svc StringService |
49 | 47 | svc = stringService{} |
50 | svc = proxyingMiddleware(*proxy, ctx, logger)(svc) | |
48 | svc = proxyingMiddleware(context.Background(), *proxy, logger)(svc) | |
51 | 49 | svc = loggingMiddleware(logger)(svc) |
52 | 50 | svc = instrumentingMiddleware(requestCount, requestLatency, countResult)(svc) |
53 | 51 | |
54 | 52 | uppercaseHandler := httptransport.NewServer( |
55 | ctx, | |
56 | 53 | makeUppercaseEndpoint(svc), |
57 | 54 | decodeUppercaseRequest, |
58 | 55 | encodeResponse, |
59 | 56 | ) |
60 | 57 | countHandler := httptransport.NewServer( |
61 | ctx, | |
62 | 58 | makeCountEndpoint(svc), |
63 | 59 | decodeCountRequest, |
64 | 60 | encodeResponse, |
19 | 19 | httptransport "github.com/go-kit/kit/transport/http" |
20 | 20 | ) |
21 | 21 | |
22 | func proxyingMiddleware(instances string, ctx context.Context, logger log.Logger) ServiceMiddleware { | |
22 | func proxyingMiddleware(ctx context.Context, instances string, logger log.Logger) ServiceMiddleware { | |
23 | 23 | // If instances is empty, don't proxy. |
24 | 24 | if instances == "" { |
25 | 25 | logger.Log("proxy_to", "none") |
8 | 8 | |
9 | 9 | func ExamplePopulateRequestContext() { |
10 | 10 | handler := NewServer( |
11 | context.Background(), | |
12 | 11 | func(ctx context.Context, request interface{}) (response interface{}, err error) { |
13 | 12 | fmt.Println("Method", ctx.Value(ContextKeyRequestMethod).(string)) |
14 | 13 | fmt.Println("RequestPath", ctx.Value(ContextKeyRequestPath).(string)) |
10 | 10 | |
11 | 11 | // Server wraps an endpoint and implements http.Handler. |
12 | 12 | type Server struct { |
13 | ctx context.Context | |
14 | 13 | e endpoint.Endpoint |
15 | 14 | dec DecodeRequestFunc |
16 | 15 | enc EncodeResponseFunc |
24 | 23 | // NewServer constructs a new server, which implements http.Server and wraps |
25 | 24 | // the provided endpoint. |
26 | 25 | func NewServer( |
27 | ctx context.Context, | |
28 | 26 | e endpoint.Endpoint, |
29 | 27 | dec DecodeRequestFunc, |
30 | 28 | enc EncodeResponseFunc, |
31 | 29 | options ...ServerOption, |
32 | 30 | ) *Server { |
33 | 31 | s := &Server{ |
34 | ctx: ctx, | |
35 | 32 | e: e, |
36 | 33 | dec: dec, |
37 | 34 | enc: enc, |
84 | 81 | |
85 | 82 | // ServeHTTP implements http.Handler. |
86 | 83 | func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
87 | ctx := s.ctx | |
84 | ctx := r.Context() | |
88 | 85 | |
89 | 86 | if s.finalizer != nil { |
90 | 87 | iw := &interceptingWriter{w, http.StatusOK, 0} |
15 | 15 | |
16 | 16 | func TestServerBadDecode(t *testing.T) { |
17 | 17 | handler := httptransport.NewServer( |
18 | context.Background(), | |
19 | 18 | func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, |
20 | 19 | func(context.Context, *http.Request) (interface{}, error) { return struct{}{}, errors.New("dang") }, |
21 | 20 | func(context.Context, http.ResponseWriter, interface{}) error { return nil }, |
30 | 29 | |
31 | 30 | func TestServerBadEndpoint(t *testing.T) { |
32 | 31 | handler := httptransport.NewServer( |
33 | context.Background(), | |
34 | 32 | func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errors.New("dang") }, |
35 | 33 | func(context.Context, *http.Request) (interface{}, error) { return struct{}{}, nil }, |
36 | 34 | func(context.Context, http.ResponseWriter, interface{}) error { return nil }, |
45 | 43 | |
46 | 44 | func TestServerBadEncode(t *testing.T) { |
47 | 45 | handler := httptransport.NewServer( |
48 | context.Background(), | |
49 | 46 | func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, |
50 | 47 | func(context.Context, *http.Request) (interface{}, error) { return struct{}{}, nil }, |
51 | 48 | func(context.Context, http.ResponseWriter, interface{}) error { return errors.New("dang") }, |
67 | 64 | return http.StatusInternalServerError |
68 | 65 | } |
69 | 66 | handler := httptransport.NewServer( |
70 | context.Background(), | |
71 | 67 | func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errTeapot }, |
72 | 68 | func(context.Context, *http.Request) (interface{}, error) { return struct{}{}, nil }, |
73 | 69 | func(context.Context, http.ResponseWriter, interface{}) error { return nil }, |
82 | 78 | } |
83 | 79 | |
84 | 80 | func TestServerHappyPath(t *testing.T) { |
85 | _, step, response := testServer(t) | |
81 | step, response := testServer(t) | |
86 | 82 | step() |
87 | 83 | resp := <-response |
88 | 84 | defer resp.Body.Close() |
91 | 87 | t.Errorf("want %d, have %d (%s)", want, have, buf) |
92 | 88 | } |
93 | 89 | } |
94 | ||
95 | 90 | |
96 | 91 | func TestMultipleServerBefore(t *testing.T) { |
97 | 92 | var ( |
102 | 97 | done = make(chan struct{}) |
103 | 98 | ) |
104 | 99 | handler := httptransport.NewServer( |
105 | context.Background(), | |
106 | 100 | endpoint.Nop, |
107 | 101 | func(context.Context, *http.Request) (interface{}, error) { |
108 | 102 | return struct{}{}, nil |
148 | 142 | done = make(chan struct{}) |
149 | 143 | ) |
150 | 144 | handler := httptransport.NewServer( |
151 | context.Background(), | |
152 | 145 | endpoint.Nop, |
153 | 146 | func(context.Context, *http.Request) (interface{}, error) { |
154 | 147 | return struct{}{}, nil |
194 | 187 | done = make(chan struct{}) |
195 | 188 | ) |
196 | 189 | handler := httptransport.NewServer( |
197 | context.Background(), | |
198 | 190 | endpoint.Nop, |
199 | 191 | func(context.Context, *http.Request) (interface{}, error) { |
200 | 192 | return struct{}{}, nil |
244 | 236 | |
245 | 237 | func TestEncodeJSONResponse(t *testing.T) { |
246 | 238 | handler := httptransport.NewServer( |
247 | context.Background(), | |
248 | 239 | func(context.Context, interface{}) (interface{}, error) { return enhancedResponse{Foo: "bar"}, nil }, |
249 | 240 | func(context.Context, *http.Request) (interface{}, error) { return struct{}{}, nil }, |
250 | 241 | httptransport.EncodeJSONResponse, |
275 | 266 | |
276 | 267 | func TestEncodeNoContent(t *testing.T) { |
277 | 268 | handler := httptransport.NewServer( |
278 | context.Background(), | |
279 | 269 | func(context.Context, interface{}) (interface{}, error) { return noContentResponse{}, nil }, |
280 | 270 | func(context.Context, *http.Request) (interface{}, error) { return struct{}{}, nil }, |
281 | 271 | httptransport.EncodeJSONResponse, |
306 | 296 | |
307 | 297 | func TestEnhancedError(t *testing.T) { |
308 | 298 | handler := httptransport.NewServer( |
309 | context.Background(), | |
310 | 299 | func(context.Context, interface{}) (interface{}, error) { return nil, enhancedError{} }, |
311 | 300 | func(context.Context, *http.Request) (interface{}, error) { return struct{}{}, nil }, |
312 | 301 | func(_ context.Context, w http.ResponseWriter, _ interface{}) error { return nil }, |
332 | 321 | } |
333 | 322 | } |
334 | 323 | |
335 | func testServer(t *testing.T) (cancel, step func(), resp <-chan *http.Response) { | |
324 | func testServer(t *testing.T) (step func(), resp <-chan *http.Response) { | |
336 | 325 | var ( |
337 | ctx, cancelfn = context.WithCancel(context.Background()) | |
338 | stepch = make(chan bool) | |
339 | endpoint = func(context.Context, interface{}) (interface{}, error) { <-stepch; return struct{}{}, nil } | |
340 | response = make(chan *http.Response) | |
341 | handler = httptransport.NewServer( | |
342 | ctx, | |
326 | stepch = make(chan bool) | |
327 | endpoint = func(context.Context, interface{}) (interface{}, error) { <-stepch; return struct{}{}, nil } | |
328 | response = make(chan *http.Response) | |
329 | handler = httptransport.NewServer( | |
343 | 330 | endpoint, |
344 | 331 | func(context.Context, *http.Request) (interface{}, error) { return struct{}{}, nil }, |
345 | 332 | func(context.Context, http.ResponseWriter, interface{}) error { return nil }, |
357 | 344 | } |
358 | 345 | response <- resp |
359 | 346 | }() |
360 | return cancelfn, func() { stepch <- true }, response | |
361 | } | |
347 | return func() { stepch <- true }, response | |
348 | } |