Package list golang-github-go-kit-kit / 550ed03
refactored ServerResponseFunc to be more symmetrical with ClientResponseFunc Bas van Beek 4 years ago
5 changed file(s) with 140 addition(s) and 63 deletion(s). Raw diff Collapse all Expand all
3131 encodeRequest,
3232 decodeResponse,
3333 &pb.TestResponse{},
34 grpctransport.ClientBefore(clientBefore),
35 grpctransport.ClientAfter(clientAfter),
34 grpctransport.ClientBefore(
35 injectCorrelationID,
36 ),
37 grpctransport.ClientBefore(
38 displayClientRequestHeaders,
39 ),
40 grpctransport.ClientAfter(
41 displayClientResponseHeaders,
42 displayClientResponseTrailers,
43 ),
44 grpctransport.ClientAfter(
45 extractConsumedCorrelationID,
46 ),
3647 ).Endpoint(),
3748 }
3849 }
22 import (
33 "context"
44 "fmt"
5 "log"
65
7 "google.golang.org/grpc"
86 "google.golang.org/grpc/metadata"
97 )
108
119 type metaContext string
1210
1311 const (
14 correlationID metaContext = "correlation-id"
15 responseHDR metaContext = "my-response-header"
16 responseTRLR metaContext = "correlation-id-consumed"
12 correlationID metaContext = "correlation-id"
13 responseHDR metaContext = "my-response-header"
14 responseTRLR metaContext = "my-response-trailer"
15 correlationIDTRLR metaContext = "correlation-id-consumed"
1716 )
1817
19 func clientBefore(ctx context.Context, md *metadata.MD) context.Context {
18 /* client before functions */
19
20 func injectCorrelationID(ctx context.Context, md *metadata.MD) context.Context {
2021 if hdr, ok := ctx.Value(correlationID).(string); ok {
22 fmt.Printf("\tClient found correlationID %q in context, set metadata header\n", hdr)
2123 (*md)[string(correlationID)] = append((*md)[string(correlationID)], hdr)
2224 }
25 return ctx
26 }
27
28 func displayClientRequestHeaders(ctx context.Context, md *metadata.MD) context.Context {
2329 if len(*md) > 0 {
2430 fmt.Println("\tClient >> Request Headers:")
2531 for key, val := range *md {
2935 return ctx
3036 }
3137
32 func serverBefore(ctx context.Context, md *metadata.MD) context.Context {
38 /* server before functions */
39
40 func extractCorrelationID(ctx context.Context, md *metadata.MD) context.Context {
41 if hdr, ok := (*md)[string(correlationID)]; ok {
42 cID := hdr[len(hdr)-1]
43 ctx = context.WithValue(ctx, correlationID, cID)
44 fmt.Printf("\tServer received correlationID %q in metadata header, set context\n", cID)
45 }
46 return ctx
47 }
48
49 func displayServerRequestHeaders(ctx context.Context, md *metadata.MD) context.Context {
3350 if len(*md) > 0 {
3451 fmt.Println("\tServer << Request Headers:")
3552 for key, val := range *md {
3653 fmt.Printf("\t\t%s: %s\n", key, val[len(val)-1])
3754 }
3855 }
39 if hdr, ok := (*md)[string(correlationID)]; ok {
40 cID := hdr[len(hdr)-1]
41 ctx = context.WithValue(ctx, correlationID, cID)
42 fmt.Printf("\tServer placed correlationID %q in context\n", cID)
43 }
4456 return ctx
4557 }
4658
47 func serverAfter(ctx context.Context, _ *metadata.MD) {
48 var mdHeader, mdTrailer metadata.MD
59 /* server after functions */
4960
50 mdHeader = metadata.Pairs(string(responseHDR), "has-a-value")
51 if err := grpc.SendHeader(ctx, mdHeader); err != nil {
52 log.Fatalf("unable to send header: %+v\n", err)
53 }
61 func injectResponseHeader(ctx context.Context, md *metadata.MD, _ *metadata.MD) {
62 *md = metadata.Join(*md, metadata.Pairs(string(responseHDR), "has-a-value"))
63 }
5464
55 if hdr, ok := ctx.Value(correlationID).(string); ok {
56 mdTrailer = metadata.Pairs(string(responseTRLR), hdr)
57 if err := grpc.SetTrailer(ctx, mdTrailer); err != nil {
58 log.Fatalf("unable to set trailer: %+v\n", err)
59 }
60 fmt.Printf("\tServer found correlationID %q in context, set consumed trailer\n", hdr)
61 }
62 if len(mdHeader) > 0 {
65 func displayServerResponseHeaders(ctx context.Context, md *metadata.MD, _ *metadata.MD) {
66 if len(*md) > 0 {
6367 fmt.Println("\tServer >> Response Headers:")
64 for key, val := range mdHeader {
65 fmt.Printf("\t\t%s: %s\n", key, val[len(val)-1])
66 }
67 }
68 if len(mdTrailer) > 0 {
69 fmt.Println("\tServer >> Response Trailers:")
70 for key, val := range mdTrailer {
68 for key, val := range *md {
7169 fmt.Printf("\t\t%s: %s\n", key, val[len(val)-1])
7270 }
7371 }
7472 }
7573
76 func clientAfter(ctx context.Context, mdHeader metadata.MD, mdTrailer metadata.MD) context.Context {
77 if len(mdHeader) > 0 {
78 fmt.Println("\tClient << Response Headers:")
79 for key, val := range mdHeader {
74 func injectResponseTrailer(ctx context.Context, _ *metadata.MD, md *metadata.MD) {
75 *md = metadata.Join(*md, metadata.Pairs(string(responseTRLR), "has-a-value-too"))
76 }
77
78 func injectConsumedCorrelationID(ctx context.Context, _ *metadata.MD, md *metadata.MD) {
79 if hdr, ok := ctx.Value(correlationID).(string); ok {
80 fmt.Printf("\tServer found correlationID %q in context, set consumed trailer\n", hdr)
81 *md = metadata.Join(*md, metadata.Pairs(string(correlationIDTRLR), hdr))
82 }
83 }
84
85 func displayServerResponseTrailers(ctx context.Context, _ *metadata.MD, md *metadata.MD) {
86 if len(*md) > 0 {
87 fmt.Println("\tServer >> Response Trailers:")
88 for key, val := range *md {
8089 fmt.Printf("\t\t%s: %s\n", key, val[len(val)-1])
8190 }
8291 }
83 if len(mdTrailer) > 0 {
84 fmt.Println("\tClient << Response Trailers:")
85 for key, val := range mdTrailer {
92 }
93
94 /* client after functions */
95
96 func displayClientResponseHeaders(ctx context.Context, md metadata.MD, _ metadata.MD) context.Context {
97 if len(md) > 0 {
98 fmt.Println("\tClient << Response Headers:")
99 for key, val := range md {
86100 fmt.Printf("\t\t%s: %s\n", key, val[len(val)-1])
87101 }
88102 }
103 return ctx
104 }
89105
90 if hdr, ok := mdTrailer[string(responseTRLR)]; ok {
91 ctx = context.WithValue(ctx, responseTRLR, hdr[len(hdr)-1])
106 func displayClientResponseTrailers(ctx context.Context, _ metadata.MD, md metadata.MD) context.Context {
107 if len(md) > 0 {
108 fmt.Println("\tClient << Response Trailers:")
109 for key, val := range md {
110 fmt.Printf("\t\t%s: %s\n", key, val[len(val)-1])
111 }
92112 }
93113 return ctx
94114 }
115
116 func extractConsumedCorrelationID(ctx context.Context, _ metadata.MD, md metadata.MD) context.Context {
117 if hdr, ok := md[string(correlationIDTRLR)]; ok {
118 fmt.Printf("\tClient received consumed correlationID %q in metadata trailer, set context\n", hdr[len(hdr)-1])
119 ctx = context.WithValue(ctx, correlationIDTRLR, hdr[len(hdr)-1])
120 }
121 return ctx
122 }
123
124 /* CorrelationID context handlers */
95125
96126 func SetCorrelationID(ctx context.Context, v string) context.Context {
97127 return context.WithValue(ctx, correlationID, v)
98128 }
99129
100130 func GetConsumedCorrelationID(ctx context.Context) string {
101 if trlr, ok := ctx.Value(responseTRLR).(string); ok {
131 if trlr, ok := ctx.Value(correlationIDTRLR).(string); ok {
102132 return trlr
103133 }
104134 return ""
4949 makeTestEndpoint(svc),
5050 decodeRequest,
5151 encodeResponse,
52 grpctransport.ServerBefore(serverBefore),
53 grpctransport.ServerAfter(serverAfter),
52 grpctransport.ServerBefore(
53 extractCorrelationID,
54 ),
55 grpctransport.ServerBefore(
56 displayServerRequestHeaders,
57 ),
58 grpctransport.ServerAfter(
59 injectResponseHeader,
60 injectResponseTrailer,
61 injectConsumedCorrelationID,
62 ),
63 grpctransport.ServerAfter(
64 displayServerResponseHeaders,
65 displayServerResponseTrailers,
66 ),
5467 ),
5568 }
5669 }
1818 type RequestFunc func(context.Context, *metadata.MD) context.Context
1919
2020 // ServerResponseFunc may take information from a request context and use it to
21 // manipulate the gRPC metadata header. ResponseFuncs are only executed in
22 // servers, after invoking the endpoint but prior to writing a response.
23 type ServerResponseFunc func(context.Context, *metadata.MD)
21 // manipulate the gRPC response metadata headers and trailers. ResponseFuncs are
22 // only executed in servers, after invoking the endpoint but prior to writing a
23 // response.
24 type ServerResponseFunc func(ctx context.Context, header *metadata.MD, trailer *metadata.MD)
2425
2526 // ClientResponseFunc may take information from a gRPC metadata header and/or
2627 // trailer and make the responses available for consumption. ClientResponseFuncs
2728 // are only executed in clients, after a request has been made, but prior to it
2829 // being decoded.
2930 type ClientResponseFunc func(ctx context.Context, header metadata.MD, trailer metadata.MD) context.Context
30
31 // SetResponseHeader returns a ResponseFunc that sets the specified metadata
32 // key-value pair.
33 func SetResponseHeader(key, val string) ServerResponseFunc {
34 return func(_ context.Context, md *metadata.MD) {
35 key, val := EncodeKeyValue(key, val)
36 (*md)[key] = append((*md)[key], val)
37 }
38 }
3931
4032 // SetRequestHeader returns a RequestFunc that sets the specified metadata
4133 // key-value pair.
4436 key, val := EncodeKeyValue(key, val)
4537 (*md)[key] = append((*md)[key], val)
4638 return ctx
39 }
40 }
41
42 // SetResponseHeader returns a ResponseFunc that sets the specified metadata
43 // key-value pair.
44 func SetResponseHeader(key, val string) ServerResponseFunc {
45 return func(_ context.Context, md *metadata.MD, _ *metadata.MD) {
46 key, val := EncodeKeyValue(key, val)
47 (*md)[key] = append((*md)[key], val)
48 }
49 }
50
51 // SetResponseTrailer returns a ResponseFunc that sets the specified metadata
52 // key-value pair.
53 func SetResponseTrailer(key, val string) ServerResponseFunc {
54 return func(_ context.Context, _ *metadata.MD, md *metadata.MD) {
55 key, val := EncodeKeyValue(key, val)
56 (*md)[key] = append((*md)[key], val)
4757 }
4858 }
4959
11
22 import (
33 oldcontext "golang.org/x/net/context"
4 "google.golang.org/grpc"
45 "google.golang.org/grpc/metadata"
56
67 "github.com/go-kit/kit/endpoint"
9596 return ctx, nil, err
9697 }
9798
99 var mdHeader, mdTrailer metadata.MD
98100 for _, f := range s.after {
99 f(ctx, &md)
101 f(ctx, &mdHeader, &mdTrailer)
100102 }
101
102 // Store potentially updated metadata in the gRPC context.
103 ctx = metadata.NewContext(ctx, md)
104103
105104 grpcResp, err := s.enc(ctx, response)
106105 if err != nil {
108107 return ctx, nil, err
109108 }
110109
110 if len(mdHeader) > 0 {
111 if err = grpc.SendHeader(ctx, mdHeader); err != nil {
112 s.logger.Log("err", err)
113 return ctx, nil, err
114 }
115 }
116
117 if len(mdTrailer) > 0 {
118 if err = grpc.SetTrailer(ctx, mdTrailer); err != nil {
119 s.logger.Log("err", err)
120 return ctx, nil, err
121 }
122 }
123
111124 return ctx, grpcResp, nil
112125 }