8 | 8 |
"os"
|
9 | 9 |
"os/signal"
|
10 | 10 |
"syscall"
|
|
11 |
"text/tabwriter"
|
11 | 12 |
|
12 | 13 |
"github.com/apache/thrift/lib/go/thrift"
|
13 | 14 |
"github.com/oklog/oklog/pkg/group"
|
|
28 | 29 |
)
|
29 | 30 |
|
30 | 31 |
func main() {
|
|
32 |
// Define our flags. Your service probably won't need to bind listeners for
|
|
33 |
// *all* supported transports, or support both Zipkin and LightStep, and so
|
|
34 |
// on, but we do it here for demonstration purposes.
|
|
35 |
fs := flag.NewFlagSet("addsvc", flag.ExitOnError)
|
31 | 36 |
var (
|
32 | |
debugAddr = flag.String("debug.addr", ":8080", "Debug and metrics listen address")
|
33 | |
httpAddr = flag.String("http-addr", ":8081", "HTTP listen address")
|
34 | |
grpcAddr = flag.String("grpc-addr", ":8082", "gRPC listen address")
|
35 | |
thriftAddr = flag.String("thrift-addr", ":8082", "Thrift listen address")
|
36 | |
thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
|
37 | |
thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered")
|
38 | |
thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing")
|
39 | |
zipkinURL = flag.String("zipkin-url", "", "Zipkin collector URL e.g. http://localhost:9411/api/v1/spans")
|
|
37 |
debugAddr = fs.String("debug.addr", ":8080", "Debug and metrics listen address")
|
|
38 |
httpAddr = fs.String("http-addr", ":8081", "HTTP listen address")
|
|
39 |
grpcAddr = fs.String("grpc-addr", ":8082", "gRPC listen address")
|
|
40 |
thriftAddr = fs.String("thrift-addr", ":8082", "Thrift listen address")
|
|
41 |
thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson")
|
|
42 |
thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered")
|
|
43 |
thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing")
|
|
44 |
zipkinURL = fs.String("zipkin-url", "", "Zipkin collector URL e.g. http://localhost:9411/api/v1/spans")
|
40 | 45 |
)
|
41 | |
flag.Parse()
|
42 | |
|
|
46 |
fs.Usage = usageFor(fs, os.Args[0]+" [flags]")
|
|
47 |
fs.Parse(os.Args[1:])
|
|
48 |
|
|
49 |
// Create a single logger, which we'll use and give to other components.
|
43 | 50 |
var logger log.Logger
|
44 | 51 |
{
|
45 | 52 |
logger = log.NewLogfmtLogger(os.Stderr)
|
|
47 | 54 |
logger = log.With(logger, "caller", log.DefaultCaller)
|
48 | 55 |
}
|
49 | 56 |
|
|
57 |
// Determine which tracer to use. We'll pass the tracer to all the
|
|
58 |
// components that use it, as a dependency.
|
50 | 59 |
var tracer stdopentracing.Tracer
|
51 | 60 |
{
|
52 | 61 |
if *zipkinURL != "" {
|
|
74 | 83 |
}
|
75 | 84 |
}
|
76 | 85 |
|
77 | |
// Our metrics are dependencies, here we create them.
|
|
86 |
// Create the (sparse) metrics we'll use in the service. They, too, are
|
|
87 |
// dependencies that we pass to components that use them.
|
78 | 88 |
var ints, chars metrics.Counter
|
79 | 89 |
{
|
80 | 90 |
// Business-level metrics.
|
|
102 | 112 |
}, []string{"method", "success"})
|
103 | 113 |
}
|
104 | 114 |
|
|
115 |
// Build the layers of the service "onion" from the inside out. First, the
|
|
116 |
// business logic service; then, the set of endpoints that wrap the service;
|
|
117 |
// and finally, a series of concrete transport adapters. The adapters, like
|
|
118 |
// the HTTP handler or the gRPC server, are the bridge between Go kit and
|
|
119 |
// the interfaces that the transports expect. Note that we're not binding
|
|
120 |
// them to ports or anything yet; we'll do that next.
|
105 | 121 |
var (
|
106 | |
service = addservice.New(logger, ints, chars)
|
107 | |
endpoints = addendpoint.New(service, logger, duration, tracer)
|
108 | |
httpHandler = addtransport.NewHTTPHandler(context.Background(), endpoints, logger, tracer)
|
109 | |
grpcServer = addtransport.MakeGRPCServer(endpoints, tracer, logger)
|
110 | |
thriftHandler = addtransport.MakeThriftHandler(context.Background(), endpoints)
|
|
122 |
service = addservice.New(logger, ints, chars)
|
|
123 |
endpoints = addendpoint.New(service, logger, duration, tracer)
|
|
124 |
httpHandler = addtransport.NewHTTPHandler(context.Background(), endpoints, logger, tracer)
|
|
125 |
grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger)
|
|
126 |
thriftServer = addtransport.NewThriftServer(context.Background(), endpoints)
|
111 | 127 |
)
|
112 | 128 |
|
|
129 |
// Now we're to the part of the func main where we want to start actually
|
|
130 |
// running things, like servers bound to listeners to receive connections.
|
|
131 |
//
|
|
132 |
// The method is the same for each component: add a new actor to the group
|
|
133 |
// struct, which is a combination of 2 anonymous functions: the first
|
|
134 |
// function actually runs the component, and the second function should
|
|
135 |
// interrupt the first function and cause it to return.
|
|
136 |
|
113 | 137 |
var g group.Group
|
114 | 138 |
{
|
|
139 |
// The debug listener mounts the http.DefaultServeMux, and serves up
|
|
140 |
// stuff like the Prometheus metrics route, the Go debug and profiling
|
|
141 |
// routes, and so on.
|
115 | 142 |
debugListener, err := net.Listen("tcp", *debugAddr)
|
116 | 143 |
if err != nil {
|
117 | 144 |
logger.Log("transport", "debug/HTTP", "during", "Listen", "err", err)
|
|
125 | 152 |
})
|
126 | 153 |
}
|
127 | 154 |
{
|
|
155 |
// The HTTP listener mounts the Go kit HTTP handler we created.
|
128 | 156 |
httpListener, err := net.Listen("tcp", *httpAddr)
|
129 | 157 |
if err != nil {
|
130 | 158 |
logger.Log("transport", "HTTP", "during", "Listen", "err", err)
|
|
138 | 166 |
})
|
139 | 167 |
}
|
140 | 168 |
{
|
|
169 |
// The gRPC listener mounts the Go kit gRPC server we created.
|
141 | 170 |
grpcListener, err := net.Listen("tcp", *grpcAddr)
|
142 | 171 |
if err != nil {
|
143 | 172 |
logger.Log("transport", "gRPC", "during", "Listen", "err", err)
|
|
153 | 182 |
})
|
154 | 183 |
}
|
155 | 184 |
{
|
|
185 |
// The Thrift socket mounts the Go kit Thrift server we created earlier.
|
|
186 |
// There's a lot of boilerplate involved here, related to configuring
|
|
187 |
// the protocol and transport; blame Thrift.
|
156 | 188 |
thriftSocket, err := thrift.NewTServerSocket(*thriftAddr)
|
157 | 189 |
if err != nil {
|
158 | 190 |
logger.Log("transport", "Thrift", "during", "Listen", "err", err)
|
|
174 | 206 |
return fmt.Errorf("invalid Thrift protocol %q", *thriftProtocol)
|
175 | 207 |
}
|
176 | 208 |
var transportFactory thrift.TTransportFactory
|
177 | |
if *thriftBufferSize > 0 {
|
178 | |
transportFactory = thrift.NewTBufferedTransportFactory(*thriftBufferSize)
|
|
209 |
if *thriftBuffer > 0 {
|
|
210 |
transportFactory = thrift.NewTBufferedTransportFactory(*thriftBuffer)
|
179 | 211 |
} else {
|
180 | 212 |
transportFactory = thrift.NewTTransportFactory()
|
181 | 213 |
}
|
|
183 | 215 |
transportFactory = thrift.NewTFramedTransportFactory(transportFactory)
|
184 | 216 |
}
|
185 | 217 |
return thrift.NewTSimpleServer4(
|
186 | |
addthrift.NewAddServiceProcessor(thriftHandler),
|
|
218 |
addthrift.NewAddServiceProcessor(thriftServer),
|
187 | 219 |
thriftSocket,
|
188 | 220 |
transportFactory,
|
189 | 221 |
protocolFactory,
|
|
193 | 225 |
})
|
194 | 226 |
}
|
195 | 227 |
{
|
|
228 |
// This function just sits and waits for ctrl-C.
|
196 | 229 |
cancelInterrupt := make(chan struct{})
|
197 | 230 |
g.Add(func() error {
|
198 | 231 |
c := make(chan os.Signal, 1)
|
|
209 | 242 |
}
|
210 | 243 |
logger.Log("exit", g.Run())
|
211 | 244 |
}
|
|
245 |
|
|
246 |
func usageFor(fs *flag.FlagSet, short string) func() {
|
|
247 |
return func() {
|
|
248 |
fmt.Fprintf(os.Stderr, "USAGE\n")
|
|
249 |
fmt.Fprintf(os.Stderr, " %s\n", short)
|
|
250 |
fmt.Fprintf(os.Stderr, "\n")
|
|
251 |
fmt.Fprintf(os.Stderr, "FLAGS\n")
|
|
252 |
w := tabwriter.NewWriter(os.Stderr, 0, 2, 2, ' ', 0)
|
|
253 |
fs.VisitAll(func(f *flag.Flag) {
|
|
254 |
fmt.Fprintf(w, "\t-%s %s\t%s\n", f.Name, f.DefValue, f.Usage)
|
|
255 |
})
|
|
256 |
w.Flush()
|
|
257 |
fmt.Fprintf(os.Stderr, "\n")
|
|
258 |
}
|
|
259 |
}
|