Zipkin and Kafka Collector updates
Minor code formatting fixes
Added missing comments on exported functions
Added CollectionError to aid with handling errors returned by
MultiCollector
Bas van Beek
8 years ago
0 | 0 | package zipkin |
1 | 1 | |
2 | import ( | |
3 | "errors" | |
4 | "strings" | |
5 | ) | |
2 | import "strings" | |
6 | 3 | |
7 | 4 | // Collector represents a Zipkin trace collector, which is probably a set of |
8 | 5 | // remote endpoints. |
16 | 13 | |
17 | 14 | // Collect implements Collector. |
18 | 15 | func (NopCollector) Collect(*Span) error { return nil } |
19 | func (NopCollector) Close() error { return nil } | |
16 | ||
17 | // Close implements Collector. | |
18 | func (NopCollector) Close() error { return nil } | |
20 | 19 | |
21 | 20 | // MultiCollector implements Collector by sending spans to all collectors. |
22 | 21 | type MultiCollector []Collector |
26 | 25 | return c.aggregateErrors(func(coll Collector) error { return coll.Collect(s) }) |
27 | 26 | } |
28 | 27 | |
28 | // Close implements Collector. | |
29 | 29 | func (c MultiCollector) Close() error { |
30 | 30 | return c.aggregateErrors(func(coll Collector) error { return coll.Close() }) |
31 | 31 | } |
32 | 32 | |
33 | 33 | func (c MultiCollector) aggregateErrors(f func(c Collector) error) error { |
34 | var e *collectionError | |
35 | for i, collector := range c { | |
36 | if err := f(collector); err != nil { | |
37 | if e == nil { | |
38 | e = &collectionError{ | |
39 | errs: make([]error, len(c)), | |
40 | } | |
41 | } | |
42 | e.errs[i] = err | |
43 | } | |
44 | } | |
45 | return e | |
46 | } | |
47 | ||
48 | // CollectionError represents an array of errors returned by one or more | |
49 | // failed Collector methods. | |
50 | type CollectionError interface { | |
51 | Error() string | |
52 | GetErrors() []error | |
53 | } | |
54 | ||
55 | type collectionError struct { | |
56 | errs []error | |
57 | } | |
58 | ||
59 | func (c *collectionError) Error() string { | |
34 | 60 | errs := []string{} |
35 | for _, collector := range c { | |
36 | if err := f(collector); err != nil { | |
61 | for _, err := range c.errs { | |
62 | if err != nil { | |
37 | 63 | errs = append(errs, err.Error()) |
38 | 64 | } |
39 | 65 | } |
40 | if len(errs) > 0 { | |
41 | return errors.New(strings.Join(errs, "; ")) | |
42 | } | |
43 | return nil | |
66 | return strings.Join(errs, "; ") | |
44 | 67 | } |
68 | ||
69 | // GetErrors implements CollectionError | |
70 | func (c *collectionError) GetErrors() []error { | |
71 | return c.errs | |
72 | } |
6 | 6 | "github.com/go-kit/kit/tracing/zipkin" |
7 | 7 | ) |
8 | 8 | |
9 | var s *zipkin.Span = zipkin.NewSpan("203.0.113.10:1234", "service1", "avg", 123, 456, 0) | |
9 | var s = zipkin.NewSpan("203.0.113.10:1234", "service1", "avg", 123, 456, 0) | |
10 | 10 | |
11 | 11 | func TestNopCollector(t *testing.T) { |
12 | 12 | c := zipkin.NopCollector{} |
31 | 31 | } |
32 | 32 | return nil |
33 | 33 | } |
34 | ||
34 | 35 | func (c *stubCollector) Close() error { |
35 | 36 | c.closed = true |
36 | 37 | if c.errid != 0 { |
46 | 47 | &stubCollector{errid: 2}, |
47 | 48 | } |
48 | 49 | err := cs.Collect(s) |
49 | wanted := "error 1; error 2" | |
50 | if err == nil || err.Error() != wanted { | |
51 | t.Errorf("errors not propagated. got %v, wanted %s", err, wanted) | |
50 | if err == nil { | |
51 | t.Fatal("wanted error, got none") | |
52 | } | |
53 | if want, have := "error 1; error 2", err.Error(); want != have { | |
54 | t.Errorf("want %q, have %q", want, have) | |
55 | } | |
56 | collectionError := err.(zipkin.CollectionError).GetErrors() | |
57 | if want, have := 3, len(collectionError); want != have { | |
58 | t.Fatalf("want %d, have %d", want, have) | |
59 | } | |
60 | if want, have := cs[0].Collect(s).Error(), collectionError[0].Error(); want != have { | |
61 | t.Errorf("want %q, have %q", want, have) | |
62 | } | |
63 | if want, have := cs[1].Collect(s), collectionError[1]; want != have { | |
64 | t.Errorf("want %q, have %q", want, have) | |
65 | } | |
66 | if want, have := cs[2].Collect(s).Error(), collectionError[2].Error(); want != have { | |
67 | t.Errorf("want %q, have %q", want, have) | |
52 | 68 | } |
53 | 69 | |
54 | 70 | for _, c := range cs { |
55 | sc := c.(*stubCollector) | |
56 | if !sc.collected { | |
71 | if !c.(*stubCollector).collected { | |
57 | 72 | t.Error("collect not called") |
58 | 73 | } |
59 | 74 | } |
66 | 81 | &stubCollector{errid: 2}, |
67 | 82 | } |
68 | 83 | err := cs.Close() |
69 | wanted := "error 1; error 2" | |
70 | if err == nil || err.Error() != wanted { | |
71 | t.Errorf("errors not propagated. got %v, wanted %s", err, wanted) | |
84 | if err == nil { | |
85 | t.Fatal("wanted error, got none") | |
86 | } | |
87 | if want, have := "error 1; error 2", err.Error(); want != have { | |
88 | t.Errorf("want %q, have %q", want, have) | |
72 | 89 | } |
73 | 90 | |
74 | 91 | for _, c := range cs { |
75 | sc := c.(*stubCollector) | |
76 | if !sc.closed { | |
92 | if !c.(*stubCollector).closed { | |
77 | 93 | t.Error("close not called") |
78 | 94 | } |
79 | 95 | } |
6 | 6 | "github.com/go-kit/kit/log" |
7 | 7 | ) |
8 | 8 | |
9 | // KafkaTopic sets the Kafka topic our Collector will publish on. The | |
10 | // default topic for zipkin-receiver-kafka is "zipkin", see: | |
11 | // https://github.com/openzipkin/zipkin/tree/master/zipkin-receiver-kafka | |
9 | 12 | var KafkaTopic = "zipkin" |
10 | 13 | |
11 | 14 | // KafkaCollector implements Collector by forwarding spans to a Kafka |
68 | 71 | return nil |
69 | 72 | } |
70 | 73 | |
74 | // Close implements Collector. | |
71 | 75 | func (c *KafkaCollector) Close() error { |
72 | 76 | return c.producer.Close() |
73 | 77 | } |
1 | 1 | |
2 | 2 | import ( |
3 | 3 | "encoding/binary" |
4 | "fmt" | |
5 | "math" | |
4 | 6 | "net" |
5 | 7 | "strconv" |
6 | 8 | "time" |
62 | 64 | return nil |
63 | 65 | } |
64 | 66 | endpoint := zipkincore.NewEndpoint() |
65 | binary.LittleEndian.PutUint32(addrs[0], (uint32)(endpoint.Ipv4)) | |
67 | endpoint.Ipv4 = (int32)(binary.BigEndian.Uint32(addrs[0].To4())) | |
66 | 68 | endpoint.Port = int16(portInt) |
67 | 69 | endpoint.ServiceName = serviceName |
68 | 70 | return endpoint |
93 | 95 | s.AnnotateDuration(value, 0) |
94 | 96 | } |
95 | 97 | |
96 | // AnnotateBinary annotates the span with a key and a byte value. | |
97 | func (s *Span) AnnotateBinary(key string, value []byte) { | |
98 | // AnnotateBinary annotates the span with a key and a value that will be []byte | |
99 | // encoded. | |
100 | func (s *Span) AnnotateBinary(key string, value interface{}) { | |
101 | var a zipkincore.AnnotationType | |
102 | var b []byte | |
103 | // We are not using zipkincore.AnnotationType_I16 for types that could fit | |
104 | // as reporting on it seems to be broken on the zipkin web interface | |
105 | // (however, we can properly extract the number from zipkin storage | |
106 | // directly). int64 has issues with negative numbers but seems ok for | |
107 | // positive numbers needing more than 32 bit. | |
108 | switch v := value.(type) { | |
109 | case bool: | |
110 | a = zipkincore.AnnotationType_BOOL | |
111 | b = []byte("\x00") | |
112 | if v { | |
113 | b = []byte("\x01") | |
114 | } | |
115 | case []byte: | |
116 | a = zipkincore.AnnotationType_BYTES | |
117 | b = v | |
118 | case byte: | |
119 | a = zipkincore.AnnotationType_I32 | |
120 | b = make([]byte, 4) | |
121 | binary.BigEndian.PutUint32(b, uint32(v)) | |
122 | case int8: | |
123 | a = zipkincore.AnnotationType_I32 | |
124 | b = make([]byte, 4) | |
125 | binary.BigEndian.PutUint32(b, uint32(v)) | |
126 | case int16: | |
127 | a = zipkincore.AnnotationType_I32 | |
128 | b = make([]byte, 4) | |
129 | binary.BigEndian.PutUint32(b, uint32(v)) | |
130 | case uint16: | |
131 | a = zipkincore.AnnotationType_I32 | |
132 | b = make([]byte, 4) | |
133 | binary.BigEndian.PutUint32(b, uint32(v)) | |
134 | case int32: | |
135 | a = zipkincore.AnnotationType_I32 | |
136 | b = make([]byte, 4) | |
137 | binary.BigEndian.PutUint32(b, uint32(v)) | |
138 | case uint32: | |
139 | a = zipkincore.AnnotationType_I32 | |
140 | b = make([]byte, 4) | |
141 | binary.BigEndian.PutUint32(b, uint32(v)) | |
142 | case int64: | |
143 | a = zipkincore.AnnotationType_I64 | |
144 | b = make([]byte, 8) | |
145 | binary.BigEndian.PutUint64(b, uint64(v)) | |
146 | case int: | |
147 | a = zipkincore.AnnotationType_I32 | |
148 | b = make([]byte, 8) | |
149 | binary.BigEndian.PutUint32(b, uint32(v)) | |
150 | case uint: | |
151 | a = zipkincore.AnnotationType_I32 | |
152 | b = make([]byte, 8) | |
153 | binary.BigEndian.PutUint32(b, uint32(v)) | |
154 | case uint64: | |
155 | a = zipkincore.AnnotationType_I64 | |
156 | b = make([]byte, 8) | |
157 | binary.BigEndian.PutUint64(b, uint64(v)) | |
158 | case float32: | |
159 | a = zipkincore.AnnotationType_DOUBLE | |
160 | b = make([]byte, 8) | |
161 | bits := math.Float64bits(float64(v)) | |
162 | binary.BigEndian.PutUint64(b, bits) | |
163 | case float64: | |
164 | a = zipkincore.AnnotationType_DOUBLE | |
165 | b = make([]byte, 8) | |
166 | bits := math.Float64bits(v) | |
167 | binary.BigEndian.PutUint64(b, bits) | |
168 | case string: | |
169 | a = zipkincore.AnnotationType_STRING | |
170 | b = []byte(v) | |
171 | default: | |
172 | // we have no handler for type's value, but let's get a string | |
173 | // representation of it. | |
174 | a = zipkincore.AnnotationType_STRING | |
175 | b = []byte(fmt.Sprintf("%+v", value)) | |
176 | } | |
98 | 177 | s.binaryAnnotations = append(s.binaryAnnotations, binaryAnnotation{ |
99 | 178 | key: key, |
100 | value: value, | |
101 | annotationType: zipkincore.AnnotationType_BYTES, | |
179 | value: b, | |
180 | annotationType: a, | |
102 | 181 | host: s.host, |
103 | 182 | }) |
104 | 183 | } |