Codebase list golang-github-go-kit-kit / 482d5d7
add basic apigateway example which discovers services on demand from consul and forwards rpc calls to discovered endpoints Marcel Hauf 7 years ago
2 changed file(s) with 157 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
0 package main
1
2 import (
3 "bytes"
4 "encoding/json"
5 "io"
6 "io/ioutil"
7 "log"
8 "net/http"
9 "net/url"
10 "strings"
11
12 "golang.org/x/net/context"
13
14 "github.com/gorilla/mux"
15
16 "github.com/go-kit/kit/endpoint"
17 "github.com/go-kit/kit/loadbalancer"
18 "github.com/go-kit/kit/loadbalancer/consul"
19 klog "github.com/go-kit/kit/log"
20 httptransport "github.com/go-kit/kit/transport/http"
21
22 "github.com/hashicorp/consul/api"
23 )
24
25 var (
26 discoveryClient consul.Client
27 ctx = context.Background()
28 )
29
30 func main() {
31
32 consulConfig := api.DefaultConfig()
33 consulClient, err := api.NewClient(consulConfig)
34 if err != nil {
35 log.Fatal(err)
36 }
37 discoveryClient = consul.NewClient(consulClient)
38
39 r := mux.NewRouter()
40 r.HandleFunc("/api/{service}/{method}", apiGateway)
41
42 http.ListenAndServe(":8000", r)
43 }
44
45 func apiGateway(w http.ResponseWriter, r *http.Request) {
46 vars := mux.Vars(r)
47 service := vars["service"]
48 method := vars["method"]
49 e, err := getEndpoint(service, method)
50 if err != nil {
51 log.Print(err)
52 return
53 }
54
55 var val interface{}
56 dec := json.NewDecoder(r.Body)
57 err = dec.Decode(&val)
58 if err != nil {
59 log.Print(err)
60 return
61 }
62
63 resp, err := e(ctx, val)
64 if err != nil {
65 log.Print(err)
66 return
67 }
68 enc := json.NewEncoder(w)
69 err = enc.Encode(resp)
70 if err != nil {
71 log.Print(err)
72 return
73 }
74 }
75
76 var services = make(map[string]service)
77
78 type service map[string]loadbalancer.LoadBalancer
79
80 func getEndpoint(se string, method string) (endpoint.Endpoint, error) {
81 if s, ok := services[se]; ok {
82 if m, ok := s[method]; ok {
83 return m.Endpoint()
84 }
85 }
86
87 publisher, err := consul.NewPublisher(discoveryClient, factory(ctx, method), klog.NewLogfmtLogger(&klog.StdlibWriter{}), se)
88 if err != nil {
89 return nil, err
90 }
91 rr := loadbalancer.NewRoundRobin(publisher)
92
93 if _, ok := services[se]; ok {
94 services[se][method] = rr
95 } else {
96 services[se] = service{method: rr}
97 }
98
99 return rr.Endpoint()
100 }
101
102 func factory(ctx context.Context, method string) loadbalancer.Factory {
103 return func(service string) (endpoint.Endpoint, io.Closer, error) {
104 var e endpoint.Endpoint
105 e = makeProxy(ctx, service, method)
106 return e, nil, nil
107 }
108 }
109
110 func makeProxy(ctx context.Context, service, method string) endpoint.Endpoint {
111 if !strings.HasPrefix(service, "http") {
112 service = "http://" + service
113 }
114 u, err := url.Parse(service)
115 if err != nil {
116 panic(err)
117 }
118 if u.Path == "" {
119 u.Path = "/" + method
120 }
121
122 return httptransport.NewClient(
123 "GET",
124 u,
125 encodeRequest,
126 decodeResponse,
127 ).Endpoint()
128 }
129
130 func encodeRequest(r *http.Request, request interface{}) error {
131 log.Printf("encode req: %v", request)
132 var buf bytes.Buffer
133 if err := json.NewEncoder(&buf).Encode(request); err != nil {
134 log.Print(err)
135 return err
136 }
137 r.Body = ioutil.NopCloser(&buf)
138 return nil
139 }
140
141 func decodeResponse(r *http.Response) (interface{}, error) {
142 var response interface{}
143 if err := json.NewDecoder(r.Body).Decode(&response); err != nil {
144 return nil, err
145 }
146 return response, nil
147 }
0 {
1 "service": {
2 "name": "stringsvc",
3 "tags": ["master"],
4 "address": "127.0.0.1",
5 "port": 8080,
6 "enableTagOverride": false
7 }
8 }