Codebase list golang-github-go-kit-kit / dff50435-7a33-4f0c-bbdc-f455eb10d80a/v0.5.0 examples / addsvc / cmd / addsvc / main.go
dff50435-7a33-4f0c-bbdc-f455eb10d80a/v0.5.0

Tree @dff50435-7a33-4f0c-bbdc-f455eb10d80a/v0.5.0 (Download .tar.gz)

main.go @dff50435-7a33-4f0c-bbdc-f455eb10d80a/v0.5.0

055e4ba
51ccc66
 
4a29204
51ccc66
 
 
 
 
 
 
 
 
 
 
 
 
 
 
593cc9a
51ccc66
43624e9
0fde115
51ccc66
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67928b5
 
43624e9
 
51ccc66
 
 
2c5ef80
51ccc66
 
 
055e4ba
 
51ccc66
 
 
 
2c5ef80
51ccc66
 
2c5ef80
b149ec8
51ccc66
 
 
 
b149ec8
51ccc66
 
 
 
 
b149ec8
51ccc66
2c5ef80
b149ec8
51ccc66
 
 
b149ec8
51ccc66
 
2c5ef80
51ccc66
 
 
055e4ba
51ccc66
67928b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
055e4ba
67928b5
 
51ccc66
67928b5
 
51ccc66
 
 
 
 
67928b5
 
51ccc66
 
 
 
 
 
 
43624e9
055e4ba
43624e9
 
51ccc66
055e4ba
51ccc66
 
 
 
 
 
055e4ba
51ccc66
 
 
 
 
2c5ef80
51ccc66
 
 
 
 
 
 
2c5ef80
51ccc66
 
b149ec8
055e4ba
51ccc66
 
 
 
 
 
 
 
b149ec8
055e4ba
51ccc66
 
 
 
 
 
 
 
 
 
 
2c5ef80
51ccc66
 
 
2c5ef80
51ccc66
 
 
 
 
 
2c5ef80
51ccc66
055e4ba
51ccc66
 
 
 
 
 
 
593cc9a
51ccc66
 
 
 
 
2c5ef80
51ccc66
055e4ba
5b4da74
51ccc66
 
 
 
2c5ef80
51ccc66
055e4ba
51ccc66
 
 
 
 
 
 
2677e49
51ccc66
 
 
 
 
 
 
2c5ef80
51ccc66
055e4ba
51ccc66
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2c5ef80
51ccc66
 
package main

import (
	"context"
	"flag"
	"fmt"
	"net"
	"net/http"
	"net/http/pprof"
	"os"
	"os/signal"
	"strings"
	"syscall"

	"github.com/apache/thrift/lib/go/thrift"
	lightstep "github.com/lightstep/lightstep-tracer-go"
	stdopentracing "github.com/opentracing/opentracing-go"
	zipkin "github.com/openzipkin/zipkin-go-opentracing"
	stdprometheus "github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"google.golang.org/grpc"
	"sourcegraph.com/sourcegraph/appdash"
	appdashot "sourcegraph.com/sourcegraph/appdash/opentracing"

	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/examples/addsvc"
	"github.com/go-kit/kit/examples/addsvc/pb"
	thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/metrics"
	"github.com/go-kit/kit/metrics/prometheus"
	"github.com/go-kit/kit/tracing/opentracing"
)

