Codebase list golang-github-go-kit-kit / 5dc97b3
Apply @peterbourgon apigateway patch * Reorder imports * Remove flagset * Add loadbalancer.Retry and parameters * Restructure service definitions * Make ServiceDef an anonymous inline type * Rename the factory functions for symmetry * Add a lot of comments * Pass method as parameter to httpFactory * Passing "fatal" to logger.Log doesn't terminate; fix some instances of that * Consistent logger.Log "err" key with error vals * Changes to makeHandler * Use Retry-wrapped loadbalancer instead of invoking loadbalancer.Endpoint directly * Write http.Error to client in case of error Marcel Hauf 8 years ago
1 changed file(s) with 92 addition(s) and 93 deletion(s). Raw diff Collapse all Expand all
1212 "os/signal"
1313 "strings"
1414 "syscall"
15 "time"
16
17 "github.com/gorilla/mux"
18 "github.com/hashicorp/consul/api"
19 "golang.org/x/net/context"
20 "google.golang.org/grpc"
1521
1622 "github.com/go-kit/kit/endpoint"
1723 addsvc "github.com/go-kit/kit/examples/addsvc/client/grpc"
1824 "github.com/go-kit/kit/examples/addsvc/server"
1925 "github.com/go-kit/kit/loadbalancer"
2026 "github.com/go-kit/kit/loadbalancer/consul"
21 log "github.com/go-kit/kit/log"
22 //grpctransport "github.com/go-kit/kit/transport/grpc"
27 "github.com/go-kit/kit/log"
2328 httptransport "github.com/go-kit/kit/transport/http"
24 //proto "github.com/golang/protobuf/proto"
25 "github.com/gorilla/mux"
26 "github.com/hashicorp/consul/api"
27 "golang.org/x/net/context"
28 "google.golang.org/grpc"
2929 )
3030
3131 func main() {
32 fs := flag.NewFlagSet("", flag.ExitOnError)
3332 var (
34 httpAddr = fs.String("http.addr", ":8000", "Address for HTTP (JSON) server")
35 consulAddr = fs.String("consul.addr", "", "Consul agent address")
33 httpAddr = flag.String("http.addr", ":8000", "Address for HTTP (JSON) server")
34 consulAddr = flag.String("consul.addr", "", "Consul agent address")
35 retryMax = flag.Int("retry.max", 3, "per-request retries to different instances")
36 retryTimeout = flag.Duration("retry.timeout", 500*time.Millisecond, "per-request timeout, including retries")
3637 )
37 flag.Usage = fs.Usage
38 if err := fs.Parse(os.Args[1:]); err != nil {
39 fmt.Fprintf(os.Stderr, "%v", err)
40 os.Exit(1)
41 }
42
43 // log
38 flag.Parse()
39
40 // Log domain
4441 logger := log.NewLogfmtLogger(os.Stderr)
4542 logger = log.NewContext(logger).With("ts", log.DefaultTimestampUTC).With("caller", log.DefaultCaller)
4643 stdlog.SetFlags(0) // flags are handled by Go kit's logger
4744 stdlog.SetOutput(log.NewStdlibAdapter(logger)) // redirect anything using stdlib log to us
4845
49 // errors
46 // Service discovery domain. In this example we use Consul.
47 consulConfig := api.DefaultConfig()
48 if len(*consulAddr) > 0 {
49 consulConfig.Address = *consulAddr
50 }
51 consulClient, err := api.NewClient(consulConfig)
52 if err != nil {
53 logger.Log("err", err)
54 os.Exit(1)
55 }
56 discoveryClient := consul.NewClient(consulClient)
57
58 // Context domain.
59 ctx := context.Background()
60
61 // Set up our routes.
62 //
63 // Each Consul service name maps to multiple instances of that service. We
64 // connect to each instance according to its pre-determined transport: in this
65 // case, we choose to access addsvc via its gRPC client, and stringsvc over
66 // plain transport/http (it has no client package).
67 //
68 // Each service instance implements multiple methods, and we want to map each
69 // method to a unique path on the API gateway. So, we define that path and its
70 // corresponding factory function, which takes an instance string and returns an
71 // endpoint.Endpoint for the specific method.
72 //
73 // Finally, we mount that path + endpoint handler into the router.
74 r := mux.NewRouter()
75 for consulName, methods := range map[string][]struct {
76 path string
77 factory loadbalancer.Factory
78 }{
79 "addsvc": {
80 {path: "/api/addsvc/concat", factory: addsvcGRPCFactory(ctx, makeConcatEndpoint, logger)},
81 {path: "/api/addsvc/sum", factory: addsvcGRPCFactory(ctx, makeSumEndpoint, logger)},
82 },
83 "stringsvc": {
84 {path: "/api/stringsvc/uppercase", factory: httpFactory(ctx, "GET", "uppercase/")},
85 {path: "/api/stringsvc/concat", factory: httpFactory(ctx, "GET", "concat/")},
86 },
87 } {
88 for _, method := range methods {
89 publisher, err := consul.NewPublisher(discoveryClient, method.factory, logger, consulName)
90 if err != nil {
91 logger.Log("service", consulName, "path", method.path, "err", err)
92 continue
93 }
94 lb := loadbalancer.NewRoundRobin(publisher)
95 e := loadbalancer.Retry(*retryMax, *retryTimeout, lb)
96 h := makeHandler(ctx, e, logger)
97 r.HandleFunc(method.path, h)
98 }
99 }
100
101 // Mechanical stuff.
50102 errc := make(chan error)
51103 go func() {
52104 errc <- interrupt()
53105 }()
54
55 // consul
56 consulConfig := api.DefaultConfig()
57 if len(*consulAddr) > 0 {
58 consulConfig.Address = *consulAddr
59 }
60 consulClient, err := api.NewClient(consulConfig)
61 if err != nil {
62 logger.Log("fatal", err)
63 }
64 discoveryClient := consul.NewClient(consulClient)
65
66 ctx := context.Background()
67
68 // service definitions
69 serviceDefs := []*ServiceDef{}
70 serviceDefs = append(serviceDefs, &ServiceDef{
71 Name: "addsvc",
72 Endpoints: map[string]loadbalancer.Factory{
73 "/api/addsvc/concat": factoryAddsvc(ctx, logger, makeConcatEndpoint),
74 "/api/addsvc/sum": factoryAddsvc(ctx, logger, makeSumEndpoint),
75 },
76 })
77 serviceDefs = append(serviceDefs, &ServiceDef{
78 Name: "stringsvc",
79 Endpoints: map[string]loadbalancer.Factory{
80 "/api/stringsvc/uppercase": routeFactory(ctx, "uppercase"),
81 "/api/stringsvc/count": routeFactory(ctx, "count"),
82 },
83 })
84
85 // discover instances and register endpoints
86 r := mux.NewRouter()
87 for _, def := range serviceDefs {
88 for path, e := range def.Endpoints {
89 pub, err := consul.NewPublisher(discoveryClient, e, logger, def.Name)
90 if err != nil {
91 logger.Log("fatal", err)
92 }
93 r.HandleFunc(path, makeHandler(ctx, loadbalancer.NewRoundRobin(pub), logger))
94 }
95 }
96
97 // apigateway
98106 go func() {
107 logger.Log("transport", "http", "addr", *httpAddr)
99108 errc <- http.ListenAndServe(*httpAddr, r)
100109 }()
101
102 // wait for interrupt/error
103 logger.Log("fatal", <-errc)
104 }
105
106 type ServiceDef struct {
107 Name string
108 Endpoints map[string]loadbalancer.Factory
109 }
110
111 func interrupt() error {
112 c := make(chan os.Signal)
113 signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
114 return fmt.Errorf("%s", <-c)
115 }
116
117 func makeHandler(ctx context.Context, lb loadbalancer.LoadBalancer, logger log.Logger) http.HandlerFunc {
110 logger.Log("err", <-errc)
111 }
112
113 func makeHandler(ctx context.Context, e endpoint.Endpoint, logger log.Logger) http.HandlerFunc {
118114 return func(w http.ResponseWriter, r *http.Request) {
119 e, err := lb.Endpoint()
120 if err != nil {
121 logger.Log("error", err)
122 return
123 }
124115 resp, err := e(ctx, r.Body)
125116 if err != nil {
126 logger.Log("error", err)
117 logger.Log("err", err)
118 http.Error(w, err.Error(), http.StatusInternalServerError)
127119 return
128120 }
129121 b, ok := resp.([]byte)
130122 if !ok {
131 logger.Log("error", "endpoint response is not of type []byte")
123 logger.Log("err", "endpoint response is not of type []byte")
124 http.Error(w, err.Error(), http.StatusInternalServerError)
132125 return
133126 }
134127 _, err = w.Write(b)
135128 if err != nil {
136 logger.Log("error", err)
129 logger.Log("err", err)
137130 return
138131 }
139132 }
140133 }
141134
142 func factoryAddsvc(ctx context.Context, logger log.Logger, maker func(server.AddService) endpoint.Endpoint) loadbalancer.Factory {
135 func addsvcGRPCFactory(ctx context.Context, makeEndpoint func(server.AddService) endpoint.Endpoint, logger log.Logger) loadbalancer.Factory {
143136 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
144137 var e endpoint.Endpoint
145138 conn, err := grpc.Dial(instance, grpc.WithInsecure())
147140 return e, nil, err
148141 }
149142 svc := addsvc.New(ctx, conn, logger)
150 return maker(svc), nil, nil
143 return makeEndpoint(svc), nil, nil
151144 }
152145 }
153146
175168 }
176169 }
177170
178 func routeFactory(ctx context.Context, method string) loadbalancer.Factory {
171 func httpFactory(ctx context.Context, method, path string) loadbalancer.Factory {
179172 return func(instance string) (endpoint.Endpoint, io.Closer, error) {
180173 var e endpoint.Endpoint
181174 if !strings.HasPrefix(instance, "http") {
185178 if err != nil {
186179 return nil, nil, err
187180 }
188 u.Path = method
189
190 e = httptransport.NewClient("GET", u, passEncode, passDecode).Endpoint()
181 u.Path = path
182
183 e = httptransport.NewClient(method, u, passEncode, passDecode).Endpoint()
191184 return e, nil, nil
192185 }
193186 }
200193 func passDecode(r *http.Response) (interface{}, error) {
201194 return ioutil.ReadAll(r.Body)
202195 }
196
197 func interrupt() error {
198 c := make(chan os.Signal)
199 signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
200 return fmt.Errorf("%s", <-c)
201 }