Codebase list golang-github-go-kit-kit / 7cae3fd
Merge pull request #79 from go-kit/zipkin-enhancements Zipkin enhancements Peter Bourgon 8 years ago
9 changed file(s) with 148 addition(s) and 39 deletion(s). Raw diff Collapse all Expand all
9595 if zipkinCollector, err = zipkin.NewScribeCollector(
9696 *zipkinCollectorAddr,
9797 *zipkinCollectorTimeout,
98 *zipkinCollectorBatchSize,
99 *zipkinCollectorBatchInterval,
98 zipkin.ScribeBatchSize(*zipkinCollectorBatchSize),
99 zipkin.ScribeBatchInterval(*zipkinCollectorBatchInterval),
100 zipkin.ScribeLogger(logger),
100101 ); err != nil {
101102 logger.Log("err", err)
102103 os.Exit(1)
104105 }
105106 zipkinMethodName := "add"
106107 zipkinSpanFunc := zipkin.MakeNewSpanFunc(zipkinHostPort, *zipkinServiceName, zipkinMethodName)
107 zipkin.Log.Swap(logger) // log diagnostic/error details
108108
109109 // Our business and operational domain
110110 var a Add = pureAdd
141141 go func() {
142142 ctx, cancel := context.WithCancel(root)
143143 defer cancel()
144 before := []httptransport.BeforeFunc{zipkin.ToContext(zipkinSpanFunc)}
144 before := []httptransport.BeforeFunc{zipkin.ToContext(zipkinSpanFunc, logger)}
145145 after := []httptransport.AfterFunc{}
146146 handler := makeHTTPBinding(ctx, e, before, after)
147147 logger.Log("addr", *httpAddr, "transport", "HTTP/JSON")
0 package log
1
2 type nopLogger struct{}
3
4 func (nopLogger) Log(...interface{}) error { return nil }
5
6 func NewNopLogger() Logger { return nopLogger{} }
0 package log_test
1
2 import (
3 "testing"
4
5 "github.com/go-kit/kit/log"
6 )
7
8 func TestNopLogger(t *testing.T) {
9 logger := log.NewNopLogger()
10 if err := logger.Log("abc", 123); err != nil {
11 t.Error(err)
12 }
13 if err := log.NewContext(logger).With("def", "ghi").Log(); err != nil {
14 t.Error(err)
15 }
16 }
33 "encoding/base64"
44 "errors"
55 "fmt"
6 "math"
7 "math/rand"
68 "net"
79 "strings"
810 "time"
911
1012 "github.com/apache/thrift/lib/go/thrift"
1113
14 "github.com/go-kit/kit/log"
1215 "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/scribe"
1316 )
1417
2528 factory func() (scribe.Scribe, error)
2629 spanc chan *Span
2730 sendc chan struct{}
28 quitc chan chan struct{}
2931 batch []*scribe.LogEntry
3032 nextSend time.Time
3133 batchInterval time.Duration
3234 batchSize int
33 }
34
35 // NewScribeCollector returns a new Scribe-backed Collector, ready for use.
36 func NewScribeCollector(addr string, timeout time.Duration, batchSize int, batchInterval time.Duration) (Collector, error) {
35 sampleRate float64
36 sampleSalt int64
37 logger log.Logger
38 }
39
40 // NewScribeCollector returns a new Scribe-backed Collector. addr should be a
41 // TCP endpoint of the form "host:port". timeout is passed to the Thrift dial
42 // function NewTSocketFromAddrTimeout. batchSize and batchInterval control the
43 // maximum size and interval of a batch of spans; as soon as either limit is
44 // reached, the batch is sent. The logger is used to log errors, such as batch
45 // send failures; users should provide an appropriate context, if desired.
46 func NewScribeCollector(addr string, timeout time.Duration, options ...ScribeOption) (Collector, error) {
3747 factory := scribeClientFactory(addr, timeout)
3848 client, err := factory()
3949 if err != nil {
4050 return nil, err
4151 }
52 defaultBatchInterval := time.Second
4253 c := &ScribeCollector{
4354 client: client,
4455 factory: factory,
4556 spanc: make(chan *Span),
4657 sendc: make(chan struct{}),
4758 batch: []*scribe.LogEntry{},
48 nextSend: time.Now().Add(batchInterval),
49 batchInterval: batchInterval,
50 batchSize: batchSize,
59 nextSend: time.Now().Add(defaultBatchInterval),
60 batchInterval: defaultBatchInterval,
61 batchSize: 100,
62 sampleRate: 1.0,
63 sampleSalt: rand.Int63(),
64 logger: log.NewNopLogger(),
65 }
66 for _, option := range options {
67 option(c)
5168 }
5269 go c.loop()
5370 return c, nil
6582 for {
6683 select {
6784 case span := <-c.spanc:
85 if !shouldSample(span.traceID, c.sampleSalt, c.sampleRate) {
86 continue
87 }
6888 c.batch = append(c.batch, &scribe.LogEntry{
6989 Category: "zipkin", // TODO parameterize?
7090 Message: serialize(span),
81101 case <-c.sendc:
82102 c.nextSend = time.Now().Add(c.batchInterval)
83103 if err := c.send(c.batch); err != nil {
84 Log.Log("err", err.Error())
85 continue
104 c.logger.Log("err", err.Error())
86105 }
87106 c.batch = c.batch[:0]
88107 }
110129 return nil
111130 }
112131
132 // ScribeOption sets a parameter for the StdlibAdapter.
133 type ScribeOption func(s *ScribeCollector)
134
135 // ScribeBatchSize sets the maximum batch size, after which a collect will be
136 // triggered. The default batch size is 100 traces.
137 func ScribeBatchSize(n int) ScribeOption {
138 return func(s *ScribeCollector) { s.batchSize = n }
139 }
140
141 // ScribeBatchInterval sets the maximum duration we will buffer traces before
142 // emitting them to the collector. The default batch interval is 1 second.
143 func ScribeBatchInterval(d time.Duration) ScribeOption {
144 return func(s *ScribeCollector) { s.batchInterval = d }
145 }
146
147 // ScribeSampleRate sets the sample rate used to determine if a trace will be
148 // sent to the collector. By default, the sample rate is 1.0, i.e. all traces
149 // are sent.
150 func ScribeSampleRate(f float64) ScribeOption {
151 return func(s *ScribeCollector) { s.sampleRate = f }
152 }
153
154 // ScribeLogger sets the logger used to report errors in the collection
155 // process. By default, a no-op logger is used, i.e. no errors are logged
156 // anywhere. It's important to set this option in a production service.
157 func ScribeLogger(logger log.Logger) ScribeOption {
158 return func(s *ScribeCollector) { s.logger = logger }
159 }
160
113161 func scribeClientFactory(addr string, timeout time.Duration) func() (scribe.Scribe, error) {
114162 return func() (scribe.Scribe, error) {
115163 a, err := net.ResolveTCPAddr("tcp", addr)
135183 panic(err)
136184 }
137185 return base64.StdEncoding.EncodeToString(t.Buffer.Bytes())
186 }
187
188 func shouldSample(id int64, salt int64, rate float64) bool {
189 if rate <= 0 {
190 return false
191 }
192 if rate >= 1.0 {
193 return true
194 }
195 return int64(math.Abs(float64(id^salt)))%10000 < int64(rate*10000)
138196 }
139197
140198 // NopCollector implements Collector but performs no work.
0 package zipkin
1
2 import "testing"
3
4 func TestShouldSample(t *testing.T) {
5 type triple struct {
6 id, salt int64
7 rate float64
8 }
9 for input, want := range map[triple]bool{
10 triple{123, 456, 1.0}: true,
11 triple{123, 456, 999}: true,
12 triple{123, 456, 0.0}: false,
13 triple{123, 456, -42}: false,
14 triple{1229998, 0, 0.01}: false,
15 triple{1229999, 0, 0.01}: false,
16 triple{1230000, 0, 0.01}: true,
17 triple{1230001, 0, 0.01}: true,
18 triple{1230098, 0, 0.01}: true,
19 triple{1230099, 0, 0.01}: true,
20 triple{1230100, 0, 0.01}: false,
21 triple{1230101, 0, 0.01}: false,
22 triple{1, 9999999, 0.01}: false,
23 triple{999, 0, 0.99}: true,
24 triple{9999, 0, 0.99}: false,
25 } {
26 if have := shouldSample(input.id, input.salt, input.rate); want != have {
27 t.Errorf("%#+v: want %v, have %v", input, want, have)
28 }
29 }
30 }
2020
2121 timeout := time.Second
2222 batchInterval := time.Millisecond
23 c, err := zipkin.NewScribeCollector(server.addr(), timeout, 0, batchInterval)
23 c, err := zipkin.NewScribeCollector(server.addr(), timeout, zipkin.ScribeBatchSize(0), zipkin.ScribeBatchInterval(batchInterval))
2424 if err != nil {
2525 t.Fatal(err)
2626 }
2929 //binaryAnnotations []BinaryAnnotation // TODO
3030 }
3131
32 // NewSpan returns a new Span object ready for use.
32 // NewSpan returns a new Span, which can be annotated and collected by a
33 // collector. Spans are passed through the request context to each middleware
34 // under the SpanContextKey.
3335 func NewSpan(hostport, serviceName, methodName string, traceID, spanID, parentSpanID int64) *Span {
3436 return &Span{
3537 host: makeEndpoint(hostport, serviceName),
4042 }
4143 }
4244
43 // makeEndpoint will return a nil Endpoint if the input parameters are
44 // malformed.
45 // makeEndpoint takes the hostport and service name that represent this Zipkin
46 // service, and returns an endpoint that's embedded into the Zipkin core Span
47 // type. It will return a nil endpoint if the input parameters are malformed.
4548 func makeEndpoint(hostport, serviceName string) *zipkincore.Endpoint {
4649 host, port, err := net.SplitHostPort(hostport)
4750 if err != nil {
48 Log.Log("hostport", hostport, "err", err)
4951 return nil
5052 }
5153 addrs, err := net.LookupIP(host)
5254 if err != nil {
53 Log.Log("host", host, "err", err)
5455 return nil
5556 }
5657 if len(addrs) <= 0 {
57 Log.Log("host", host, "err", "no IPs")
5858 return nil
5959 }
6060 portInt, err := strconv.ParseInt(port, 10, 16)
6161 if err != nil {
62 Log.Log("port", port, "err", err)
6362 return nil
6463 }
6564 endpoint := zipkincore.NewEndpoint()
2020 // • http://www.slideshare.net/johanoskarsson/zipkin-runtime-open-house
2121 // • https://groups.google.com/forum/#!topic/zipkin-user/KilwtSA0g1k
2222 // • https://gist.github.com/yoavaa/3478d3a0df666f21a98c
23
24 // Log is used to report diagnostic information. To enable it, swap in your
25 // application's logger.
26 var Log log.SwapLogger
2723
2824 const (
2925 // https://github.com/racker/tryfer#headers
9490 // ToContext returns a function that satisfies transport/http.BeforeFunc. It
9591 // takes a Zipkin span from the incoming HTTP request, and saves it in the
9692 // request context. It's designed to be wired into a server's HTTP transport
97 // Before stack.
98 func ToContext(newSpan NewSpanFunc) func(ctx context.Context, r *http.Request) context.Context {
93 // Before stack. The logger is used to report errors.
94 func ToContext(newSpan NewSpanFunc, logger log.Logger) func(ctx context.Context, r *http.Request) context.Context {
9995 return func(ctx context.Context, r *http.Request) context.Context {
100 return context.WithValue(ctx, SpanContextKey, fromHTTP(newSpan, r))
96 return context.WithValue(ctx, SpanContextKey, fromHTTP(newSpan, r, logger))
10197 }
10298 }
10399
125121 }
126122 }
127123
128 func fromHTTP(newSpan NewSpanFunc, r *http.Request) *Span {
124 func fromHTTP(newSpan NewSpanFunc, r *http.Request, logger log.Logger) *Span {
129125 traceIDStr := r.Header.Get(traceIDHTTPHeader)
130126 if traceIDStr == "" {
131 Log.Log("debug", "make new span")
132127 return newSpan(newID(), newID(), 0) // normal; just make a new one
133128 }
134129 traceID, err := strconv.ParseInt(traceIDStr, 16, 64)
135130 if err != nil {
136 Log.Log(traceIDHTTPHeader, traceIDStr, "err", err)
131 logger.Log(traceIDHTTPHeader, traceIDStr, "err", err)
137132 return newSpan(newID(), newID(), 0)
138133 }
139134 spanIDStr := r.Header.Get(spanIDHTTPHeader)
140135 if spanIDStr == "" {
141 Log.Log("msg", "trace ID without span ID") // abnormal
142 spanIDStr = strconv.FormatInt(newID(), 64) // deal with it
136 logger.Log("msg", "trace ID without span ID") // abnormal
137 spanIDStr = strconv.FormatInt(newID(), 64) // deal with it
143138 }
144139 spanID, err := strconv.ParseInt(spanIDStr, 16, 64)
145140 if err != nil {
146 Log.Log(spanIDHTTPHeader, spanIDStr, "err", err) // abnormal
147 spanID = newID() // deal with it
141 logger.Log(spanIDHTTPHeader, spanIDStr, "err", err) // abnormal
142 spanID = newID() // deal with it
148143 }
149144 parentSpanIDStr := r.Header.Get(parentSpanIDHTTPHeader)
150145 if parentSpanIDStr == "" {
152147 }
153148 parentSpanID, err := strconv.ParseInt(parentSpanIDStr, 16, 64)
154149 if err != nil {
155 Log.Log(parentSpanIDHTTPHeader, parentSpanIDStr, "err", err) // abnormal
156 parentSpanID = 0 // the only way to deal with it
150 logger.Log(parentSpanIDHTTPHeader, parentSpanIDStr, "err", err) // abnormal
151 parentSpanID = 0 // the only way to deal with it
157152 }
158153 return newSpan(traceID, spanID, parentSpanID)
159154 }
11
22 import (
33 "fmt"
4 "io/ioutil"
45 "net/http"
56 "reflect"
67 "runtime"
1112 "golang.org/x/net/context"
1213
1314 "github.com/go-kit/kit/endpoint"
15 "github.com/go-kit/kit/log"
1416 "github.com/go-kit/kit/tracing/zipkin"
1517 )
1618
3032 r.Header.Set("X-B3-ParentSpanId", strconv.FormatInt(parentSpanID, 16))
3133
3234 newSpan := zipkin.MakeNewSpanFunc(hostport, serviceName, methodName)
33 toContext := zipkin.ToContext(newSpan)
35 toContext := zipkin.ToContext(newSpan, log.NewLogfmtLogger(ioutil.Discard))
3436
3537 ctx := toContext(context.Background(), r)
3638 val := ctx.Value(zipkin.SpanContextKey)