func main() {
	var (
		debugAddr        = flag.String("debug.addr", ":8080", "Debug and metrics listen address")
		httpAddr         = flag.String("http.addr", ":8081", "HTTP listen address")
		grpcAddr         = flag.String("grpc.addr", ":8082", "gRPC (HTTP) listen address")
		thriftAddr       = flag.String("thrift.addr", ":8083", "Thrift listen address")
		thriftProtocol   = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson")
		thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered")
		thriftFramed     = flag.Bool("thrift.framed", false, "true to enable framing")
		zipkinAddr       = flag.String("zipkin.addr", "", "Enable Zipkin tracing via a Zipkin HTTP Collector endpoint")
		zipkinKafkaAddr  = flag.String("zipkin.kafka.addr", "", "Enable Zipkin tracing via a Kafka server host:port")
		appdashAddr      = flag.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port")
		lightstepToken   = flag.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token")
	)
	flag.Parse()

	// Logging domain.
	var logger log.Logger
	{
		logger = log.NewLogfmtLogger(os.Stdout)
		logger = log.With(logger, "ts", log.DefaultTimestampUTC)
		logger = log.With(logger, "caller", log.DefaultCaller)
	}
	logger.Log("msg", "hello")
	defer logger.Log("msg", "goodbye")

	// Metrics domain.
	var ints, chars metrics.Counter
	{
		// Business level metrics.
		ints = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
			Namespace: "addsvc",
			Name:      "integers_summed",
			Help:      "Total count of integers summed via the Sum method.",
		}, []string{})
		chars = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
			Namespace: "addsvc",
			Name:      "characters_concatenated",
			Help:      "Total count of characters concatenated via the Concat method.",
		}, []string{})
	}
	var duration metrics.Histogram
	{
		// Transport level metrics.
		duration = prometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
			Namespace: "addsvc",
			Name:      "request_duration_ns",
			Help:      "Request duration in nanoseconds.",
		}, []string{"method", "success"})
	}

	// Tracing domain.
	var tracer stdopentracing.Tracer
	{
		if *zipkinAddr != "" {
			logger := log.With(logger, "tracer", "ZipkinHTTP")
			logger.Log("addr", *zipkinAddr)

			// endpoint typically looks like: http://zipkinhost:9411/api/v1/spans
			collector, err := zipkin.NewHTTPCollector(*zipkinAddr)
			if err != nil {
				logger.Log("err", err)
				os.Exit(1)
			}
			defer collector.Close()

			tracer, err = zipkin.NewTracer(
				zipkin.NewRecorder(collector, false, "localhost:80", "addsvc"),
			)
			if err != nil {
				logger.Log("err", err)
				os.Exit(1)
			}
		} else if *zipkinKafkaAddr != "" {
			logger := log.With(logger, "tracer", "ZipkinKafka")
			logger.Log("addr", *zipkinKafkaAddr)

			collector, err := zipkin.NewKafkaCollector(
				strings.Split(*zipkinKafkaAddr, ","),
				zipkin.KafkaLogger(log.NewNopLogger()),
			)
			if err != nil {
				logger.Log("err", err)
				os.Exit(1)
			}
			defer collector.Close()

			tracer, err = zipkin.NewTracer(
				zipkin.NewRecorder(collector, false, "localhost:80", "addsvc"),
			)
			if err != nil {
				logger.Log("err", err)
				os.Exit(1)
			}
		} else if *appdashAddr != "" {
			logger := log.With(logger, "tracer", "Appdash")
			logger.Log("addr", *appdashAddr)
			tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
		} else if *lightstepToken != "" {
			logger := log.With(logger, "tracer", "LightStep")
			logger.Log() // probably don't want to print out the token :)
			tracer = lightstep.NewTracer(lightstep.Options{
				AccessToken: *lightstepToken,
			})
			defer lightstep.FlushLightStepTracer(tracer)
		} else {
			logger := log.With(logger, "tracer", "none")
			logger.Log()
			tracer = stdopentracing.GlobalTracer() // no-op
		}
	}

	// Business domain.
	var service addsvc.Service
	{
		service = addsvc.NewBasicService()
		service = addsvc.ServiceLoggingMiddleware(logger)(service)
		service = addsvc.ServiceInstrumentingMiddleware(ints, chars)(service)
	}

	// Endpoint domain.
	var sumEndpoint endpoint.Endpoint
	{
		sumDuration := duration.With("method", "Sum")
		sumLogger := log.With(logger, "method", "Sum")

		sumEndpoint = addsvc.MakeSumEndpoint(service)
		sumEndpoint = opentracing.TraceServer(tracer, "Sum")(sumEndpoint)
		sumEndpoint = addsvc.EndpointInstrumentingMiddleware(sumDuration)(sumEndpoint)
		sumEndpoint = addsvc.EndpointLoggingMiddleware(sumLogger)(sumEndpoint)
	}
	var concatEndpoint endpoint.Endpoint
	{
		concatDuration := duration.With("method", "Concat")
		concatLogger := log.With(logger, "method", "Concat")

		concatEndpoint = addsvc.MakeConcatEndpoint(service)
		concatEndpoint = opentracing.TraceServer(tracer, "Concat")(concatEndpoint)
		concatEndpoint = addsvc.EndpointInstrumentingMiddleware(concatDuration)(concatEndpoint)
		concatEndpoint = addsvc.EndpointLoggingMiddleware(concatLogger)(concatEndpoint)
	}
	endpoints := addsvc.Endpoints{
		SumEndpoint:    sumEndpoint,
		ConcatEndpoint: concatEndpoint,
	}

	// Mechanical domain.
	errc := make(chan error)
	ctx := context.Background()

	// Interrupt handler.
	go func() {
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		errc <- fmt.Errorf("%s", <-c)
	}()

	// Debug listener.
	go func() {
		logger := log.With(logger, "transport", "debug")

		m := http.NewServeMux()
		m.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
		m.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
		m.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
		m.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
		m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
		m.Handle("/metrics", promhttp.Handler())

		logger.Log("addr", *debugAddr)
		errc <- http.ListenAndServe(*debugAddr, m)
	}()

	// HTTP transport.
	go func() {
		logger := log.With(logger, "transport", "HTTP")
		h := addsvc.MakeHTTPHandler(endpoints, tracer, logger)
		logger.Log("addr", *httpAddr)
		errc <- http.ListenAndServe(*httpAddr, h)
	}()

	// gRPC transport.
	go func() {
		logger := log.With(logger, "transport", "gRPC")

		ln, err := net.Listen("tcp", *grpcAddr)
		if err != nil {
			errc <- err
			return
		}

		srv := addsvc.MakeGRPCServer(endpoints, tracer, logger)
		s := grpc.NewServer()
		pb.RegisterAddServer(s, srv)

		logger.Log("addr", *grpcAddr)
		errc <- s.Serve(ln)
	}()

	// Thrift transport.
	go func() {
		logger := log.With(logger, "transport", "Thrift")

		var protocolFactory thrift.TProtocolFactory
		switch *thriftProtocol {
		case "binary":
			protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
		case "compact":
			protocolFactory = thrift.NewTCompactProtocolFactory()
		case "json":
			protocolFactory = thrift.NewTJSONProtocolFactory()
		case "simplejson":
			protocolFactory = thrift.NewTSimpleJSONProtocolFactory()
		default:
			errc <- fmt.Errorf("invalid Thrift protocol %q", *thriftProtocol)
			return
		}

		var transportFactory thrift.TTransportFactory
		if *thriftBufferSize > 0 {
			transportFactory = thrift.NewTBufferedTransportFactory(*thriftBufferSize)
		} else {
			transportFactory = thrift.NewTTransportFactory()
		}
		if *thriftFramed {
			transportFactory = thrift.NewTFramedTransportFactory(transportFactory)
		}

		transport, err := thrift.NewTServerSocket(*thriftAddr)
		if err != nil {
			errc <- err
			return
		}

		logger.Log("addr", *thriftAddr)
		errc <- thrift.NewTSimpleServer4(
			thriftadd.NewAddServiceProcessor(addsvc.MakeThriftHandler(ctx, endpoints)),
			transport,
			transportFactory,
			protocolFactory,
		).Serve()
	}()

	// Run!
	logger.Log("exit", <-errc)
}