Package list golang-github-go-kit-kit / 87d3373
Add support for a fixed sampling rate - Convert NewScribeCollector to functional options - Add log.NewNopLogger - Tests Peter Bourgon 6 years ago
6 changed file(s) with 116 addition(s) and 11 deletion(s). Raw diff Collapse all Expand all
9595 if zipkinCollector, err = zipkin.NewScribeCollector(
9696 *zipkinCollectorAddr,
9797 *zipkinCollectorTimeout,
98 *zipkinCollectorBatchSize,
99 *zipkinCollectorBatchInterval,
100 logger,
98 zipkin.ScribeBatchSize(*zipkinCollectorBatchSize),
99 zipkin.ScribeBatchInterval(*zipkinCollectorBatchInterval),
100 zipkin.ScribeLogger(logger),
101101 ); err != nil {
102102 logger.Log("err", err)
103103 os.Exit(1)
0 package log
1
2 type nopLogger struct{}
3
4 func (nopLogger) Log(...interface{}) error { return nil }
5
6 func NewNopLogger() Logger { return nopLogger{} }
0 package log_test
1
2 import (
3 "testing"
4
5 "github.com/go-kit/kit/log"
6 )
7
8 func TestNopLogger(t *testing.T) {
9 logger := log.NewNopLogger()
10 if err := logger.Log("abc", 123); err != nil {
11 t.Error(err)
12 }
13 if err := log.NewContext(logger).With("def", "ghi").Log(); err != nil {
14 t.Error(err)
15 }
16 }
33 "encoding/base64"
44 "errors"
55 "fmt"
6 "math"
7 "math/rand"
68 "net"
79 "strings"
810 "time"
3032 nextSend time.Time
3133 batchInterval time.Duration
3234 batchSize int
35 sampleRate float64
36 sampleSalt int64
3337 logger log.Logger
3438 }
3539
3943 // maximum size and interval of a batch of spans; as soon as either limit is
4044 // reached, the batch is sent. The logger is used to log errors, such as batch
4145 // send failures; users should provide an appropriate context, if desired.
42 func NewScribeCollector(addr string, timeout time.Duration, batchSize int, batchInterval time.Duration, logger log.Logger) (Collector, error) {
46 func NewScribeCollector(addr string, timeout time.Duration, options ...ScribeOption) (Collector, error) {
4347 factory := scribeClientFactory(addr, timeout)
4448 client, err := factory()
4549 if err != nil {
4650 return nil, err
4751 }
52 defaultBatchInterval := time.Second
4853 c := &ScribeCollector{
4954 client: client,
5055 factory: factory,
5156 spanc: make(chan *Span),
5257 sendc: make(chan struct{}),
5358 batch: []*scribe.LogEntry{},
54 nextSend: time.Now().Add(batchInterval),
55 batchInterval: batchInterval,
56 batchSize: batchSize,
57 logger: logger,
59 nextSend: time.Now().Add(defaultBatchInterval),
60 batchInterval: defaultBatchInterval,
61 batchSize: 100,
62 sampleRate: 1.0,
63 sampleSalt: rand.Int63(),
64 logger: log.NewNopLogger(),
65 }
66 for _, option := range options {
67 option(c)
5868 }
5969 go c.loop()
6070 return c, nil
7282 for {
7383 select {
7484 case span := <-c.spanc:
85 if !shouldSample(span.traceID, c.sampleSalt, c.sampleRate) {
86 continue
87 }
7588 c.batch = append(c.batch, &scribe.LogEntry{
7689 Category: "zipkin", // TODO parameterize?
7790 Message: serialize(span),
116129 return nil
117130 }
118131
132 // ScribeOption sets a parameter for the StdlibAdapter.
133 type ScribeOption func(s *ScribeCollector)
134
135 // ScribeBatchSize sets the maximum batch size, after which a collect will be
136 // triggered. The default batch size is 100 traces.
137 func ScribeBatchSize(n int) ScribeOption {
138 return func(s *ScribeCollector) { s.batchSize = n }
139 }
140
141 // ScribeBatchInterval sets the maximum duration we will buffer traces before
142 // emitting them to the collector. The default batch interval is 1 second.
143 func ScribeBatchInterval(d time.Duration) ScribeOption {
144 return func(s *ScribeCollector) { s.batchInterval = d }
145 }
146
147 // ScribeSampleRate sets the sample rate used to determine if a trace will be
148 // sent to the collector. By default, the sample rate is 1.0, i.e. all traces
149 // are sent.
150 func ScribeSampleRate(f float64) ScribeOption {
151 return func(s *ScribeCollector) { s.sampleRate = f }
152 }
153
154 // ScribeLogger sets the logger used to report errors in the collection
155 // process. By default, a no-op logger is used, i.e. no errors are logged
156 // anywhere. It's important to set this option in a production service.
157 func ScribeLogger(logger log.Logger) ScribeOption {
158 return func(s *ScribeCollector) { s.logger = logger }
159 }
160
119161 func scribeClientFactory(addr string, timeout time.Duration) func() (scribe.Scribe, error) {
120162 return func() (scribe.Scribe, error) {
121163 a, err := net.ResolveTCPAddr("tcp", addr)
141183 panic(err)
142184 }
143185 return base64.StdEncoding.EncodeToString(t.Buffer.Bytes())
186 }
187
188 func shouldSample(id int64, salt int64, rate float64) bool {
189 if rate <= 0 {
190 return false
191 }
192 if rate >= 1.0 {
193 return true
194 }
195 return int64(math.Abs(float64(id^salt)))%10000 < int64(rate*10000)
144196 }
145197
146198 // NopCollector implements Collector but performs no work.
0 package zipkin
1
2 import "testing"
3
4 func TestShouldSample(t *testing.T) {
5 type triple struct {
6 id, salt int64
7 rate float64
8 }
9 for input, want := range map[triple]bool{
10 triple{123, 456, 1.0}: true,
11 triple{123, 456, 999}: true,
12 triple{123, 456, 0.0}: false,
13 triple{123, 456, -42}: false,
14 triple{1229998, 0, 0.01}: false,
15 triple{1229999, 0, 0.01}: false,
16 triple{1230000, 0, 0.01}: true,
17 triple{1230001, 0, 0.01}: true,
18 triple{1230098, 0, 0.01}: true,
19 triple{1230099, 0, 0.01}: true,
20 triple{1230100, 0, 0.01}: false,
21 triple{1230101, 0, 0.01}: false,
22 triple{1, 9999999, 0.01}: false,
23 triple{999, 0, 0.99}: true,
24 triple{9999, 0, 0.99}: false,
25 } {
26 if have := shouldSample(input.id, input.salt, input.rate); want != have {
27 t.Errorf("%#+v: want %v, have %v", input, want, have)
28 }
29 }
30 }
22 import (
33 "encoding/base64"
44 "fmt"
5 "io/ioutil"
65 "math/rand"
76 "net"
87 "sync"
1110
1211 "github.com/apache/thrift/lib/go/thrift"
1312
14 "github.com/go-kit/kit/log"
1513 "github.com/go-kit/kit/tracing/zipkin"
1614 "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/scribe"
1715 "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/zipkincore"
2220
2321 timeout := time.Second
2422 batchInterval := time.Millisecond
25 c, err := zipkin.NewScribeCollector(server.addr(), timeout, 0, batchInterval, log.NewLogfmtLogger(ioutil.Discard))
23 c, err := zipkin.NewScribeCollector(server.addr(), timeout, zipkin.ScribeBatchSize(0), zipkin.ScribeBatchInterval(batchInterval))
2624 if err != nil {
2725 t.Fatal(err)
2826 }