package main
import (
"flag"
"fmt"
"net"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"strings"
"syscall"
"github.com/apache/thrift/lib/go/thrift"
lightstep "github.com/lightstep/lightstep-tracer-go"
stdopentracing "github.com/opentracing/opentracing-go"
zipkin "github.com/openzipkin/zipkin-go-opentracing"
stdprometheus "github.com/prometheus/client_golang/prometheus"
appdashot "github.com/sourcegraph/appdash/opentracing"
"golang.org/x/net/context"
"google.golang.org/grpc"
"sourcegraph.com/sourcegraph/appdash"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/examples/addsvc"
"github.com/go-kit/kit/examples/addsvc/pb"
thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/prometheus"
"github.com/go-kit/kit/tracing/opentracing"
)
func main() {
var (
debugAddr = flag.String("debug.addr", ":8080", "Debug and metrics listen address")
httpAddr = flag.String("http.addr", ":8081", "HTTP listen address")
grpcAddr = flag.String("grpc.addr", ":8082", "gRPC (HTTP) listen address")
thriftAddr = flag.String("thrift.addr", ":8083", "Thrift listen address")
thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered")
thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing")
zipkinAddr = flag.String("zipkin.addr", "", "Enable Zipkin tracing via a Kafka server host:port")
appdashAddr = flag.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port")
lightstepToken = flag.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token")
)
flag.Parse()
// Logging domain.
var logger log.Logger
{
logger = log.NewLogfmtLogger(os.Stdout)
logger = log.NewContext(logger).With("ts", log.DefaultTimestampUTC)
logger = log.NewContext(logger).With("caller", log.DefaultCaller)
}
logger.Log("msg", "hello")
defer logger.Log("msg", "goodbye")
// Metrics domain.
var ints, chars metrics.Counter
{
// Business level metrics.
ints = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "addsvc",
Name: "integers_summed",
Help: "Total count of integers summed via the Sum method.",
}, []string{})
chars = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "addsvc",
Name: "characters_concatenated",
Help: "Total count of characters concatenated via the Concat method.",
}, []string{})
}
var duration metrics.Histogram
{
// Transport level metrics.
duration = prometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
Namespace: "addsvc",
Name: "request_duration_ns",
Help: "Request duration in nanoseconds.",
}, []string{"method", "success"})
}
// Tracing domain.
var tracer stdopentracing.Tracer
{
if *zipkinAddr != "" {
logger := log.NewContext(logger).With("tracer", "Zipkin")
logger.Log("addr", *zipkinAddr)
collector, err := zipkin.NewKafkaCollector(
strings.Split(*zipkinAddr, ","),
zipkin.KafkaLogger(logger),
)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}
tracer, err = zipkin.NewTracer(
zipkin.NewRecorder(collector, false, "localhost:80", "addsvc"),
)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}
} else if *appdashAddr != "" {
logger := log.NewContext(logger).With("tracer", "Appdash")
logger.Log("addr", *appdashAddr)
tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
} else if *lightstepToken != "" {
logger := log.NewContext(logger).With("tracer", "LightStep")
logger.Log() // probably don't want to print out the token :)
tracer = lightstep.NewTracer(lightstep.Options{
AccessToken: *lightstepToken,
})
defer lightstep.FlushLightStepTracer(tracer)
} else {
logger := log.NewContext(logger).With("tracer", "none")
logger.Log()
tracer = stdopentracing.GlobalTracer() // no-op
}
}
// Business domain.
var service addsvc.Service
{
service = addsvc.NewBasicService()
service = addsvc.ServiceLoggingMiddleware(logger)(service)
service = addsvc.ServiceInstrumentingMiddleware(ints, chars)(service)
}
// Endpoint domain.
var sumEndpoint endpoint.Endpoint
{
sumDuration := duration.With("method", "Sum")
sumLogger := log.NewContext(logger).With("method", "Sum")
sumEndpoint = addsvc.MakeSumEndpoint(service)
sumEndpoint = opentracing.TraceServer(tracer, "Sum")(sumEndpoint)
sumEndpoint = addsvc.EndpointInstrumentingMiddleware(sumDuration)(sumEndpoint)
sumEndpoint = addsvc.EndpointLoggingMiddleware(sumLogger)(sumEndpoint)
}
var concatEndpoint endpoint.Endpoint
{
concatDuration := duration.With("method", "Concat")
concatLogger := log.NewContext(logger).With("method", "Concat")
concatEndpoint = addsvc.MakeConcatEndpoint(service)
concatEndpoint = opentracing.TraceServer(tracer, "Concat")(concatEndpoint)
concatEndpoint = addsvc.EndpointInstrumentingMiddleware(concatDuration)(concatEndpoint)
concatEndpoint = addsvc.EndpointLoggingMiddleware(concatLogger)(concatEndpoint)
}
endpoints := addsvc.Endpoints{
SumEndpoint: sumEndpoint,
ConcatEndpoint: concatEndpoint,
}
// Mechanical domain.
errc := make(chan error)
ctx := context.Background()
// Interrupt handler.
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
errc <- fmt.Errorf("%s", <-c)
}()
// Debug listener.
go func() {
logger := log.NewContext(logger).With("transport", "debug")
m := http.NewServeMux()
m.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
m.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
m.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
m.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
m.Handle("/metrics", stdprometheus.Handler())
logger.Log("addr", *debugAddr)
errc <- http.ListenAndServe(*debugAddr, m)
}()
// HTTP transport.
go func() {
logger := log.NewContext(logger).With("transport", "HTTP")
h := addsvc.MakeHTTPHandler(ctx, endpoints, tracer, logger)
logger.Log("addr", *httpAddr)
errc <- http.ListenAndServe(*httpAddr, h)
}()
// gRPC transport.
go func() {
logger := log.NewContext(logger).With("transport", "gRPC")
ln, err := net.Listen("tcp", *grpcAddr)
if err != nil {
errc <- err
return
}
srv := addsvc.MakeGRPCServer(ctx, endpoints, tracer, logger)
s := grpc.NewServer()
pb.RegisterAddServer(s, srv)
logger.Log("addr", *grpcAddr)
errc <- s.Serve(ln)
}()
// Thrift transport.
go func() {
logger := log.NewContext(logger).With("transport", "Thrift")
var protocolFactory thrift.TProtocolFactory
switch *thriftProtocol {
case "binary":
protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
case "compact":
protocolFactory = thrift.NewTCompactProtocolFactory()
case "json":
protocolFactory = thrift.NewTJSONProtocolFactory()
case "simplejson":
protocolFactory = thrift.NewTSimpleJSONProtocolFactory()
default:
errc <- fmt.Errorf("invalid Thrift protocol %q", *thriftProtocol)
return
}
var transportFactory thrift.TTransportFactory
if *thriftBufferSize > 0 {
transportFactory = thrift.NewTBufferedTransportFactory(*thriftBufferSize)
} else {
transportFactory = thrift.NewTTransportFactory()
}
if *thriftFramed {
transportFactory = thrift.NewTFramedTransportFactory(transportFactory)
}
transport, err := thrift.NewTServerSocket(*thriftAddr)
if err != nil {
errc <- err
return
}
logger.Log("addr", *thriftAddr)
errc <- thrift.NewTSimpleServer4(
thriftadd.NewAddServiceProcessor(addsvc.MakeThriftHandler(ctx, endpoints)),
transport,
transportFactory,
protocolFactory,
).Serve()
}()
// Run!
logger.Log("exit", <-errc)
}