diff --git a/examples/apigateway/addsvc_def.json b/examples/apigateway/addsvc_def.json new file mode 100644 index 0000000..4df1774 --- /dev/null +++ b/examples/apigateway/addsvc_def.json @@ -0,0 +1,9 @@ +{ + "service": { + "name": "addsvc", + "tags": [], + "address": "127.0.0.1", + "port": 8081, + "enableTagOverride": false + } +} diff --git a/examples/apigateway/main.go b/examples/apigateway/main.go index e6cc488..2ea22d5 100644 --- a/examples/apigateway/main.go +++ b/examples/apigateway/main.go @@ -1,8 +1,6 @@ package main import ( - "bytes" - "encoding/json" "flag" "fmt" "io" @@ -15,17 +13,19 @@ "strings" "syscall" - "golang.org/x/net/context" - - "github.com/gorilla/mux" - "github.com/go-kit/kit/endpoint" + addsvc "github.com/go-kit/kit/examples/addsvc/client/grpc" + "github.com/go-kit/kit/examples/addsvc/server" "github.com/go-kit/kit/loadbalancer" "github.com/go-kit/kit/loadbalancer/consul" log "github.com/go-kit/kit/log" + //grpctransport "github.com/go-kit/kit/transport/grpc" httptransport "github.com/go-kit/kit/transport/http" - + //proto "github.com/golang/protobuf/proto" + "github.com/gorilla/mux" "github.com/hashicorp/consul/api" + "golang.org/x/net/context" + "google.golang.org/grpc" ) var ( @@ -69,10 +69,33 @@ } discoveryClient = consul.NewClient(consulClient) + // discover service stringsvc + uppercase, err := consul.NewPublisher(discoveryClient, routeFactory(ctx, "uppercase"), logger, "stringsvc") + if err != nil { + logger.Log("fatal", err) + } + count, err := consul.NewPublisher(discoveryClient, routeFactory(ctx, "count"), logger, "stringsvc") + if err != nil { + logger.Log("fatal", err) + } + + // discover service addsvc + addsvcSum, err := consul.NewPublisher(discoveryClient, factoryAddsvc(ctx, logger, makeSumEndpoint), logger, "addsvc") + if err != nil { + logger.Log("fatal", err) + } + addsvcConcat, err := consul.NewPublisher(discoveryClient, factoryAddsvc(ctx, logger, makeConcatEndpoint), logger, "addsvc") + if err != nil { + logger.Log("fatal", err) + } + // apigateway go func() { r := mux.NewRouter() - r.HandleFunc("/api/{service}/{method}", apiGateway) + r.HandleFunc("/api/addsvc/sum", makeSumHandler(ctx, loadbalancer.NewRoundRobin(addsvcSum))) + r.HandleFunc("/api/addsvc/concat", makeConcatHandler(ctx, loadbalancer.NewRoundRobin(addsvcConcat))) + r.HandleFunc("/api/stringsvc/uppercase", factoryPassHandler(loadbalancer.NewRoundRobin(uppercase), logger)) + r.HandleFunc("/api/stringsvc/count", factoryPassHandler(loadbalancer.NewRoundRobin(count), logger)) errc <- http.ListenAndServe(*httpAddr, r) }() @@ -86,108 +109,129 @@ return fmt.Errorf("%s", <-c) } -func apiGateway(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - service := vars["service"] - method := vars["method"] - e, err := getEndpoint(service, method) - if err != nil { - logger.Log("error", err) - return - } - - var val interface{} - dec := json.NewDecoder(r.Body) - err = dec.Decode(&val) - if err != nil { - logger.Log("warning", err) - fmt.Fprint(w, err) - return - } - - resp, err := e(ctx, val) - if err != nil { - logger.Log("warning", err) - fmt.Fprint(w, err) - return - } - enc := json.NewEncoder(w) - err = enc.Encode(resp) - if err != nil { - logger.Log("warning", err) - fmt.Fprint(w, err) - return - } -} - -var services = make(map[string]service) - -type service map[string]loadbalancer.LoadBalancer - -func getEndpoint(se string, method string) (endpoint.Endpoint, error) { - if s, ok := services[se]; ok { - if m, ok := s[method]; ok { - return m.Endpoint() - } - } - - publisher, err := consul.NewPublisher(discoveryClient, factory(ctx, method), log.NewLogfmtLogger(&log.StdlibWriter{}), se) - publisher.Endpoints() - if err != nil { - return nil, err - } - rr := loadbalancer.NewRoundRobin(publisher) - - if _, ok := services[se]; ok { - services[se][method] = rr - } else { - services[se] = service{method: rr} - } - - return rr.Endpoint() -} - -func factory(ctx context.Context, method string) loadbalancer.Factory { - return func(service string) (endpoint.Endpoint, io.Closer, error) { +func makeSumHandler(ctx context.Context, lb loadbalancer.LoadBalancer) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + sumReq, err := server.DecodeSumRequest(r) + if err != nil { + logger.Log("error", err) + return + } + e, err := lb.Endpoint() + if err != nil { + logger.Log("error", err) + return + } + sumResp, err := e(ctx, sumReq) + if err != nil { + logger.Log("error", err) + return + } + err = server.EncodeSumResponse(w, sumResp) + if err != nil { + logger.Log("error", err) + return + } + } +} + +func makeConcatHandler(ctx context.Context, lb loadbalancer.LoadBalancer) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + concatReq, err := server.DecodeConcatRequest(r) + if err != nil { + logger.Log("error", err) + return + } + e, err := lb.Endpoint() + if err != nil { + logger.Log("error", err) + return + } + concatResp, err := e(ctx, concatReq) + if err != nil { + logger.Log("error", err) + return + } + err = server.EncodeConcatResponse(w, concatResp) + if err != nil { + logger.Log("error", err) + return + } + } +} + +func factoryAddsvc(ctx context.Context, logger log.Logger, maker func(server.AddService) endpoint.Endpoint) loadbalancer.Factory { + return func(instance string) (endpoint.Endpoint, io.Closer, error) { var e endpoint.Endpoint - e = makeProxy(ctx, service, method) + conn, err := grpc.Dial(instance, grpc.WithInsecure()) + if err != nil { + return e, nil, err + } + svc := addsvc.New(ctx, conn, logger) + return maker(svc), nil, nil + } +} + +func makeSumEndpoint(svc server.AddService) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(server.SumRequest) + v := svc.Sum(req.A, req.B) + return server.SumResponse{V: v}, nil + } +} + +func makeConcatEndpoint(svc server.AddService) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(server.ConcatRequest) + v := svc.Concat(req.A, req.B) + return server.ConcatResponse{V: v}, nil + } +} + +func routeFactory(ctx context.Context, method string) loadbalancer.Factory { + return func(instance string) (endpoint.Endpoint, io.Closer, error) { + var e endpoint.Endpoint + if !strings.HasPrefix(instance, "http") { + instance = "http://" + instance + } + u, err := url.Parse(instance) + if err != nil { + return nil, nil, err + } + u.Path = method + + e = httptransport.NewClient("GET", u, passEncode, passDecode).Endpoint() return e, nil, nil } } -func makeProxy(ctx context.Context, service, method string) endpoint.Endpoint { - if !strings.HasPrefix(service, "http") { - service = "http://" + service - } - u, err := url.Parse(service) - if err != nil { - panic(err) - } - if u.Path == "" { - u.Path = "/" + method - } - - return httptransport.NewClient( - "GET", - u, - encodeRequest, - decodeResponse, - ).Endpoint() -} - -func encodeRequest(r *http.Request, request interface{}) error { - var buf bytes.Buffer - if err := json.NewEncoder(&buf).Encode(request); err != nil { - return err - } - r.Body = ioutil.NopCloser(&buf) +func passEncode(r *http.Request, request interface{}) error { + r.Body = request.(io.ReadCloser) return nil } -func decodeResponse(r *http.Response) (interface{}, error) { - var response interface{} - if err := json.NewDecoder(r.Body).Decode(&response); err != nil { - return nil, err - } - return response, nil -} +func passDecode(r *http.Response) (interface{}, error) { + return ioutil.ReadAll(r.Body) +} + +func factoryPassHandler(lb loadbalancer.LoadBalancer, logger log.Logger) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + e, err := lb.Endpoint() + if err != nil { + logger.Log("error", err) + return + } + resp, err := e(ctx, r.Body) + if err != nil { + logger.Log("warning", err) + fmt.Fprint(w, err) + return + } + b := resp.([]byte) + _, err = w.Write(b) + if err != nil { + logger.Log("warning", err) + fmt.Fprint(w, err) + return + } + } +} diff --git a/examples/apigateway/stringsvc_def.json b/examples/apigateway/stringsvc_def.json index 065489d..54373be 100644 --- a/examples/apigateway/stringsvc_def.json +++ b/examples/apigateway/stringsvc_def.json @@ -1,7 +1,7 @@ { "service": { "name": "stringsvc", - "tags": ["master"], + "tags": [], "address": "127.0.0.1", "port": 8080, "enableTagOverride": false