Codebase list golang-github-go-kit-kit / ef2fe97
added sampling logic to kafka collector and some code cleanup Bas van Beek 8 years ago
7 changed file(s) with 129 addition(s) and 79 deletion(s). Raw diff Collapse all Expand all
00 package zipkin
11
22 import (
3 "math/rand"
4
35 "github.com/apache/thrift/lib/go/thrift"
46 "gopkg.in/Shopify/sarama.v1"
57
68 "github.com/go-kit/kit/log"
79 )
810
9 // KafkaTopic sets the Kafka topic our Collector will publish on. The
10 // default topic for zipkin-receiver-kafka is "zipkin", see:
11 // defaultKafkaTopic sets the standard Kafka topic our Collector will publish
12 // on. The default topic for zipkin-receiver-kafka is "zipkin", see:
1113 // https://github.com/openzipkin/zipkin/tree/master/zipkin-receiver-kafka
12 var KafkaTopic = "zipkin"
14 const defaultKafkaTopic = "zipkin"
1315
14 // KafkaCollector implements Collector by forwarding spans to a Kafka
15 // service.
16 // KafkaCollector implements Collector by publishing spans to a Kafka
17 // broker.
1618 type KafkaCollector struct {
17 producer sarama.AsyncProducer
18 logger log.Logger
19 producer sarama.AsyncProducer
20 logger log.Logger
21 topic string
22 shouldSample Sampler
1923 }
2024
2125 // KafkaOption sets a parameter for the KafkaCollector
22 type KafkaOption func(s *KafkaCollector)
26 type KafkaOption func(c *KafkaCollector)
2327
2428 // KafkaLogger sets the logger used to report errors in the collection
2529 // process. By default, a no-op logger is used, i.e. no errors are logged
2630 // anywhere. It's important to set this option.
2731 func KafkaLogger(logger log.Logger) KafkaOption {
28 return func(k *KafkaCollector) { k.logger = logger }
32 return func(c *KafkaCollector) { c.logger = logger }
2933 }
3034
3135 // KafkaProducer sets the producer used to produce to Kafka.
3337 return func(c *KafkaCollector) { c.producer = p }
3438 }
3539
40 // KafkaTopic sets the Kafka topic the collector publishes spans on (default "zipkin").
41 func KafkaTopic(t string) KafkaOption {
42 return func(c *KafkaCollector) { c.topic = t }
43 }
44
45 // KafkaSampleRate sets the Sampler used to determine if a trace will be
46 // sent to the collector. By default, a Sampler with a sample rate of 1.0 is
47 // used, i.e. all traces are sent.
48 func KafkaSampleRate(sr Sampler) KafkaOption {
49 return func(c *KafkaCollector) { c.shouldSample = sr }
50 }
51
3652 // NewKafkaCollector returns a new Kafka-backed Collector. addrs should be a
3753 // slice of TCP endpoints of the form "host:port".
3854 func NewKafkaCollector(addrs []string, options ...KafkaOption) (Collector, error) {
39 c := &KafkaCollector{}
55 c := &KafkaCollector{
56 logger: log.NewNopLogger(),
57 topic: defaultKafkaTopic,
58 shouldSample: SampleRate(1.0, rand.Int63()),
59 }
60
4061 for _, option := range options {
4162 option(c)
4263 }
64
4365 if c.producer == nil {
4466 p, err := sarama.NewAsyncProducer(addrs, nil)
4567 if err != nil {
4769 }
4870 c.producer = p
4971 }
50 if c.logger == nil {
51 c.logger = log.NewNopLogger()
52 }
5372
5473 go c.logErrors()
74
5575 return c, nil
5676 }
5777
6383
6484 // Collect implements Collector.
6585 func (c *KafkaCollector) Collect(s *Span) error {
66 c.producer.Input() <- &sarama.ProducerMessage{
67 Topic: KafkaTopic,
68 Key: nil,
69 Value: sarama.ByteEncoder(byteSerialize(s)),
86 if c.shouldSample(s.traceID) {
87 c.producer.Input() <- &sarama.ProducerMessage{
88 Topic: c.topic,
89 Key: nil,
90 Value: sarama.ByteEncoder(kafkaSerialize(s)),
91 }
7092 }
7193 return nil
7294 }
7698 return c.producer.Close()
7799 }
78100
79 func byteSerialize(s *Span) []byte {
101 func kafkaSerialize(s *Span) []byte {
80102 t := thrift.NewTMemoryBuffer()
81103 p := thrift.NewTBinaryProtocolTransport(t)
82104 if err := s.Encode().Write(p); err != nil {
0 package zipkin
1
2 import "math"
3
// Sampler functions return if a Zipkin span should be sampled, based on its
// traceID.
type Sampler func(id int64) bool

// SampleRate returns a Sampler that decides, from a trace ID alone, whether
// the trace should be collected.
//
// rate is the fraction of traces to keep: values <= 0 (and NaN) never sample,
// values >= 1.0 always sample. For rates in between, the ID is XORed with
// salt, reduced to a bucket in [0, 9999], and sampled when the bucket is below
// rate*10000 (truncated). The decision is deterministic for a given
// (id, salt, rate).
func SampleRate(rate float64, salt int64) Sampler {
	// NaN compares false against everything, so it would otherwise fall
	// through to the float-to-int conversion below, whose result is
	// implementation-defined. Treat it as "never sample".
	if rate <= 0 || math.IsNaN(rate) {
		return func(_ int64) bool {
			return false
		}
	}
	if rate >= 1.0 {
		return func(_ int64) bool {
			return true
		}
	}
	// rate is strictly inside (0, 1) here, so the product fits in an int64.
	threshold := int64(rate * 10000)
	return func(id int64) bool {
		// Stay in integer arithmetic: going through float64 rounds values
		// above 2^53 (altering the residue) and overflows near 2^63. Go's %
		// truncates toward zero, so the remainder lies in (-10000, 10000) and
		// negating it is always safe, including for math.MinInt64.
		bucket := (id ^ salt) % 10000
		if bucket < 0 {
			bucket = -bucket
		}
		return bucket < threshold
	}
}
0 package zipkin
1
2 import "testing"
3
// TestSampleRate checks the Sampler returned by SampleRate against fixed
// (id, salt, rate) inputs: rates >= 1 always sample, rates <= 0 never do, and
// fractional rates sample only ids whose (id XOR salt) modulo 10000 falls
// below rate*10000.
4 func TestSampleRate(t *testing.T) {
5 type triple struct {
6 id, salt int64
7 rate float64
8 }
9 for input, want := range map[triple]bool{
// Rates at or beyond the [0, 1] bounds ignore id and salt entirely.
10 triple{123, 456, 1.0}: true,
11 triple{123, 456, 999}: true,
12 triple{123, 456, 0.0}: false,
13 triple{123, 456, -42}: false,
// With salt 0 and rate 0.01, only ids ending in 0000..0099 are sampled.
14 triple{1229998, 0, 0.01}: false,
15 triple{1229999, 0, 0.01}: false,
16 triple{1230000, 0, 0.01}: true,
17 triple{1230001, 0, 0.01}: true,
18 triple{1230098, 0, 0.01}: true,
19 triple{1230099, 0, 0.01}: true,
20 triple{1230100, 0, 0.01}: false,
21 triple{1230101, 0, 0.01}: false,
// A non-zero salt is XORed into the id before the modulo is taken.
22 triple{1, 9999999, 0.01}: false,
23 triple{999, 0, 0.99}: true,
24 triple{9999, 0, 0.99}: false,
25 } {
26 sampler := SampleRate(input.rate, input.salt)
27 if have := sampler(input.id); want != have {
28 t.Errorf("%#+v: want %v, have %v", input, want, have)
29 }
30 }
31 }
22 import (
33 "encoding/base64"
44 "fmt"
5 "math"
65 "math/rand"
76 "net"
87 "time"
1211 "github.com/go-kit/kit/log"
1312 "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/scribe"
1413 )
14
15 const defaultScribeCategory = "zipkin"
16
17 // defaultBatchInterval in seconds
18 const defaultBatchInterval = 1
1519
1620 // ScribeCollector implements Collector by forwarding spans to a Scribe
1721 // service, in batches.
2428 nextSend time.Time
2529 batchInterval time.Duration
2630 batchSize int
27 sampleRate float64
28 sampleSalt int64
31 shouldSample Sampler
2932 logger log.Logger
33 category string
34 quit chan struct{}
3035 }
3136
3237 // NewScribeCollector returns a new Scribe-backed Collector. addr should be a
4146 if err != nil {
4247 return nil, err
4348 }
44 defaultBatchInterval := time.Second
4549 c := &ScribeCollector{
4650 client: client,
4751 factory: factory,
4852 spanc: make(chan *Span),
4953 sendc: make(chan struct{}),
5054 batch: []*scribe.LogEntry{},
51 nextSend: time.Now().Add(defaultBatchInterval),
52 batchInterval: defaultBatchInterval,
55 batchInterval: defaultBatchInterval * time.Second,
5356 batchSize: 100,
54 sampleRate: 1.0,
55 sampleSalt: rand.Int63(),
57 shouldSample: SampleRate(1.0, rand.Int63()),
5658 logger: log.NewNopLogger(),
59 category: defaultScribeCategory,
60 quit: make(chan struct{}),
5761 }
5862 for _, option := range options {
5963 option(c)
6064 }
65 c.nextSend = time.Now().Add(c.batchInterval)
6166 go c.loop()
6267 return c, nil
6368 }
6873 return nil // accepted
6974 }
7075
76 // Close implements Collector.
7177 func (c *ScribeCollector) Close() error {
72 // TODO Close the underlying transport here?
78 close(c.quit)
7379 return nil
7480 }
7581
7985 for {
8086 select {
8187 case span := <-c.spanc:
82 if !shouldSample(span.traceID, c.sampleSalt, c.sampleRate) {
88 if !c.shouldSample(span.traceID) {
8389 continue
8490 }
8591 c.batch = append(c.batch, &scribe.LogEntry{
86 Category: "zipkin", // TODO parameterize?
87 Message: serialize(span),
92 Category: c.category,
93 Message: scribeSerialize(span),
8894 })
8995 if len(c.batch) >= c.batchSize {
9096 go c.sendNow()
101107 c.logger.Log("err", err.Error())
102108 }
103109 c.batch = c.batch[:0]
110 case <-c.quit:
111 return
104112 }
105113 }
106114 }
144152 // ScribeSampleRate sets the Sampler used to determine if a trace will be
145153 // sent to the collector. By default, a Sampler with a sample rate of 1.0 is
146154 // used, i.e. all traces are sent.
147 func ScribeSampleRate(f float64) ScribeOption {
148 return func(s *ScribeCollector) { s.sampleRate = f }
155 func ScribeSampleRate(sr Sampler) ScribeOption {
156 return func(s *ScribeCollector) { s.shouldSample = sr }
149157 }
150158
151159 // ScribeLogger sets the logger used to report errors in the collection
153161 // anywhere. It's important to set this option in a production service.
154162 func ScribeLogger(logger log.Logger) ScribeOption {
155163 return func(s *ScribeCollector) { s.logger = logger }
164 }
165
166 // ScribeCategory sets the Scribe category used to transmit the spans (default "zipkin").
167 func ScribeCategory(category string) ScribeOption {
168 return func(s *ScribeCollector) { s.category = category }
156169 }
157170
158171 func scribeClientFactory(addr string, timeout time.Duration) func() (scribe.Scribe, error) {
173186 }
174187 }
175188
176 func serialize(s *Span) string {
189 func scribeSerialize(s *Span) string {
177190 t := thrift.NewTMemoryBuffer()
178191 p := thrift.NewTBinaryProtocolTransport(t)
179192 if err := s.Encode().Write(p); err != nil {
181194 }
182195 return base64.StdEncoding.EncodeToString(t.Buffer.Bytes())
183196 }
184
185 func shouldSample(id int64, salt int64, rate float64) bool {
186 if rate <= 0 {
187 return false
188 }
189 if rate >= 1.0 {
190 return true
191 }
192 return int64(math.Abs(float64(id^salt)))%10000 < int64(rate*10000)
193 }
+0
-31
tracing/zipkin/scribe_internal_test.go less more
0 package zipkin
1
2 import "testing"
3
4 func TestShouldSample(t *testing.T) {
5 type triple struct {
6 id, salt int64
7 rate float64
8 }
9 for input, want := range map[triple]bool{
10 triple{123, 456, 1.0}: true,
11 triple{123, 456, 999}: true,
12 triple{123, 456, 0.0}: false,
13 triple{123, 456, -42}: false,
14 triple{1229998, 0, 0.01}: false,
15 triple{1229999, 0, 0.01}: false,
16 triple{1230000, 0, 0.01}: true,
17 triple{1230001, 0, 0.01}: true,
18 triple{1230098, 0, 0.01}: true,
19 triple{1230099, 0, 0.01}: true,
20 triple{1230100, 0, 0.01}: false,
21 triple{1230101, 0, 0.01}: false,
22 triple{1, 9999999, 0.01}: false,
23 triple{999, 0, 0.99}: true,
24 triple{9999, 0, 0.99}: false,
25 } {
26 if have := shouldSample(input.id, input.salt, input.rate); want != have {
27 t.Errorf("%#+v: want %v, have %v", input, want, have)
28 }
29 }
30 }
88 "time"
99
1010 "github.com/go-kit/kit/tracing/zipkin/_thrift/gen-go/zipkincore"
11 )
12
13 var (
14 // SpanContextKey represents the Span in the request context.
15 SpanContextKey = "Zipkin-Span"
1611 )
1712
1813 // A Span is a named collection of annotations. It represents meaningful
2424 // • https://gist.github.com/yoavaa/3478d3a0df666f21a98c
2525
2626 const (
27 // SpanContextKey holds the key used to store Zipkin spans in the context.
28 SpanContextKey = "Zipkin-Span"
29
2730 // https://github.com/racker/tryfer#headers
2831 traceIDHTTPHeader = "X-B3-TraceId"
2932 spanIDHTTPHeader = "X-B3-SpanId"