Codebase list golang-github-go-kit-kit / 70fbf2a
merge handler factory functions into one and use a service definition variable to construct the apigateway mapping Marcel Hauf 8 years ago
1 changed file(s) with 59 addition(s) and 93 deletion(s). Raw diff Collapse all Expand all
00 package main
11
22 import (
3 "encoding/json"
34 "flag"
45 "fmt"
56 "io"
2728 "google.golang.org/grpc"
2829 )
2930
30 var (
31 discoveryClient consul.Client
32 ctx = context.Background()
33 logger log.Logger
34 )
35
3631 func main() {
3732 fs := flag.NewFlagSet("", flag.ExitOnError)
3833 var (
4641 }
4742
4843 // log
49 logger = log.NewLogfmtLogger(os.Stderr)
44 logger := log.NewLogfmtLogger(os.Stderr)
5045 logger = log.NewContext(logger).With("ts", log.DefaultTimestampUTC).With("caller", log.DefaultCaller)
5146 stdlog.SetFlags(0) // flags are handled by Go kit's logger
5247 stdlog.SetOutput(log.NewStdlibAdapter(logger)) // redirect anything using stdlib log to us
6661 if err != nil {
6762 logger.Log("fatal", err)
6863 }
69 discoveryClient = consul.NewClient(consulClient)
70
71 // discover service stringsvc
72 uppercase, err := consul.NewPublisher(discoveryClient, routeFactory(ctx, "uppercase"), logger, "stringsvc")
73 if err != nil {
74 logger.Log("fatal", err)
75 }
76 count, err := consul.NewPublisher(discoveryClient, routeFactory(ctx, "count"), logger, "stringsvc")
77 if err != nil {
78 logger.Log("fatal", err)
79 }
80
81 // discover service addsvc
82 addsvcSum, err := consul.NewPublisher(discoveryClient, factoryAddsvc(ctx, logger, makeSumEndpoint), logger, "addsvc")
83 if err != nil {
84 logger.Log("fatal", err)
85 }
86 addsvcConcat, err := consul.NewPublisher(discoveryClient, factoryAddsvc(ctx, logger, makeConcatEndpoint), logger, "addsvc")
87 if err != nil {
88 logger.Log("fatal", err)
64 discoveryClient := consul.NewClient(consulClient)
65
66 ctx := context.Background()
67
68 // service definitions
69 serviceDefs := []*ServiceDef{}
70 serviceDefs = append(serviceDefs, &ServiceDef{
71 Name: "addsvc",
72 Endpoints: map[string]loadbalancer.Factory{
73 "/api/addsvc/concat": factoryAddsvc(ctx, logger, makeConcatEndpoint),
74 "/api/addsvc/sum": factoryAddsvc(ctx, logger, makeSumEndpoint),
75 },
76 })
77 serviceDefs = append(serviceDefs, &ServiceDef{
78 Name: "stringsvc",
79 Endpoints: map[string]loadbalancer.Factory{
80 "/api/stringsvc/uppercase": routeFactory(ctx, "uppercase"),
81 "/api/stringsvc/count": routeFactory(ctx, "count"),
82 },
83 })
84
85 // discover instances and register endpoints
86 r := mux.NewRouter()
87 for _, def := range serviceDefs {
88 for path, e := range def.Endpoints {
89 pub, err := consul.NewPublisher(discoveryClient, e, logger, def.Name)
90 if err != nil {
91 logger.Log("fatal", err)
92 }
93 r.HandleFunc(path, makeHandler(ctx, loadbalancer.NewRoundRobin(pub), logger))
94 }
8995 }
9096
9197 // apigateway
9298 go func() {
93 r := mux.NewRouter()
94 r.HandleFunc("/api/addsvc/sum", makeSumHandler(ctx, loadbalancer.NewRoundRobin(addsvcSum)))
95 r.HandleFunc("/api/addsvc/concat", makeConcatHandler(ctx, loadbalancer.NewRoundRobin(addsvcConcat)))
96 r.HandleFunc("/api/stringsvc/uppercase", factoryPassHandler(loadbalancer.NewRoundRobin(uppercase), logger))
97 r.HandleFunc("/api/stringsvc/count", factoryPassHandler(loadbalancer.NewRoundRobin(count), logger))
9899 errc <- http.ListenAndServe(*httpAddr, r)
99100 }()
100101
101102 // wait for interrupt/error
102103 logger.Log("fatal", <-errc)
104 }
105
106 type ServiceDef struct {
107 Name string
108 Endpoints map[string]loadbalancer.Factory
103109 }
104110
105111 func interrupt() error {
108114 return fmt.Errorf("%s", <-c)
109115 }
110116
111 func makeSumHandler(ctx context.Context, lb loadbalancer.LoadBalancer) http.HandlerFunc {
117 func makeHandler(ctx context.Context, lb loadbalancer.LoadBalancer, logger log.Logger) http.HandlerFunc {
112118 return func(w http.ResponseWriter, r *http.Request) {
113 sumReq, err := server.DecodeSumRequest(r)
119 e, err := lb.Endpoint()
114120 if err != nil {
115121 logger.Log("error", err)
116122 return
117123 }
118 e, err := lb.Endpoint()
124 resp, err := e(ctx, r.Body)
119125 if err != nil {
120126 logger.Log("error", err)
121127 return
122128 }
123 sumResp, err := e(ctx, sumReq)
124 if err != nil {
125 logger.Log("error", err)
126 return
127 }
128 err = server.EncodeSumResponse(w, sumResp)
129 if err != nil {
130 logger.Log("error", err)
131 return
132 }
133 }
134 }
135
136 func makeConcatHandler(ctx context.Context, lb loadbalancer.LoadBalancer) http.HandlerFunc {
137 return func(w http.ResponseWriter, r *http.Request) {
138 concatReq, err := server.DecodeConcatRequest(r)
139 if err != nil {
140 logger.Log("error", err)
141 return
142 }
143 e, err := lb.Endpoint()
144 if err != nil {
145 logger.Log("error", err)
146 return
147 }
148 concatResp, err := e(ctx, concatReq)
149 if err != nil {
150 logger.Log("error", err)
151 return
152 }
153 err = server.EncodeConcatResponse(w, concatResp)
129 b, ok := resp.([]byte)
130 if !ok {
131 logger.Log("error", "endpoint response is not of type []byte")
132 return
133 }
134 _, err = w.Write(b)
154135 if err != nil {
155136 logger.Log("error", err)
156137 return
172153
173154 func makeSumEndpoint(svc server.AddService) endpoint.Endpoint {
174155 return func(ctx context.Context, request interface{}) (interface{}, error) {
175 req := request.(server.SumRequest)
156 r := request.(io.Reader)
157 var req server.SumRequest
158 if err := json.NewDecoder(r).Decode(&req); err != nil {
159 return nil, err
160 }
176161 v := svc.Sum(req.A, req.B)
177 return server.SumResponse{V: v}, nil
162 return json.Marshal(v)
178163 }
179164 }
180165
181166 func makeConcatEndpoint(svc server.AddService) endpoint.Endpoint {
182167 return func(ctx context.Context, request interface{}) (interface{}, error) {
183 req := request.(server.ConcatRequest)
168 r := request.(io.Reader)
169 var req server.ConcatRequest
170 if err := json.NewDecoder(r).Decode(&req); err != nil {
171 return nil, err
172 }
184173 v := svc.Concat(req.A, req.B)
185 return server.ConcatResponse{V: v}, nil
174 return json.Marshal(v)
186175 }
187176 }
188177
211200 func passDecode(r *http.Response) (interface{}, error) {
212201 return ioutil.ReadAll(r.Body)
213202 }
214
215 func factoryPassHandler(lb loadbalancer.LoadBalancer, logger log.Logger) http.HandlerFunc {
216 return func(w http.ResponseWriter, r *http.Request) {
217 e, err := lb.Endpoint()
218 if err != nil {
219 logger.Log("error", err)
220 return
221 }
222 resp, err := e(ctx, r.Body)
223 if err != nil {
224 logger.Log("warning", err)
225 fmt.Fprint(w, err)
226 return
227 }
228 b := resp.([]byte)
229 _, err = w.Write(b)
230 if err != nil {
231 logger.Log("warning", err)
232 fmt.Fprint(w, err)
233 return
234 }
235 }
236 }