Codebase list python-py-zipkin / 21b0d92
New upstream version 0.9.0 Olivier Sallou 6 years ago
17 changed file(s) with 1779 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
(New empty file)
0 Metadata-Version: 1.1
1 Name: py_zipkin
2 Version: 0.9.0
3 Summary: Library for using Zipkin in Python.
4 Home-page: https://github.com/Yelp/py_zipkin
5 Author: Yelp, Inc.
6 Author-email: opensource+py-zipkin@yelp.com
7 License: Copyright Yelp 2017
8 Description: UNKNOWN
9 Platform: UNKNOWN
10 Classifier: Development Status :: 3 - Alpha
11 Classifier: Intended Audience :: Developers
12 Classifier: Topic :: Software Development :: Libraries :: Python Modules
13 Classifier: License :: OSI Approved :: Apache Software License
14 Classifier: Operating System :: OS Independent
15 Classifier: Programming Language :: Python :: 2.7
16 Classifier: Programming Language :: Python :: 3.4
17 Classifier: Programming Language :: Python :: 3.5
18 Classifier: Programming Language :: Python :: 3.6
19 Provides: py_zipkin
(New empty file)
0 # -*- coding: utf-8 -*-
1
2
3 class ZipkinError(Exception):
4 """Custom error to be raised on Zipkin exceptions."""
0 # -*- coding: utf-8 -*-
1 import logging
2 import time
3 from collections import defaultdict
4
5 from py_zipkin.exception import ZipkinError
6 from py_zipkin.thrift import annotation_list_builder
7 from py_zipkin.thrift import binary_annotation_list_builder
8 from py_zipkin.thrift import copy_endpoint_with_new_service_name
9 from py_zipkin.thrift import create_span
10 from py_zipkin.thrift import thrift_objs_in_bytes
11 from py_zipkin.util import generate_random_64bit_string
12
13
14 try: # Python 2.7+
15 from logging import NullHandler
16 except ImportError: # pragma: no cover
17 class NullHandler(logging.Handler):
18 def emit(self, record):
19 pass
20
21 null_handler = NullHandler()
22 zipkin_logger = logging.getLogger('py_zipkin.logger')
23 zipkin_logger.addHandler(null_handler)
24 zipkin_logger.setLevel(logging.DEBUG)
25
26 LOGGING_END_KEY = 'py_zipkin.logging_end'
27
28
29 class ZipkinLoggingContext(object):
30 """A logging context specific to a Zipkin trace. If the trace is sampled,
31 the logging context sends serialized Zipkin spans to a transport_handler.
32 The logging context sends root "server" or "client" span, as well as all
33 local child spans collected within this context.
34
35 This class should only be used by the main `zipkin_span` entrypoint.
36 """
37
38 def __init__(
39 self,
40 zipkin_attrs,
41 thrift_endpoint,
42 log_handler,
43 span_name,
44 transport_handler,
45 report_root_timestamp,
46 binary_annotations=None,
47 add_logging_annotation=False,
48 client_context=False,
49 max_span_batch_size=None,
50 ):
51 self.zipkin_attrs = zipkin_attrs
52 self.thrift_endpoint = thrift_endpoint
53 self.log_handler = log_handler
54 self.span_name = span_name
55 self.transport_handler = transport_handler
56 self.response_status_code = 0
57 self.report_root_timestamp = report_root_timestamp
58 self.binary_annotations_dict = binary_annotations or {}
59 self.sa_binary_annotations = []
60 self.add_logging_annotation = add_logging_annotation
61 self.client_context = client_context
62 self.max_span_batch_size = max_span_batch_size
63
64 def start(self):
65 """Actions to be taken before request is handled.
66 1) Attach `zipkin_logger` to :class:`ZipkinLoggerHandler` object.
67 2) Record the start timestamp.
68 """
69 zipkin_logger.removeHandler(null_handler)
70 zipkin_logger.addHandler(self.log_handler)
71 self.start_timestamp = time.time()
72 return self
73
74 def stop(self):
75 """Actions to be taken post request handling.
76 1) Log the service annotations to scribe.
77 2) Detach `zipkin_logger` handler.
78 """
79 self.log_spans()
80 zipkin_logger.removeHandler(self.log_handler)
81 zipkin_logger.addHandler(null_handler)
82
83 def log_spans(self):
84 """Main function to log all the annotations stored during the entire
85 request. This is done if the request is sampled and the response was
86 a success. It also logs the service (`ss` and `sr`) or the client
87 ('cs' and 'cr') annotations.
88 """
89 if not self.zipkin_attrs.is_sampled:
90 return
91
92 span_sender = ZipkinBatchSender(self.transport_handler,
93 self.max_span_batch_size)
94 with span_sender:
95 end_timestamp = time.time()
96 # Collect additional annotations from the logging handler
97 annotations_by_span_id = defaultdict(dict)
98 binary_annotations_by_span_id = defaultdict(dict)
99 for msg in self.log_handler.extra_annotations:
100 span_id = msg['parent_span_id'] or self.zipkin_attrs.span_id
101 # This should check if these are non-None
102 annotations_by_span_id[span_id].update(msg['annotations'])
103 binary_annotations_by_span_id[span_id].update(
104 msg['binary_annotations']
105 )
106
107 # Collect, annotate, and log client spans from the logging handler
108 for span in self.log_handler.client_spans:
109 # The parent_span_id is either the parent ID set in the
110 # logging handler or the current Zipkin context's span ID.
111 parent_span_id = (
112 span['parent_span_id'] or
113 self.zipkin_attrs.span_id
114 )
115 # A new client span's span ID can be overridden
116 span_id = span['span_id'] or generate_random_64bit_string()
117 endpoint = copy_endpoint_with_new_service_name(
118 self.thrift_endpoint, span['service_name']
119 )
120 # Collect annotations both logged with the new spans and
121 # logged in separate log messages.
122 annotations = span['annotations']
123 annotations.update(annotations_by_span_id[span_id])
124 binary_annotations = span['binary_annotations']
125 binary_annotations.update(
126 binary_annotations_by_span_id[span_id])
127
128 timestamp, duration = get_local_span_timestamp_and_duration(
129 annotations
130 )
131 # Create serializable thrift objects of annotations
132 thrift_annotations = annotation_list_builder(
133 annotations, endpoint
134 )
135 thrift_binary_annotations = binary_annotation_list_builder(
136 binary_annotations, endpoint
137 )
138 if span.get('sa_binary_annotations'):
139 thrift_binary_annotations += span['sa_binary_annotations']
140
141 span_sender.add_span(
142 span_id=span_id,
143 parent_span_id=parent_span_id,
144 trace_id=self.zipkin_attrs.trace_id,
145 span_name=span['span_name'],
146 annotations=thrift_annotations,
147 binary_annotations=thrift_binary_annotations,
148 timestamp_s=timestamp,
149 duration_s=duration,
150 )
151
152 extra_annotations = annotations_by_span_id[
153 self.zipkin_attrs.span_id]
154 extra_binary_annotations = binary_annotations_by_span_id[
155 self.zipkin_attrs.span_id
156 ]
157
158 k1, k2 = ('sr', 'ss')
159 if self.client_context:
160 k1, k2 = ('cs', 'cr')
161 annotations = {k1: self.start_timestamp, k2: end_timestamp}
162 annotations.update(extra_annotations)
163
164 if self.add_logging_annotation:
165 annotations[LOGGING_END_KEY] = time.time()
166
167 thrift_annotations = annotation_list_builder(
168 annotations,
169 self.thrift_endpoint,
170 )
171
172 # Binary annotations can be set through debug messages or the
173 # set_extra_binary_annotations registry setting.
174 self.binary_annotations_dict.update(extra_binary_annotations)
175 thrift_binary_annotations = binary_annotation_list_builder(
176 self.binary_annotations_dict,
177 self.thrift_endpoint,
178 )
179 if self.sa_binary_annotations:
180 thrift_binary_annotations += self.sa_binary_annotations
181
182 if self.report_root_timestamp:
183 timestamp = self.start_timestamp
184 duration = end_timestamp - self.start_timestamp
185 else:
186 timestamp = duration = None
187
188 span_sender.add_span(
189 span_id=self.zipkin_attrs.span_id,
190 parent_span_id=self.zipkin_attrs.parent_span_id,
191 trace_id=self.zipkin_attrs.trace_id,
192 span_name=self.span_name,
193 annotations=thrift_annotations,
194 binary_annotations=thrift_binary_annotations,
195 timestamp_s=timestamp,
196 duration_s=duration,
197 )
198
199
200 def get_local_span_timestamp_and_duration(annotations):
201 if 'cs' in annotations and 'cr' in annotations:
202 return annotations['cs'], annotations['cr'] - annotations['cs']
203 elif 'sr' in annotations and 'ss' in annotations:
204 return annotations['sr'], annotations['ss'] - annotations['sr']
205 return None, None
206
207
208 class ZipkinLoggerHandler(logging.StreamHandler, object):
209 """Logger Handler to log span annotations or additional client spans to
210 scribe. To connect to the handler, logger name must be
211 'py_zipkin.logger'.
212
213 :param zipkin_attrs: ZipkinAttrs namedtuple object
214 """
215
216 def __init__(self, zipkin_attrs):
217 super(ZipkinLoggerHandler, self).__init__()
218 # If parent_span_id is set, the application is in a logging context
219 # where each additional client span logged has this span as its parent.
220 # This is to allow logging of hierarchies of spans instead of just
221 # single client spans. See the SpanContext class.
222 self.parent_span_id = None
223 self.zipkin_attrs = zipkin_attrs
224 self.client_spans = []
225 self.extra_annotations = []
226
227 def store_local_span(
228 self,
229 span_name,
230 service_name,
231 annotations,
232 binary_annotations,
233 sa_binary_annotations=None,
234 span_id=None,
235 ):
236 """Convenience method for storing a local child span (a zipkin_span
237 inside other zipkin_spans) to be logged when the outermost zipkin_span
238 exits.
239 """
240 self.client_spans.append({
241 'span_name': span_name,
242 'service_name': service_name,
243 'parent_span_id': self.parent_span_id,
244 'span_id': span_id,
245 'annotations': annotations,
246 'binary_annotations': binary_annotations,
247 'sa_binary_annotations': sa_binary_annotations,
248 })
249
250 def emit(self, record):
251 """Handle each record message. This function is called whenever
252 zipkin_logger.debug() is called.
253
254 :param record: object containing the `msg` object.
255 Structure of record.msg should be the following:
256 ::
257
258 {
259 "annotations": {
260 "cs": ts1,
261 "cr": ts2,
262 },
263 "binary_annotations": {
264 "http.uri": "/foo/bar",
265 },
266 "name": "foo_span",
267 "service_name": "myService",
268 }
269
270 Keys:
271 - annotations: str -> timestamp annotations
272 - binary_annotations: str -> str binary annotations
273 (One of either annotations or binary_annotations is required)
274 - name: str of new span name; only used if service-name is also
275 specified.
276 - service_name: str of new client span's service name.
277
278 If service_name is specified, this log msg is considered to
279 represent a new client span. If service_name is omitted, this is
280 considered additional annotation for the currently active
281 "parent span" (either the server span or the parent client span
282 inside a SpanContext).
283 """
284 if not self.zipkin_attrs.is_sampled:
285 return
286 span_name = record.msg.get('name', 'span')
287 annotations = record.msg.get('annotations', {})
288 binary_annotations = record.msg.get('binary_annotations', {})
289 if not annotations and not binary_annotations:
290 raise ZipkinError(
291 "At least one of annotation/binary annotation has"
292 " to be provided for {0} span".format(span_name)
293 )
294 service_name = record.msg.get('service_name', None)
295 # Presence of service_name means this is to be a new local span.
296 if service_name is not None:
297 self.store_local_span(
298 span_name=span_name,
299 service_name=service_name,
300 annotations=annotations,
301 binary_annotations=binary_annotations,
302 )
303 else:
304 self.extra_annotations.append({
305 'annotations': annotations,
306 'binary_annotations': binary_annotations,
307 'parent_span_id': self.parent_span_id,
308 })
309
310
311 class ZipkinBatchSender(object):
312
313 MAX_PORTION_SIZE = 100
314
315 def __init__(self, transport_handler, max_portion_size=None):
316 self.transport_handler = transport_handler
317 self.max_portion_size = max_portion_size or self.MAX_PORTION_SIZE
318
319 def __enter__(self):
320 self.queue = []
321 return self
322
323 def __exit__(self, _exc_type, _exc_value, _exc_traceback):
324 if any((_exc_type, _exc_value, _exc_traceback)):
325 error = '{0}: {1}'.format(_exc_type.__name__, _exc_value)
326 raise ZipkinError(error)
327 else:
328 self.flush()
329
330 def add_span(
331 self,
332 span_id,
333 parent_span_id,
334 trace_id,
335 span_name,
336 annotations,
337 binary_annotations,
338 timestamp_s,
339 duration_s,
340 ):
341 thrift_span = create_span(
342 span_id,
343 parent_span_id,
344 trace_id,
345 span_name,
346 annotations,
347 binary_annotations,
348 timestamp_s,
349 duration_s,
350 )
351
352 self.queue.append(thrift_span)
353 if len(self.queue) >= self.max_portion_size:
354 self.flush()
355
356 def flush(self):
357 if self.transport_handler and len(self.queue) > 0:
358 message = thrift_objs_in_bytes(self.queue)
359 self.transport_handler(message)
360 self.queue = []
0 # -*- coding: utf-8 -*-
1 import threading
2
3 _thread_local = threading.local()
4
5
6 def get_thread_local_zipkin_attrs():
7 """A wrapper to return _thread_local.requests
8
9 :returns: list that may contain zipkin attribute tuples
10 :rtype: list
11 """
12 if not hasattr(_thread_local, 'zipkin_attrs'):
13 _thread_local.zipkin_attrs = []
14 return _thread_local.zipkin_attrs
15
16
17 def get_zipkin_attrs():
18 """Get the topmost level zipkin attributes stored.
19
20 :returns: tuple containing zipkin attrs
21 :rtype: :class:`zipkin.ZipkinAttrs`
22 """
23 zipkin_attrs = get_thread_local_zipkin_attrs()
24 if zipkin_attrs:
25 return zipkin_attrs[-1]
26
27
28 def pop_zipkin_attrs():
29 """Pop the topmost level zipkin attributes, if present.
30
31 :returns: tuple containing zipkin attrs
32 :rtype: :class:`zipkin.ZipkinAttrs`
33 """
34 zipkin_attrs = get_thread_local_zipkin_attrs()
35 if zipkin_attrs:
36 return zipkin_attrs.pop()
37
38
39 def push_zipkin_attrs(zipkin_attr):
40 """Stores the zipkin attributes to thread local.
41
42 :param zipkin_attr: tuple containing zipkin related attrs
43 :type zipkin_attr: :class:`zipkin.ZipkinAttrs`
44 """
45 get_thread_local_zipkin_attrs().append(zipkin_attr)
0 # -*- coding: utf-8 -*-
1 import os
2 import socket
3 import struct
4
5 import thriftpy
6 from thriftpy.protocol.binary import TBinaryProtocol
7 from thriftpy.protocol.binary import write_list_begin
8 from thriftpy.thrift import TType
9 from thriftpy.transport import TMemoryBuffer
10
11 from py_zipkin.util import unsigned_hex_to_signed_int
12
13
14 thrift_filepath = os.path.join(os.path.dirname(__file__), 'zipkinCore.thrift')
15 zipkin_core = thriftpy.load(thrift_filepath, module_name="zipkinCore_thrift")
16
17 SERVER_ADDR_VAL = '\x01'
18
19 dummy_endpoint = zipkin_core.Endpoint()
20
21
22 def create_annotation(timestamp, value, host):
23 """
24 Create a zipkin annotation object
25
26 :param timestamp: timestamp of when the annotation occured in microseconds
27 :param value: name of the annotation, such as 'sr'
28 :param host: zipkin endpoint object
29
30 :returns: zipkin annotation object
31 """
32 return zipkin_core.Annotation(timestamp=timestamp, value=value, host=host)
33
34
35 def create_binary_annotation(key, value, annotation_type, host):
36 """
37 Create a zipkin binary annotation object
38
39 :param key: name of the annotation, such as 'http.uri'
40 :param value: value of the annotation, such as a URI
41 :param annotation_type: type of annotation, such as AnnotationType.I32
42 :param host: zipkin endpoint object
43
44 :returns: zipkin binary annotation object
45 """
46 return zipkin_core.BinaryAnnotation(
47 key=key,
48 value=value,
49 annotation_type=annotation_type,
50 host=host,
51 )
52
53
54 def create_endpoint(port=0, service_name='unknown', host=None):
55 """Create a zipkin Endpoint object.
56
57 An Endpoint object holds information about the network context of a span.
58
59 :param port: int value of the port. Defaults to 0
60 :param service_name: service name as a str. Defaults to 'unknown'
61 :param host: string containing ipv4 value of the host, if not provided,
62 host is determined automatically
63 :returns: zipkin Endpoint object
64 """
65 if host is None:
66 try:
67 host = socket.gethostbyname(socket.gethostname())
68 except socket.gaierror:
69 host = '127.0.0.1'
70 # Convert ip address to network byte order
71 ipv4 = struct.unpack('!i', socket.inet_aton(host))[0]
72 # Zipkin passes unsigned values in signed types because Thrift has no
73 # unsigned types, so we have to convert the value.
74 port = struct.unpack('h', struct.pack('H', port))[0]
75 return zipkin_core.Endpoint(
76 ipv4=ipv4,
77 port=port,
78 service_name=service_name,
79 )
80
81
82 def copy_endpoint_with_new_service_name(endpoint, service_name):
83 """Copies a copy of a given endpoint with a new service name.
84 This should be very fast, on the order of several microseconds.
85
86 :param endpoint: existing zipkin_core.Endpoint object
87 :param service_name: str of new service name
88 :returns: zipkin Endpoint object
89 """
90 return zipkin_core.Endpoint(
91 ipv4=endpoint.ipv4,
92 port=endpoint.port,
93 service_name=service_name,
94 )
95
96
97 def annotation_list_builder(annotations, host):
98 """
99 Reformat annotations dict to return list of corresponding zipkin_core objects.
100
101 :param annotations: dict containing key as annotation name,
102 value being timestamp in seconds(float).
103 :type host: :class:`zipkin_core.Endpoint`
104 :returns: a list of annotation zipkin_core objects
105 :rtype: list
106 """
107 return [
108 create_annotation(int(timestamp * 1000000), key, host)
109 for key, timestamp in annotations.items()
110 ]
111
112
113 def binary_annotation_list_builder(binary_annotations, host):
114 """
115 Reformat binary annotations dict to return list of zipkin_core objects. The
116 value of the binary annotations MUST be in string format.
117
118 :param binary_annotations: dict with key, value being the name and value
119 of the binary annotation being logged.
120 :type host: :class:`zipkin_core.Endpoint`
121 :returns: a list of binary annotation zipkin_core objects
122 :rtype: list
123 """
124 # TODO: Remove the type hard-coding of STRING to take it as a param option.
125 ann_type = zipkin_core.AnnotationType.STRING
126 return [
127 create_binary_annotation(key, str(value), ann_type, host)
128 for key, value in binary_annotations.items()
129 ]
130
131
132 def create_span(
133 span_id,
134 parent_span_id,
135 trace_id,
136 span_name,
137 annotations,
138 binary_annotations,
139 timestamp_s,
140 duration_s,
141 ):
142 """Takes a bunch of span attributes and returns a thriftpy representation
143 of the span. Timestamps passed in are in seconds, they're converted to
144 microseconds before thrift encoding.
145 """
146 # Check if trace_id is 128-bit. If so, record trace_id_high separately.
147 trace_id_length = len(trace_id)
148 trace_id_high = None
149 if trace_id_length > 16:
150 assert trace_id_length == 32
151 trace_id, trace_id_high = trace_id[16:], trace_id[:16]
152
153 if trace_id_high:
154 trace_id_high = unsigned_hex_to_signed_int(trace_id_high)
155
156 span_dict = {
157 'trace_id': unsigned_hex_to_signed_int(trace_id),
158 'name': span_name,
159 'id': unsigned_hex_to_signed_int(span_id),
160 'annotations': annotations,
161 'binary_annotations': binary_annotations,
162 'timestamp': int(timestamp_s * 1000000) if timestamp_s else None,
163 'duration': int(duration_s * 1000000) if duration_s else None,
164 'trace_id_high': trace_id_high,
165 }
166 if parent_span_id:
167 span_dict['parent_id'] = unsigned_hex_to_signed_int(parent_span_id)
168 return zipkin_core.Span(**span_dict)
169
170
171 def thrift_objs_in_bytes(thrift_obj_list): # pragma: no cover
172 """
173 Returns TBinaryProtocol encoded Thrift objects.
174
175 :param thrift_obj_list: thrift objects list to encode
176 :returns: thrift objects in TBinaryProtocol format bytes.
177 """
178 transport = TMemoryBuffer()
179 protocol = TBinaryProtocol(transport)
180 write_list_begin(transport, TType.STRUCT, len(thrift_obj_list))
181 for thrift_obj in thrift_obj_list:
182 thrift_obj.write(protocol)
183
184 return bytes(transport.getvalue())
0 # Copyright 2012 Twitter Inc.
1 #
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at
5 #
6 # http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13 namespace java com.twitter.zipkin.thriftjava
14 #@namespace scala com.twitter.zipkin.thriftscala
15 namespace rb Zipkin
16
17 #************** Annotation.value **************
18 /**
19 * The client sent ("cs") a request to a server. There is only one send per
20 * span. For example, if there's a transport error, each attempt can be logged
21 * as a WIRE_SEND annotation.
22 *
23 * If chunking is involved, each chunk could be logged as a separate
24 * CLIENT_SEND_FRAGMENT in the same span.
25 *
26 * Annotation.host is not the server. It is the host which logged the send
27 * event, almost always the client. When logging CLIENT_SEND, instrumentation
28 * should also log the SERVER_ADDR.
29 */
30 const string CLIENT_SEND = "cs"
31 /**
32 * The client received ("cr") a response from a server. There is only one
33 * receive per span. For example, if duplicate responses were received, each
34 * can be logged as a WIRE_RECV annotation.
35 *
36 * If chunking is involved, each chunk could be logged as a separate
37 * CLIENT_RECV_FRAGMENT in the same span.
38 *
39 * Annotation.host is not the server. It is the host which logged the receive
40 * event, almost always the client. The actual endpoint of the server is
41 * recorded separately as SERVER_ADDR when CLIENT_SEND is logged.
42 */
43 const string CLIENT_RECV = "cr"
44 /**
45 * The server sent ("ss") a response to a client. There is only one response
46 * per span. If there's a transport error, each attempt can be logged as a
47 * WIRE_SEND annotation.
48 *
49 * Typically, a trace ends with a server send, so the last timestamp of a trace
50 * is often the timestamp of the root span's server send.
51 *
52 * If chunking is involved, each chunk could be logged as a separate
53 * SERVER_SEND_FRAGMENT in the same span.
54 *
55 * Annotation.host is not the client. It is the host which logged the send
56 * event, almost always the server. The actual endpoint of the client is
57 * recorded separately as CLIENT_ADDR when SERVER_RECV is logged.
58 */
59 const string SERVER_SEND = "ss"
60 /**
61 * The server received ("sr") a request from a client. There is only one
62 * request per span. For example, if duplicate responses were received, each
63 * can be logged as a WIRE_RECV annotation.
64 *
65 * Typically, a trace starts with a server receive, so the first timestamp of a
66 * trace is often the timestamp of the root span's server receive.
67 *
68 * If chunking is involved, each chunk could be logged as a separate
69 * SERVER_RECV_FRAGMENT in the same span.
70 *
71 * Annotation.host is not the client. It is the host which logged the receive
72 * event, almost always the server. When logging SERVER_RECV, instrumentation
73 * should also log the CLIENT_ADDR.
74 */
75 const string SERVER_RECV = "sr"
76 /**
77 * Optionally logs an attempt to send a message on the wire. Multiple wire send
78 * events could indicate network retries. A lag between client or server send
79 * and wire send might indicate queuing or processing delay.
80 */
81 const string WIRE_SEND = "ws"
82 /**
83 * Optionally logs an attempt to receive a message from the wire. Multiple wire
84 * receive events could indicate network retries. A lag between wire receive
85 * and client or server receive might indicate queuing or processing delay.
86 */
87 const string WIRE_RECV = "wr"
88 /**
89 * Optionally logs progress of a (CLIENT_SEND, WIRE_SEND). For example, this
90 * could be one chunk in a chunked request.
91 */
92 const string CLIENT_SEND_FRAGMENT = "csf"
93 /**
94 * Optionally logs progress of a (CLIENT_RECV, WIRE_RECV). For example, this
95 * could be one chunk in a chunked response.
96 */
97 const string CLIENT_RECV_FRAGMENT = "crf"
98 /**
99 * Optionally logs progress of a (SERVER_SEND, WIRE_SEND). For example, this
100 * could be one chunk in a chunked response.
101 */
102 const string SERVER_SEND_FRAGMENT = "ssf"
103 /**
104 * Optionally logs progress of a (SERVER_RECV, WIRE_RECV). For example, this
105 * could be one chunk in a chunked request.
106 */
107 const string SERVER_RECV_FRAGMENT = "srf"
108
109 #***** BinaryAnnotation.key ******
110 /**
111 * The domain portion of the URL or host header. Ex. "mybucket.s3.amazonaws.com"
112 *
113 * Used to filter by host as opposed to ip address.
114 */
115 const string HTTP_HOST = "http.host"
116
117 /**
118 * The HTTP method, or verb, such as "GET" or "POST".
119 *
120 * Used to filter against an http route.
121 */
122 const string HTTP_METHOD = "http.method"
123
124 /**
125 * The absolute http path, without any query parameters. Ex. "/objects/abcd-ff"
126 *
127 * Used to filter against an http route, portably with zipkin v1.
128 *
129 * In zipkin v1, only equals filters are supported. Dropping query parameters makes the number
130 * of distinct URIs less. For example, one can query for the same resource, regardless of signing
131 * parameters encoded in the query line. This does not reduce cardinality to a HTTP single route.
132 * For example, it is common to express a route as an http URI template like
133 * "/resource/{resource_id}". In systems where only equals queries are available, searching for
134 * http/path=/resource won't match if the actual request was /resource/abcd-ff.
135 *
136 * Historical note: This was commonly expressed as "http.uri" in zipkin, eventhough it was most
137 * often just a path.
138 */
139 const string HTTP_PATH = "http.path"
140
141 /**
142 * The entire URL, including the scheme, host and query parameters if available. Ex.
143 * "https://mybucket.s3.amazonaws.com/objects/abcd-ff?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Algorithm=AWS4-HMAC-SHA256..."
144 *
145 * Combined with HTTP_METHOD, you can understand the fully-qualified request line.
146 *
147 * This is optional as it may include private data or be of considerable length.
148 */
149 const string HTTP_URL = "http.url"
150
151 /**
152 * The HTTP status code, when not in 2xx range. Ex. "503"
153 *
154 * Used to filter for error status.
155 */
156 const string HTTP_STATUS_CODE = "http.status_code"
157
158 /**
159 * The size of the non-empty HTTP request body, in bytes. Ex. "16384"
160 *
161 * Large uploads can exceed limits or contribute directly to latency.
162 */
163 const string HTTP_REQUEST_SIZE = "http.request.size"
164
165 /**
166 * The size of the non-empty HTTP response body, in bytes. Ex. "16384"
167 *
168 * Large downloads can exceed limits or contribute directly to latency.
169 */
170 const string HTTP_RESPONSE_SIZE = "http.response.size"
171
172 /**
173 * The value of "lc" is the component or namespace of a local span.
174 *
175 * BinaryAnnotation.host adds service context needed to support queries.
176 *
177 * Local Component("lc") supports three key features: flagging, query by
178 * service and filtering Span.name by namespace.
179 *
180 * While structurally the same, local spans are fundamentally different than
181 * RPC spans in how they should be interpreted. For example, zipkin v1 tools
182 * center on RPC latency and service graphs. Root local-spans are neither
183 * indicative of critical path RPC latency, nor have impact on the shape of a
184 * service graph. By flagging with "lc", tools can special-case local spans.
185 *
186 * Zipkin v1 Spans are unqueryable unless they can be indexed by service name.
187 * The only path to a service name is by (Binary)?Annotation.host.serviceName.
188 * By logging "lc", a local span can be queried even if no other annotations
189 * are logged.
190 *
191 * The value of "lc" is the namespace of Span.name. For example, it might be
192 * "finatra2", for a span named "bootstrap". "lc" allows you to resolves
193 * conflicts for the same Span.name, for example "finatra/bootstrap" vs
194 * "finch/bootstrap". Using local component, you'd search for spans named
195 * "bootstrap" where "lc=finch"
196 */
197 const string LOCAL_COMPONENT = "lc"
198
199 #***** Annotation.value or BinaryAnnotation.key ******
200 /**
201 * When an annotation value, this indicates when an error occurred. When a
202 * binary annotation key, the value is a human readable message associated
203 * with an error.
204 *
205 * Due to transient errors, an ERROR annotation should not be interpreted
206 * as a span failure, even the annotation might explain additional latency.
207 * Instrumentation should add the ERROR binary annotation when the operation
208 * failed and couldn't be recovered.
209 *
210 * Here's an example: A span has an ERROR annotation, added when a WIRE_SEND
211 * failed. Another WIRE_SEND succeeded, so there's no ERROR binary annotation
212 * on the span because the overall operation succeeded.
213 *
214 * Note that RPC spans often include both client and server hosts: It is
215 * possible that only one side perceived the error.
216 */
217 const string ERROR = "error"
218
219 #***** BinaryAnnotation.key where value = [1] and annotation_type = BOOL ******
220 /**
221 * Indicates a client address ("ca") in a span. Most likely, there's only one.
222 * Multiple addresses are possible when a client changes its ip or port within
223 * a span.
224 */
225 const string CLIENT_ADDR = "ca"
226 /**
227 * Indicates a server address ("sa") in a span. Most likely, there's only one.
228 * Multiple addresses are possible when a client is redirected, or fails to a
229 * different server ip or port.
230 */
231 const string SERVER_ADDR = "sa"
232
233 /**
234 * Indicates the network context of a service recording an annotation with two
235 * exceptions.
236 *
237 * When a BinaryAnnotation, and key is CLIENT_ADDR or SERVER_ADDR,
238 * the endpoint indicates the source or destination of an RPC. This exception
239 * allows zipkin to display network context of uninstrumented services, or
240 * clients such as web browsers.
241 */
242 struct Endpoint {
243 /**
244 * IPv4 host address packed into 4 bytes.
245 *
246 * Ex for the ip 1.2.3.4, it would be (1 << 24) | (2 << 16) | (3 << 8) | 4
247 */
248 1: i32 ipv4
249 /**
250 * IPv4 port or 0, if unknown.
251 *
252 * Note: this is to be treated as an unsigned integer, so watch for negatives.
253 */
254 2: i16 port
255 /**
256 * Classifier of a source or destination in lowercase, such as "zipkin-web".
257 *
258 * This is the primary parameter for trace lookup, so should be intuitive as
259 * possible, for example, matching names in service discovery.
260 *
261 * Conventionally, when the service name isn't known, service_name = "unknown".
262 * However, it is also permissible to set service_name = "" (empty string).
263 * The difference in the latter usage is that the span will not be queryable
264 * by service name unless more information is added to the span with non-empty
265 * service name, e.g. an additional annotation from the server.
266 *
267 * Particularly clients may not have a reliable service name at ingest. One
268 * approach is to set service_name to "" at ingest, and later assign a
269 * better label based on binary annotations, such as user agent.
270 */
271 3: string service_name
272 /**
273 * IPv6 host address packed into 16 bytes. Ex Inet6Address.getBytes()
274 */
275 4: optional binary ipv6
276 }
277
278 /**
279 * Associates an event that explains latency with a timestamp.
280 *
281 * Unlike log statements, annotations are often codes: for example "sr".
282 */
283 struct Annotation {
284 /**
285 * Microseconds from epoch.
286 *
287 * This value should use the most precise value possible. For example,
288 * gettimeofday or multiplying currentTimeMillis by 1000.
289 */
290 1: i64 timestamp
291 /**
292 * Usually a short tag indicating an event, like "sr" or "finagle.retry".
293 */
294 2: string value
295 /**
296 * The host that recorded the value, primarily for query by service name.
297 */
298 3: optional Endpoint host
299 // don't reuse 4: optional i32 OBSOLETE_duration // how long did the operation take? microseconds
300 }
301
302 /**
303 * A subset of thrift base types, except BYTES.
304 */
305 enum AnnotationType {
306 /**
307 * Set to 0x01 when key is CLIENT_ADDR or SERVER_ADDR
308 */
309 BOOL,
310 /**
311 * No encoding, or type is unknown.
312 */
313 BYTES,
314 I16,
315 I32,
316 I64,
317 DOUBLE,
318 /**
319 * the only type zipkin v1 supports search against.
320 */
321 STRING
322 }
323
324 /**
325 * Binary annotations are tags applied to a Span to give it context. For
326 * example, a binary annotation of HTTP_PATH ("http.path") could the path
327 * to a resource in a RPC call.
328 *
329 * Binary annotations of type STRING are always queryable, though more a
330 * historical implementation detail than a structural concern.
331 *
332 * Binary annotations can repeat, and vary on the host. Similar to Annotation,
333 * the host indicates who logged the event. This allows you to tell the
334 * difference between the client and server side of the same key. For example,
335 * the key "http.path" might be different on the client and server side due to
336 * rewriting, like "/api/v1/myresource" vs "/myresource. Via the host field,
337 * you can see the different points of view, which often help in debugging.
338 */
339 struct BinaryAnnotation {
340 /**
341 * Name used to lookup spans, such as "http.path" or "finagle.version".
342 */
343 1: string key,
344 /**
345 * Serialized thrift bytes, in TBinaryProtocol format.
346 *
347 * For legacy reasons, byte order is big-endian. See THRIFT-3217.
348 */
349 2: binary value,
350 /**
351 * The thrift type of value, most often STRING.
352 *
353 * annotation_type shouldn't vary for the same key.
354 */
355 3: AnnotationType annotation_type,
356 /**
357 * The host that recorded value, allowing query by service name or address.
358 *
359 * There are two exceptions: when key is "ca" or "sa", this is the source or
360 * destination of an RPC. This exception allows zipkin to display network
361 * context of uninstrumented services, such as browsers or databases.
362 */
363 4: optional Endpoint host
364 }
365
366 /**
367 * A trace is a series of spans (often RPC calls) which form a latency tree.
368 *
369 * Spans are usually created by instrumentation in RPC clients or servers, but
370 * can also represent in-process activity. Annotations in spans are similar to
371 * log statements, and are sometimes created directly by application developers
372 * to indicate events of interest, such as a cache miss.
373 *
374 * The root span is where parent_id = Nil; it usually has the longest duration
375 * in the trace.
376 *
377 * Span identifiers are packed into i64s, but should be treated opaquely.
378 * String encoding is fixed-width lower-hex, to avoid signed interpretation.
379 */
380 struct Span {
381 /**
382 * Unique 8-byte identifier for a trace, set on all spans within it.
383 */
384 1: i64 trace_id
385 /**
386 * Span name in lowercase, rpc method for example. Conventionally, when the
387 * span name isn't known, name = "unknown".
388 */
389 3: string name,
390 /**
391 * Unique 8-byte identifier of this span within a trace. A span is uniquely
392 * identified in storage by (trace_id, id).
393 */
394 4: i64 id,
395 /**
396 * The parent's Span.id; absent if this the root span in a trace.
397 */
398 5: optional i64 parent_id,
399 /**
400 * Associates events that explain latency with a timestamp. Unlike log
401 * statements, annotations are often codes: for example SERVER_RECV("sr").
402 * Annotations are sorted ascending by timestamp.
403 */
404 6: list<Annotation> annotations,
405 /**
406 * Tags a span with context, usually to support query or aggregation. For
407 * example, a binary annotation key could be "http.path".
408 */
409 8: list<BinaryAnnotation> binary_annotations
410 /**
411 * True is a request to store this span even if it overrides sampling policy.
412 */
413 9: optional bool debug = 0
414 /**
415 * Epoch microseconds of the start of this span, absent if this an incomplete
416 * span.
417 *
418 * This value should be set directly by instrumentation, using the most
419 * precise value possible. For example, gettimeofday or syncing nanoTime
420 * against a tick of currentTimeMillis.
421 *
422 * For compatibilty with instrumentation that precede this field, collectors
423 * or span stores can derive this via Annotation.timestamp.
424 * For example, SERVER_RECV.timestamp or CLIENT_SEND.timestamp.
425 *
426 * Timestamp is nullable for input only. Spans without a timestamp cannot be
427 * presented in a timeline: Span stores should not output spans missing a
428 * timestamp.
429 *
430 * There are two known edge-cases where this could be absent: both cases
431 * exist when a collector receives a span in parts and a binary annotation
432 * precedes a timestamp. This is possible when..
433 * - The span is in-flight (ex not yet received a timestamp)
434 * - The span's start event was lost
435 */
436 10: optional i64 timestamp,
437 /**
438 * Measurement in microseconds of the critical path, if known. Durations of
439 * less than one microsecond must be rounded up to 1 microsecond.
440 *
441 * This value should be set directly, as opposed to implicitly via annotation
442 * timestamps. Doing so encourages precision decoupled from problems of
443 * clocks, such as skew or NTP updates causing time to move backwards.
444 *
445 * For compatibility with instrumentation that precede this field, collectors
446 * or span stores can derive this by subtracting Annotation.timestamp.
447 * For example, SERVER_SEND.timestamp - SERVER_RECV.timestamp.
448 *
449 * If this field is persisted as unset, zipkin will continue to work, except
450 * duration query support will be implementation-specific. Similarly, setting
451 * this field non-atomically is implementation-specific.
452 *
453 * This field is i64 vs i32 to support spans longer than 35 minutes.
454 */
455 11: optional i64 duration
456 /**
457 * Optional unique 8-byte additional identifier for a trace. If non zero, this
458 * means the trace uses 128 bit traceIds instead of 64 bit.
459 */
460 12: optional i64 trace_id_high
461 }
0 # -*- coding: utf-8 -*-
1 import codecs
2 import os
3 import struct
4
5
6 def generate_random_64bit_string():
7 """Returns a 64 bit UTF-8 encoded string. In the interests of simplicity,
8 this is always cast to a `str` instead of (in py2 land) a unicode string.
9 Certain clients (I'm looking at you, Twisted) don't enjoy unicode headers.
10
11 :returns: random 16-character string
12 """
13 return str(codecs.encode(os.urandom(8), 'hex_codec').decode('utf-8'))
14
15
16 def generate_random_128bit_string():
17 """Returns a 128 bit UTF-8 encoded string. Follows the same conventions
18 as generate_random_64bit_string().
19
20 :returns: random 32-character string
21 """
22 return str(codecs.encode(os.urandom(16), 'hex_codec').decode('utf-8'))
23
24
25 def unsigned_hex_to_signed_int(hex_string):
26 """Converts a 64-bit hex string to a signed int value.
27
28 This is due to the fact that Apache Thrift only has signed values.
29
30 Examples:
31 '17133d482ba4f605' => 1662740067609015813
32 'b6dbb1c2b362bf51' => -5270423489115668655
33
34 :param hex_string: the string representation of a zipkin ID
35 :returns: signed int representation
36 """
37 return struct.unpack('q', struct.pack('Q', int(hex_string, 16)))[0]
38
39
40 def signed_int_to_unsigned_hex(signed_int):
41 """Converts a signed int value to a 64-bit hex string.
42
43 Examples:
44 1662740067609015813 => '17133d482ba4f605'
45 -5270423489115668655 => 'b6dbb1c2b362bf51'
46
47 :param signed_int: an int to convert
48 :returns: unsigned hex string
49 """
50 hex_string = hex(struct.unpack('Q', struct.pack('q', signed_int))[0])[2:]
51 if hex_string.endswith('L'):
52 return hex_string[:-1]
53 return hex_string
0 # -*- coding: utf-8 -*-
1 import functools
2 import random
3 import time
4 from collections import namedtuple
5
6 from py_zipkin.exception import ZipkinError
7 from py_zipkin.logging_helper import zipkin_logger
8 from py_zipkin.logging_helper import ZipkinLoggerHandler
9 from py_zipkin.logging_helper import ZipkinLoggingContext
10 from py_zipkin.thread_local import get_zipkin_attrs
11 from py_zipkin.thread_local import pop_zipkin_attrs
12 from py_zipkin.thread_local import push_zipkin_attrs
13 from py_zipkin.thrift import SERVER_ADDR_VAL
14 from py_zipkin.thrift import create_binary_annotation
15 from py_zipkin.thrift import create_endpoint
16 from py_zipkin.thrift import zipkin_core
17 from py_zipkin.util import generate_random_64bit_string
18 from py_zipkin.util import generate_random_128bit_string
19
20
21 """
22 Holds the basic attributes needed to log a zipkin trace
23
24 :param trace_id: Unique trace id
25 :param span_id: Span Id of the current request span
26 :param parent_span_id: Parent span Id of the current request span
27 :param flags: stores flags header. Currently unused
28 :param is_sampled: pre-computed boolean whether the trace should be logged
29 """
30 ZipkinAttrs = namedtuple(
31 'ZipkinAttrs',
32 ['trace_id', 'span_id', 'parent_span_id', 'flags', 'is_sampled'],
33 )
34
35
36 STANDARD_ANNOTATIONS = {
37 'client': {'cs', 'cr'},
38 'server': {'ss', 'sr'},
39 }
40 STANDARD_ANNOTATIONS_KEYS = frozenset(STANDARD_ANNOTATIONS.keys())
41
42
43 class zipkin_span(object):
44 """Context manager/decorator for all of your zipkin tracing needs.
45
46 Usage #1: Start a trace with a given sampling rate
47
48 This begins the zipkin trace and also records the root span. The required
49 params are service_name, transport_handler, and sample_rate.
50
51 # Start a trace with do_stuff() as the root span
52 def some_batch_job(a, b):
53 with zipkin_span(
54 service_name='my_service',
55 span_name='my_span_name',
56 transport_handler=some_handler,
57 port=22,
58 sample_rate=0.05,
59 ):
60 do_stuff()
61
62 Usage #2: Trace a service call.
63
64 The typical use case is instrumenting a framework like Pyramid or Django. Only
65 ss and sr times are recorded for the root span. Required params are
66 service_name, zipkin_attrs, transport_handler, and port.
67
68 # Used in a pyramid tween
69 def tween(request):
70 zipkin_attrs = some_zipkin_attr_creator(request)
71 with zipkin_span(
72 service_name='my_service,'
73 span_name='my_span_name',
74 zipkin_attrs=zipkin_attrs,
75 transport_handler=some_handler,
76 port=22,
77 ) as zipkin_context:
78 response = handler(request)
79 zipkin_context.update_binary_annotations(
80 some_binary_annotations)
81 return response
82
83 Usage #3: Log a span within the context of a zipkin trace
84
85 If you're already in a zipkin trace, you can use this to log a span inside. The
86 only required param is service_name. If you're not in a zipkin trace, this
87 won't do anything.
88
89 # As a decorator
90 @zipkin_span(service_name='my_service', span_name='my_function')
91 def my_function():
92 do_stuff()
93
94 # As a context manager
95 def my_function():
96 with zipkin_span(service_name='my_service', span_name='do_stuff'):
97 do_stuff()
98 """
99
100 def __init__(
101 self,
102 service_name,
103 span_name='span',
104 zipkin_attrs=None,
105 transport_handler=None,
106 max_span_batch_size=None,
107 annotations=None,
108 binary_annotations=None,
109 port=0,
110 sample_rate=None,
111 include=('client', 'server'),
112 add_logging_annotation=False,
113 report_root_timestamp=False,
114 use_128bit_trace_id=False,
115 host=None
116 ):
117 """Logs a zipkin span. If this is the root span, then a zipkin
118 trace is started as well.
119
120 :param service_name: The name of the called service
121 :type service_name: string
122 :param span_name: Optional name of span, defaults to 'span'
123 :type span_name: string
124 :param zipkin_attrs: Optional set of zipkin attributes to be used
125 :type zipkin_attrs: ZipkinAttrs
126 :param transport_handler: Callback function that takes a message parameter
127 and handles logging it
128 :type transport_handler: function
129 :param max_span_batch_size: Spans in a trace are sent in batches,
130 max_span_batch_size defines max size of one batch
131 :type max_span_batch_size: int
132 :param annotations: Optional dict of str -> timestamp annotations
133 :type annotations: dict of str -> int
134 :param binary_annotations: Optional dict of str -> str span attrs
135 :type binary_annotations: dict of str -> str
136 :param port: The port number of the service. Defaults to 0.
137 :type port: int
138 :param sample_rate: Rate at which to sample; 0.0 - 100.0. If passed-in
139 zipkin_attrs have is_sampled=False and the sample_rate param is > 0,
140 a new span will be generated at this rate. This means that if you
141 propagate sampling decisions to downstream services, but still have
142 sample_rate > 0 in those services, the actual rate of generated
143 spans for those services will be > sampling_rate.
144 :type sample_rate: float
145 :param include: which annotations to include
146 can be one of {'client', 'server'}
147 corresponding to ('cs', 'cr') and ('ss', 'sr') respectively
148 :type include: iterable
149 :param add_logging_annotation: Whether to add a 'logging_end'
150 annotation when py_zipkin finishes logging spans
151 :type add_logging_annotation: boolean
152 :param report_root_timestamp: Whether the span should report timestamp
153 and duration. Only applies to "root" spans in this local context,
154 so spans created inside other span contexts will always log
155 timestamp/duration. Note that this is only an override for spans
156 that have zipkin_attrs passed in. Spans that make their own
157 sampling decisions (i.e. are the root spans of entire traces) will
158 always report timestamp/duration.
159 :type report_root_timestamp: boolean
160 :param use_128bit_trace_id: If true, generate 128-bit trace_ids
161 :type use_128bit_trace_id: boolean
162 :param host: Contains the ipv4 value of the host. The ipv4 value isn't
163 automatically determined in a docker environment
164 :type host: string
165 """
166 self.service_name = service_name
167 self.span_name = span_name
168 self.zipkin_attrs = zipkin_attrs
169 self.transport_handler = transport_handler
170 self.max_span_batch_size = max_span_batch_size
171 self.annotations = annotations or {}
172 self.binary_annotations = binary_annotations or {}
173 self.port = port
174 self.logging_context = None
175 self.sample_rate = sample_rate
176 self.include = include
177 self.add_logging_annotation = add_logging_annotation
178 self.report_root_timestamp_override = report_root_timestamp
179 self.use_128bit_trace_id = use_128bit_trace_id
180 self.host = host
181 self.logging_configured = False
182
183 # Spans that log a 'cs' timestamp can additionally record
184 # 'sa' binary annotations that show where the request is going.
185 # This holds a list of 'sa' binary annotations.
186 self.sa_binary_annotations = []
187
188 # Validation checks
189 if self.zipkin_attrs or self.sample_rate is not None:
190 if self.transport_handler is None:
191 raise ZipkinError(
192 'Root spans require a transport handler to be given')
193
194 if self.sample_rate is not None and not (0.0 <= self.sample_rate <= 100.0):
195 raise ZipkinError('Sample rate must be between 0.0 and 100.0')
196
197 if not set(include).issubset(STANDARD_ANNOTATIONS_KEYS):
198 raise ZipkinError(
199 'Only %s are supported as annotations' %
200 STANDARD_ANNOTATIONS_KEYS
201 )
202 else:
203 # get a list of all of the mapped annotations
204 self.annotation_filter = set()
205 for include_name in include:
206 self.annotation_filter.update(STANDARD_ANNOTATIONS[include_name])
207
208 def __call__(self, f):
209 @functools.wraps(f)
210 def decorated(*args, **kwargs):
211 with zipkin_span(
212 service_name=self.service_name,
213 span_name=self.span_name,
214 zipkin_attrs=self.zipkin_attrs,
215 transport_handler=self.transport_handler,
216 annotations=self.annotations,
217 binary_annotations=self.binary_annotations,
218 port=self.port,
219 sample_rate=self.sample_rate,
220 include=self.include,
221 host=self.host
222 ):
223 return f(*args, **kwargs)
224 return decorated
225
226 def __enter__(self):
227 return self.start()
228
229 def start(self):
230 """Enter the new span context. All annotations logged inside this
231 context will be attributed to this span. All new spans generated
232 inside this context will have this span as their parent.
233
234 In the unsampled case, this context still generates new span IDs and
235 pushes them onto the threadlocal stack, so downstream services calls
236 made will pass the correct headers. However, the logging handler is
237 never attached in the unsampled case, so the spans are never logged.
238 """
239 self.do_pop_attrs = False
240 # If zipkin_attrs are passed in or this span is doing its own sampling,
241 # it will need to actually log spans at __exit__.
242 self.perform_logging = self.zipkin_attrs or self.sample_rate is not None
243 report_root_timestamp = False
244
245 if self.sample_rate is not None:
246 if self.zipkin_attrs and not self.zipkin_attrs.is_sampled:
247 report_root_timestamp = True
248 self.zipkin_attrs = create_attrs_for_span(
249 sample_rate=self.sample_rate,
250 trace_id=self.zipkin_attrs.trace_id,
251 use_128bit_trace_id=self.use_128bit_trace_id,
252 )
253 elif not self.zipkin_attrs:
254 report_root_timestamp = True
255 self.zipkin_attrs = create_attrs_for_span(
256 sample_rate=self.sample_rate,
257 use_128bit_trace_id=self.use_128bit_trace_id,
258 )
259
260 if not self.zipkin_attrs:
261 # This span is inside the context of an existing trace
262 existing_zipkin_attrs = get_zipkin_attrs()
263 if existing_zipkin_attrs:
264 self.zipkin_attrs = ZipkinAttrs(
265 trace_id=existing_zipkin_attrs.trace_id,
266 span_id=generate_random_64bit_string(),
267 parent_span_id=existing_zipkin_attrs.span_id,
268 flags=existing_zipkin_attrs.flags,
269 is_sampled=existing_zipkin_attrs.is_sampled,
270 )
271
272 # If zipkin_attrs are not set up by now, that means this span is not
273 # configured to perform logging itself, and it's not in an existing
274 # Zipkin trace. That means there's nothing else to do and it can exit
275 # early.
276 if not self.zipkin_attrs:
277 return self
278
279 push_zipkin_attrs(self.zipkin_attrs)
280 self.do_pop_attrs = True
281
282 self.start_timestamp = time.time()
283
284 if self.perform_logging:
285 # Don't set up any logging if we're not sampling
286 if not self.zipkin_attrs.is_sampled:
287 return self
288 endpoint = create_endpoint(self.port, self.service_name, self.host)
289 client_context = set(self.include) == {'client'}
290 self.log_handler = ZipkinLoggerHandler(self.zipkin_attrs)
291 self.logging_context = ZipkinLoggingContext(
292 self.zipkin_attrs,
293 endpoint,
294 self.log_handler,
295 self.span_name,
296 self.transport_handler,
297 report_root_timestamp or self.report_root_timestamp_override,
298 binary_annotations=self.binary_annotations,
299 add_logging_annotation=self.add_logging_annotation,
300 client_context=client_context,
301 max_span_batch_size=self.max_span_batch_size,
302 )
303 self.logging_context.start()
304 self.logging_configured = True
305 return self
306 else:
307 # In the sampled case, patch the ZipkinLoggerHandler.
308 if self.zipkin_attrs.is_sampled:
309 # Be defensive about logging setup. Since ZipkinAttrs are local to
310 # the thread, multithreaded frameworks can get in strange states.
311 # The logging is not going to be correct in these cases, so we set
312 # a flag that turns off logging on __exit__.
313 try:
314 # Assume there's only a single handler, since all logging
315 # should be set up in this package.
316 log_handler = zipkin_logger.handlers[0]
317 except IndexError:
318 return self
319 # Make sure it's not a NullHandler or something
320 if not isinstance(log_handler, ZipkinLoggerHandler):
321 return self
322 # Put span ID on logging handler.
323 self.log_handler = zipkin_logger.handlers[0]
324 # Store the old parent_span_id, probably None, in case we have
325 # nested zipkin_spans
326 self.old_parent_span_id = self.log_handler.parent_span_id
327 self.log_handler.parent_span_id = self.zipkin_attrs.span_id
328 self.logging_configured = True
329
330 return self
331
332 def __exit__(self, _exc_type, _exc_value, _exc_traceback):
333 self.stop(_exc_type, _exc_value, _exc_traceback)
334
335 def stop(self, _exc_type=None, _exc_value=None, _exc_traceback=None):
336 """Exit the span context. Zipkin attrs are pushed onto the
337 threadlocal stack regardless of sampling, so they always need to be
338 popped off. The actual logging of spans depends on sampling and that
339 the logging was correctly set up.
340 """
341 if self.do_pop_attrs:
342 pop_zipkin_attrs()
343
344 if not self.logging_configured:
345 return
346
347 # Add the error annotation if an exception occurred
348 if any((_exc_type, _exc_value, _exc_traceback)):
349 error_msg = '{0}: {1}'.format(_exc_type.__name__, _exc_value)
350 self.update_binary_annotations({
351 zipkin_core.ERROR: error_msg,
352 })
353
354 # Logging context is only initialized for "root" spans of the local
355 # process (i.e. this zipkin_span not inside of any other local
356 # zipkin_spans)
357 if self.logging_context:
358 self.logging_context.stop()
359 self.logging_context = None
360 return
361
362 # If we've gotten here, that means that this span is a child span of
363 # this context's root span (i.e. it's a zipkin_span inside another
364 # zipkin_span).
365 end_timestamp = time.time()
366
367 self.log_handler.parent_span_id = self.old_parent_span_id
368
369 # We are simulating a full two-part span locally, so set cs=sr and ss=cr
370 full_annotations = {
371 'cs': self.start_timestamp,
372 'sr': self.start_timestamp,
373 'ss': end_timestamp,
374 'cr': end_timestamp,
375 }
376 # But we filter down if we only want to emit some of the annotations
377 filtered_annotations = {
378 k: v for k, v in full_annotations.items()
379 if k in self.annotation_filter
380 }
381
382 self.annotations.update(filtered_annotations)
383
384 self.log_handler.store_local_span(
385 span_name=self.span_name,
386 service_name=self.service_name,
387 annotations=self.annotations,
388 binary_annotations=self.binary_annotations,
389 sa_binary_annotations=self.sa_binary_annotations,
390 span_id=self.zipkin_attrs.span_id,
391 )
392
393 def update_binary_annotations(self, extra_annotations):
394 """Updates the binary annotations for the current span.
395
396 If this trace is not being sampled then this is a no-op.
397 """
398 if not self.zipkin_attrs:
399 return
400 if not self.zipkin_attrs.is_sampled:
401 return
402 if not self.logging_context:
403 # This is not the root span, so binary annotations will be added
404 # to the log handler when this span context exits.
405 self.binary_annotations.update(extra_annotations)
406 else:
407 # Otherwise, we're in the context of the root span, so just update
408 # the binary annotations for the logging context directly.
409 self.logging_context.binary_annotations_dict.update(extra_annotations)
410
411 def add_sa_binary_annotation(
412 self,
413 port=0,
414 service_name='unknown',
415 host='127.0.0.1',
416 ):
417 """Adds a 'sa' binary annotation to the current span.
418
419 'sa' binary annotations are useful for situations where you need to log
420 where a request is going but the destination doesn't support zipkin.
421
422 Note that the span must have 'cs'/'cr' annotations.
423
424 :param port: The port number of the destination
425 :type port: int
426 :param service_name: The name of the destination service
427 :type service_name: str
428 :param host: Host address of the destination
429 :type host: str
430 """
431 if not self.zipkin_attrs or not self.zipkin_attrs.is_sampled:
432 return
433
434 if 'client' not in self.include:
435 # TODO: trying to set a sa binary annotation for a non-client span
436 # should result in a logged error
437 return
438
439 sa_endpoint = create_endpoint(
440 port=port,
441 service_name=service_name,
442 host=host,
443 )
444 sa_binary_annotation = create_binary_annotation(
445 key=zipkin_core.SERVER_ADDR,
446 value=SERVER_ADDR_VAL,
447 annotation_type=zipkin_core.AnnotationType.BOOL,
448 host=sa_endpoint,
449 )
450 if not self.logging_context:
451 self.sa_binary_annotations.append(sa_binary_annotation)
452 else:
453 self.logging_context.sa_binary_annotations.append(sa_binary_annotation)
454
455
456 def _validate_args(kwargs):
457 if 'include' in kwargs:
458 raise ValueError(
459 '"include" is not valid in this context. '
460 'You probably want to use zipkin_span()'
461 )
462
463
464 class zipkin_client_span(zipkin_span):
465 """Logs a client-side zipkin span.
466
467 Subclass of :class:`zipkin_span` using only annotations relevant to clients
468 """
469
470 def __init__(self, *args, **kwargs):
471 """Logs a zipkin span with client annotations.
472
473 See :class:`zipkin_span` for arguments
474 """
475 _validate_args(kwargs)
476
477 kwargs['include'] = ('client',)
478 super(zipkin_client_span, self).__init__(*args, **kwargs)
479
480
481 class zipkin_server_span(zipkin_span):
482 """Logs a server-side zipkin span.
483
484 Subclass of :class:`zipkin_span` using only annotations relevant to servers
485 """
486
487 def __init__(self, *args, **kwargs):
488 """Logs a zipkin span with server annotations.
489
490 See :class:`zipkin_span` for arguments
491 """
492 _validate_args(kwargs)
493
494 kwargs['include'] = ('server',)
495 super(zipkin_server_span, self).__init__(*args, **kwargs)
496
497
498 def create_attrs_for_span(
499 sample_rate=100.0,
500 trace_id=None,
501 span_id=None,
502 use_128bit_trace_id=False,
503 ):
504 """Creates a set of zipkin attributes for a span.
505
506 :param sample_rate: Float between 0.0 and 100.0 to determine sampling rate
507 :type sample_rate: float
508 :param trace_id: Optional 16-character hex string representing a trace_id.
509 If this is None, a random trace_id will be generated.
510 :type trace_id: str
511 :param span_id: Optional 16-character hex string representing a span_id.
512 If this is None, a random span_id will be generated.
513 :type span_id: str
514 """
515 # Calculate if this trace is sampled based on the sample rate
516 if trace_id is None:
517 if use_128bit_trace_id:
518 trace_id = generate_random_128bit_string()
519 else:
520 trace_id = generate_random_64bit_string()
521 if span_id is None:
522 span_id = generate_random_64bit_string()
523 if sample_rate == 0.0:
524 is_sampled = False
525 else:
526 is_sampled = (random.random() * 100) < sample_rate
527
528 return ZipkinAttrs(
529 trace_id=trace_id,
530 span_id=span_id,
531 parent_span_id=None,
532 flags='0',
533 is_sampled=is_sampled,
534 )
535
536
537 def create_http_headers_for_new_span():
538 """
539 Generate the headers for a new zipkin span.
540
541 .. note::
542
543 If the method is not called from within a zipkin_trace conext,
544 empty dict will be returned back.
545
546 :returns: dict containing (X-B3-TraceId, X-B3-SpanId, X-B3-ParentSpanId,
547 X-B3-Flags and X-B3-Sampled) keys OR an empty dict.
548 """
549 zipkin_attrs = get_zipkin_attrs()
550
551 if not zipkin_attrs:
552 return {}
553
554 return {
555 'X-B3-TraceId': zipkin_attrs.trace_id,
556 'X-B3-SpanId': generate_random_64bit_string(),
557 'X-B3-ParentSpanId': zipkin_attrs.span_id,
558 'X-B3-Flags': '0',
559 'X-B3-Sampled': '1' if zipkin_attrs.is_sampled else '0',
560 }
0 Metadata-Version: 1.1
1 Name: py-zipkin
2 Version: 0.9.0
3 Summary: Library for using Zipkin in Python.
4 Home-page: https://github.com/Yelp/py_zipkin
5 Author: Yelp, Inc.
6 Author-email: opensource+py-zipkin@yelp.com
7 License: Copyright Yelp 2017
8 Description: UNKNOWN
9 Platform: UNKNOWN
10 Classifier: Development Status :: 3 - Alpha
11 Classifier: Intended Audience :: Developers
12 Classifier: Topic :: Software Development :: Libraries :: Python Modules
13 Classifier: License :: OSI Approved :: Apache Software License
14 Classifier: Operating System :: OS Independent
15 Classifier: Programming Language :: Python :: 2.7
16 Classifier: Programming Language :: Python :: 3.4
17 Classifier: Programming Language :: Python :: 3.5
18 Classifier: Programming Language :: Python :: 3.6
19 Provides: py_zipkin
0 MANIFEST.in
1 setup.cfg
2 setup.py
3 py_zipkin/__init__.py
4 py_zipkin/exception.py
5 py_zipkin/logging_helper.py
6 py_zipkin/thread_local.py
7 py_zipkin/util.py
8 py_zipkin/zipkin.py
9 py_zipkin.egg-info/PKG-INFO
10 py_zipkin.egg-info/SOURCES.txt
11 py_zipkin.egg-info/dependency_links.txt
12 py_zipkin.egg-info/requires.txt
13 py_zipkin.egg-info/top_level.txt
14 py_zipkin/thrift/__init__.py
15 py_zipkin/thrift/zipkinCore.thrift
0 [wheel]
1 universal = True
2
3 [pep8]
4 ignore = E265,E309,E501
5
6 [egg_info]
7 tag_date = 0
8 tag_build =
9 tag_svn_revision = 0
10
0 #!/usr/bin/python
1 # -*- coding: utf-8 -*-
2 from setuptools import find_packages
3 from setuptools import setup
4
5 __version__ = '0.9.0'
6
7 setup(
8 name='py_zipkin',
9 version=__version__,
10 provides=["py_zipkin"],
11 author='Yelp, Inc.',
12 author_email='opensource+py-zipkin@yelp.com',
13 license='Copyright Yelp 2017',
14 url="https://github.com/Yelp/py_zipkin",
15 description='Library for using Zipkin in Python.',
16 packages=find_packages(exclude=('tests*', 'testing*', 'tools*')),
17 package_data={'': ['*.thrift']},
18 install_requires=[
19 'six',
20 'thriftpy',
21 ],
22 classifiers=[
23 "Development Status :: 3 - Alpha",
24 "Intended Audience :: Developers",
25 "Topic :: Software Development :: Libraries :: Python Modules",
26 "License :: OSI Approved :: Apache Software License",
27 "Operating System :: OS Independent",
28 "Programming Language :: Python :: 2.7",
29 "Programming Language :: Python :: 3.4",
30 "Programming Language :: Python :: 3.5",
31 "Programming Language :: Python :: 3.6",
32 ],
33 )