Codebase list golang-github-go-kit-kit / 9d19224
discover services on startup, add addsvc support Marcel Hauf 8 years ago
3 changed file(s) with 159 addition(s) and 106 deletion(s). Raw diff Collapse all Expand all
0 {
1 "service": {
2 "name": "addsvc",
3 "tags": [],
4 "address": "127.0.0.1",
5 "port": 8081,
6 "enableTagOverride": false
7 }
8 }
00 package main
11
22 import (
3 "bytes"
4 "encoding/json"
53 "flag"
64 "fmt"
75 "io"
1412 "strings"
1513 "syscall"
1614
17 "golang.org/x/net/context"
18
19 "github.com/gorilla/mux"
20
2115 "github.com/go-kit/kit/endpoint"
16 addsvc "github.com/go-kit/kit/examples/addsvc/client/grpc"
17 "github.com/go-kit/kit/examples/addsvc/server"
2218 "github.com/go-kit/kit/loadbalancer"
2319 "github.com/go-kit/kit/loadbalancer/consul"
2420 log "github.com/go-kit/kit/log"
21 //grpctransport "github.com/go-kit/kit/transport/grpc"
2522 httptransport "github.com/go-kit/kit/transport/http"
26
23 //proto "github.com/golang/protobuf/proto"
24 "github.com/gorilla/mux"
2725 "github.com/hashicorp/consul/api"
26 "golang.org/x/net/context"
27 "google.golang.org/grpc"
2828 )
2929
3030 var (
6868 }
6969 discoveryClient = consul.NewClient(consulClient)
7070
71 // discover service stringsvc
72 uppercase, err := consul.NewPublisher(discoveryClient, routeFactory(ctx, "uppercase"), logger, "stringsvc")
73 if err != nil {
74 logger.Log("fatal", err)
75 }
76 count, err := consul.NewPublisher(discoveryClient, routeFactory(ctx, "count"), logger, "stringsvc")
77 if err != nil {
78 logger.Log("fatal", err)
79 }
80
81 // discover service addsvc
82 addsvcSum, err := consul.NewPublisher(discoveryClient, factoryAddsvc(ctx, logger, makeSumEndpoint), logger, "addsvc")
83 if err != nil {
84 logger.Log("fatal", err)
85 }
86 addsvcConcat, err := consul.NewPublisher(discoveryClient, factoryAddsvc(ctx, logger, makeConcatEndpoint), logger, "addsvc")
87 if err != nil {
88 logger.Log("fatal", err)
89 }
90
7191 // apigateway
7292 go func() {
7393 r := mux.NewRouter()
74 r.HandleFunc("/api/{service}/{method}", apiGateway)
94 r.HandleFunc("/api/addsvc/sum", makeSumHandler(ctx, loadbalancer.NewRoundRobin(addsvcSum)))
95 r.HandleFunc("/api/addsvc/concat", makeConcatHandler(ctx, loadbalancer.NewRoundRobin(addsvcConcat)))
96 r.HandleFunc("/api/stringsvc/uppercase", factoryPassHandler(loadbalancer.NewRoundRobin(uppercase), logger))
97 r.HandleFunc("/api/stringsvc/count", factoryPassHandler(loadbalancer.NewRoundRobin(count), logger))
7598 errc <- http.ListenAndServe(*httpAddr, r)
7699 }()
77100
85108 return fmt.Errorf("%s", <-c)
86109 }
87110
88 func apiGateway(w http.ResponseWriter, r *http.Request) {
89 vars := mux.Vars(r)
90 service := vars["service"]
91 method := vars["method"]
92 e, err := getEndpoint(service, method)
93 if err != nil {
94 logger.Log("error", err)
95 return
96 }
97
98 var val interface{}
99 dec := json.NewDecoder(r.Body)
100 err = dec.Decode(&val)
101 if err != nil {
102 logger.Log("warning", err)
103 fmt.Fprint(w, err)
104 return
105 }
106
107 resp, err := e(ctx, val)
108 if err != nil {
109 logger.Log("warning", err)
110 fmt.Fprint(w, err)
111 return
112 }
113 enc := json.NewEncoder(w)
114 err = enc.Encode(resp)
115 if err != nil {
116 logger.Log("warning", err)
117 fmt.Fprint(w, err)
118 return
119 }
120 }
121
122 var services = make(map[string]service)
123
124 type service map[string]loadbalancer.LoadBalancer
125
126 func getEndpoint(se string, method string) (endpoint.Endpoint, error) {
127 if s, ok := services[se]; ok {
128 if m, ok := s[method]; ok {
129 return m.Endpoint()
130 }
131 }
132
133 publisher, err := consul.NewPublisher(discoveryClient, factory(ctx, method), log.NewLogfmtLogger(&log.StdlibWriter{}), se)
134 publisher.Endpoints()
135 if err != nil {
136 return nil, err
137 }
138 rr := loadbalancer.NewRoundRobin(publisher)
139
140 if _, ok := services[se]; ok {
141 services[se][method] = rr
142 } else {
143 services[se] = service{method: rr}
144 }
145
146 return rr.Endpoint()
147 }
148
149 func factory(ctx context.Context, method string) loadbalancer.Factory {
150 return func(service string) (endpoint.Endpoint, io.Closer, error) {
111 func makeSumHandler(ctx context.Context, lb loadbalancer.LoadBalancer) http.HandlerFunc {
112 return func(w http.ResponseWriter, r *http.Request) {
113 sumReq, err := server.DecodeSumRequest(r)
114 if err != nil {
115 logger.Log("error", err)
116 return
117 }
118 e, err := lb.Endpoint()
119 if err != nil {
120 logger.Log("error", err)
121 return
122 }
123 sumResp, err := e(ctx, sumReq)
124 if err != nil {
125 logger.Log("error", err)
126 return
127 }
128 err = server.EncodeSumResponse(w, sumResp)
129 if err != nil {
130 logger.Log("error", err)
131 return
132 }
133 }
134 }
135
136 func makeConcatHandler(ctx context.Context, lb loadbalancer.LoadBalancer) http.HandlerFunc {
137 return func(w http.ResponseWriter, r *http.Request) {
138 concatReq, err := server.DecodeConcatRequest(r)
139 if err != nil {
140 logger.Log("error", err)
141 return
142 }
143 e, err := lb.Endpoint()
144 if err != nil {
145 logger.Log("error", err)
146 return
147 }
148 concatResp, err := e(ctx, concatReq)
149 if err != nil {
150 logger.Log("error", err)
151 return
152 }
153 err = server.EncodeConcatResponse(w, concatResp)
154 if err != nil {
155 logger.Log("error", err)
156 return
157 }
158 }
159 }
160
161 func factoryAddsvc(ctx context.Context, logger log.Logger, maker func(server.AddService) endpoint.Endpoint) loadbalancer.Factory {
162 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
151163 var e endpoint.Endpoint
152 e = makeProxy(ctx, service, method)
164 conn, err := grpc.Dial(instance, grpc.WithInsecure())
165 if err != nil {
166 return e, nil, err
167 }
168 svc := addsvc.New(ctx, conn, logger)
169 return maker(svc), nil, nil
170 }
171 }
172
173 func makeSumEndpoint(svc server.AddService) endpoint.Endpoint {
174 return func(ctx context.Context, request interface{}) (interface{}, error) {
175 req := request.(server.SumRequest)
176 v := svc.Sum(req.A, req.B)
177 return server.SumResponse{V: v}, nil
178 }
179 }
180
181 func makeConcatEndpoint(svc server.AddService) endpoint.Endpoint {
182 return func(ctx context.Context, request interface{}) (interface{}, error) {
183 req := request.(server.ConcatRequest)
184 v := svc.Concat(req.A, req.B)
185 return server.ConcatResponse{V: v}, nil
186 }
187 }
188
189 func routeFactory(ctx context.Context, method string) loadbalancer.Factory {
190 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
191 var e endpoint.Endpoint
192 if !strings.HasPrefix(instance, "http") {
193 instance = "http://" + instance
194 }
195 u, err := url.Parse(instance)
196 if err != nil {
197 return nil, nil, err
198 }
199 u.Path = method
200
201 e = httptransport.NewClient("GET", u, passEncode, passDecode).Endpoint()
153202 return e, nil, nil
154203 }
155204 }
156205
157 func makeProxy(ctx context.Context, service, method string) endpoint.Endpoint {
158 if !strings.HasPrefix(service, "http") {
159 service = "http://" + service
160 }
161 u, err := url.Parse(service)
162 if err != nil {
163 panic(err)
164 }
165 if u.Path == "" {
166 u.Path = "/" + method
167 }
168
169 return httptransport.NewClient(
170 "GET",
171 u,
172 encodeRequest,
173 decodeResponse,
174 ).Endpoint()
175 }
176
177 func encodeRequest(r *http.Request, request interface{}) error {
178 var buf bytes.Buffer
179 if err := json.NewEncoder(&buf).Encode(request); err != nil {
180 return err
181 }
182 r.Body = ioutil.NopCloser(&buf)
206 func passEncode(r *http.Request, request interface{}) error {
207 r.Body = request.(io.ReadCloser)
183208 return nil
184209 }
185210
186 func decodeResponse(r *http.Response) (interface{}, error) {
187 var response interface{}
188 if err := json.NewDecoder(r.Body).Decode(&response); err != nil {
189 return nil, err
190 }
191 return response, nil
192 }
211 func passDecode(r *http.Response) (interface{}, error) {
212 return ioutil.ReadAll(r.Body)
213 }
214
215 func factoryPassHandler(lb loadbalancer.LoadBalancer, logger log.Logger) http.HandlerFunc {
216 return func(w http.ResponseWriter, r *http.Request) {
217 e, err := lb.Endpoint()
218 if err != nil {
219 logger.Log("error", err)
220 return
221 }
222 resp, err := e(ctx, r.Body)
223 if err != nil {
224 logger.Log("warning", err)
225 fmt.Fprint(w, err)
226 return
227 }
228 b := resp.([]byte)
229 _, err = w.Write(b)
230 if err != nil {
231 logger.Log("warning", err)
232 fmt.Fprint(w, err)
233 return
234 }
235 }
236 }
00 {
11 "service": {
22 "name": "stringsvc",
3 "tags": ["master"],
3 "tags": [],
44 "address": "127.0.0.1",
55 "port": 8080,
66 "enableTagOverride": false