graphite-carbon / upstream/latest
New upstream version 1.1.4 (Thomas Goirand, 5 years ago)
43 changed file(s) with 1707 addition(s) and 609 deletion(s).
00 Metadata-Version: 1.1
11 Name: carbon
2 Version: 1.0.2
2 Version: 1.1.4
33 Summary: Backend data caching and persistence daemon for Graphite
44 Home-page: http://graphiteapp.org/
55 Author: Chris Davis
1313 Classifier: Programming Language :: Python
1414 Classifier: Programming Language :: Python :: 2
1515 Classifier: Programming Language :: Python :: 2.7
16 Classifier: Programming Language :: Python :: 2 :: Only
16 Classifier: Programming Language :: Python :: 3
17 Classifier: Programming Language :: Python :: 3.4
18 Classifier: Programming Language :: Python :: 3.5
19 Classifier: Programming Language :: Python :: 3.6
20 Classifier: Programming Language :: Python :: 3.7
21 Classifier: Programming Language :: Python :: Implementation :: CPython
22 Classifier: Programming Language :: Python :: Implementation :: PyPy
0 #!/usr/bin/env python
1 """Copyright 2009 Chris Davis
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License."""
14
15 import sys
16 import os.path
17
18 # Figure out where we're installed
19 BIN_DIR = os.path.dirname(os.path.abspath(__file__))
20 ROOT_DIR = os.path.dirname(BIN_DIR)
21
22 # Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
23 # source.
24 LIB_DIR = os.path.join(ROOT_DIR, "lib")
25 sys.path.insert(0, LIB_DIR)
26
27 from carbon.util import run_twistd_plugin # noqa
28 from carbon.exceptions import CarbonConfigException # noqa
29
30 try:
31 run_twistd_plugin(__file__)
32 except CarbonConfigException as exc:
33 raise SystemExit(str(exc))
2424 LIB_DIR = os.path.join(ROOT_DIR, "lib")
2525 sys.path.insert(0, LIB_DIR)
2626
27 from carbon.util import run_twistd_plugin
28 from carbon.exceptions import CarbonConfigException
27 from carbon.util import run_twistd_plugin # noqa
28 from carbon.exceptions import CarbonConfigException # noqa
2929
3030 try:
3131 run_twistd_plugin(__file__)
32 except CarbonConfigException, exc:
32 except CarbonConfigException as exc:
3333 raise SystemExit(str(exc))
2424 LIB_DIR = os.path.join(ROOT_DIR, "lib")
2525 sys.path.insert(0, LIB_DIR)
2626
27 from carbon.util import run_twistd_plugin
28 from carbon.exceptions import CarbonConfigException
27 from carbon.util import run_twistd_plugin # noqa
28 from carbon.exceptions import CarbonConfigException # noqa
2929
3030 try:
3131 run_twistd_plugin(__file__)
32 except CarbonConfigException, exc:
32 except CarbonConfigException as exc:
3333 raise SystemExit(str(exc))
1313 limitations under the License."""
1414
1515 import sys
16 import imp
1716 from os.path import dirname, join, abspath, exists
1817 from optparse import OptionParser
1918
3433 except ImportError:
3534 pass
3635
37 from twisted.internet import stdio, reactor, defer
38 from twisted.protocols.basic import LineReceiver
39 from carbon.routers import ConsistentHashingRouter, RelayRulesRouter
40 from carbon.client import CarbonClientManager
41 from carbon import log, events
36 from twisted.internet import stdio, reactor, defer # noqa
37 from twisted.protocols.basic import LineReceiver # noqa
38 from carbon.routers import ConsistentHashingRouter, RelayRulesRouter # noqa
39 from carbon.client import CarbonClientManager # noqa
40 from carbon import log, events # noqa
4241
4342
4443 option_parser = OptionParser(usage="%prog [options] <host:port:instance> <host:port:instance> ...")
4544 option_parser.add_option('--debug', action='store_true', help="Log debug info to stdout")
4645 option_parser.add_option('--keyfunc', help="Use a custom key function (path/to/module.py:myFunc)")
4746 option_parser.add_option('--replication', type='int', default=1, help='Replication factor')
48 option_parser.add_option('--routing', default='consistent-hashing',
47 option_parser.add_option(
48 '--routing', default='consistent-hashing',
4949 help='Routing method: "consistent-hashing" (default) or "relay"')
50 option_parser.add_option('--diverse-replicas', action='store_true', help="Spread replicas across diff. servers")
51 option_parser.add_option('--relayrules', default=default_relayrules,
52 help='relay-rules.conf file to use for relay routing')
50 option_parser.add_option(
51 '--diverse-replicas', action='store_true', help="Spread replicas across diff. servers")
52 option_parser.add_option(
53 '--relayrules', default=default_relayrules, help='relay-rules.conf file to use for relay routing')
5354
5455 options, args = option_parser.parse_args()
5556
5657 if not args:
57 print 'At least one host:port destination required\n'
58 print('At least one host:port destination required\n')
5859 option_parser.print_usage()
5960 raise SystemExit(1)
6061
6162 if options.routing not in ('consistent-hashing', 'relay'):
62 print "Invalid --routing value, must be one of:"
63 print " consistent-hashing"
64 print " relay"
63 print("Invalid --routing value, must be one of:")
64 print(" consistent-hashing")
65 print(" relay")
6566 raise SystemExit(1)
6667
6768 destinations = []
7374 instance = parts[2]
7475 else:
7576 instance = None
76 destinations.append( (host, port, instance) )
77 destinations.append((host, port, instance))
7778
7879 if options.debug:
7980 log.logToStdout()
8687 if exists(options.relayrules):
8788 router = RelayRulesRouter(options.relayrules)
8889 else:
89 print "relay rules file %s does not exist" % options.relayrules
90 print("relay rules file %s does not exist" % options.relayrules)
9091 raise SystemExit(1)
9192
9293 client_manager = CarbonClientManager(router)
103104 delimiter = '\n'
104105
105106 def lineReceived(self, line):
106 #log.msg("[DEBUG] lineReceived(): %s" % line)
107 # log.msg("[DEBUG] lineReceived(): %s" % line)
107108 try:
108109 (metric, value, timestamp) = line.split()
109110 datapoint = (float(timestamp), float(value))
110 assert datapoint[1] == datapoint[1] # filter out NaNs
111 assert datapoint[1] == datapoint[1] # filter out NaNs
111112 client_manager.sendDatapoint(metric, datapoint)
112113 except ValueError:
113114 log.err(None, 'Dropping invalid line: %s' % line)
114115
115116 def connectionLost(self, reason):
116117 log.msg('stdin disconnected')
118
117119 def startShutdown(results):
118120 log.msg("startShutdown(%s)" % str(results))
119121 allStopped = client_manager.stopAllClients()
120122 allStopped.addCallback(shutdown)
123
121124 firstConnectsAttempted.addCallback(startShutdown)
122125
123 stdio.StandardIO( StdinMetricsReader() )
126
127 stdio.StandardIO(StdinMetricsReader())
124128
125129 exitCode = 0
130
131
126132 def shutdown(results):
127133 global exitCode
128134 for success, result in results:
132138 if reactor.running:
133139 reactor.stop()
134140
141
135142 reactor.run()
136143 raise SystemExit(exitCode)
2424 LIB_DIR = os.path.join(ROOT_DIR, "lib")
2525 sys.path.insert(0, LIB_DIR)
2626
27 from carbon.util import run_twistd_plugin
28 from carbon.exceptions import CarbonConfigException
27 from carbon.util import run_twistd_plugin # noqa
28 from carbon.exceptions import CarbonConfigException # noqa
2929
3030 try:
3131 run_twistd_plugin(__file__)
32 except CarbonConfigException, exc:
32 except CarbonConfigException as exc:
3333 raise SystemExit(str(exc))
1414
1515 import sys
1616 import whisper
17 from os.path import dirname, exists, join, realpath
18 from ConfigParser import ConfigParser
17 from os.path import dirname, join, realpath
18
19 try:
20 from ConfigParser import ConfigParser
21 except ImportError:
22 from configparser import ConfigParser
1923
2024 if len(sys.argv) == 2:
2125 SCHEMAS_FILE = sys.argv[1]
22 print "Loading storage-schemas configuration from: '%s'" % SCHEMAS_FILE
26 print("Loading storage-schemas configuration from: '%s'" % SCHEMAS_FILE)
2327 else:
2428 SCHEMAS_FILE = realpath(join(dirname(__file__), '..', 'conf', 'storage-schemas.conf'))
25 print "Loading storage-schemas configuration from default location at: '%s'" % SCHEMAS_FILE
29 print("Loading storage-schemas configuration from default location at: '%s'" % SCHEMAS_FILE)
2630
2731 config_parser = ConfigParser()
2832 if not config_parser.read(SCHEMAS_FILE):
3135 errors_found = 0
3236
3337 for section in config_parser.sections():
34 print "Section '%s':" % section
38 print("Section '%s':" % section)
3539 options = dict(config_parser.items(section))
3640 retentions = options['retentions'].split(',')
3741
4044 for retention in retentions:
4145 try:
4246 archives.append(whisper.parseRetentionDef(retention))
43 except ValueError, e:
44 print " - Error: Section '%s' contains an invalid item in its retention definition ('%s')" % \
47 except ValueError as e:
48 print(
49 " - Error: Section '%s' contains an invalid item in its retention definition ('%s')" %
4550 (section, retention)
46 print " %s" % e.message
51 )
52 print(" %s" % e)
4753 section_failed = True
4854
4955 if not section_failed:
5056 try:
5157 whisper.validateArchiveList(archives)
52 except whisper.InvalidConfiguration, e:
53 print " - Error: Section '%s' contains an invalid retention definition ('%s')" % \
58 except whisper.InvalidConfiguration as e:
59 print(
60 " - Error: Section '%s' contains an invalid retention definition ('%s')" %
5461 (section, ','.join(retentions))
55 print " %s" % e.message
62 )
63 print(" %s" % e)
5664
5765 if section_failed:
5866 errors_found += 1
5967 else:
60 print " OK"
68 print(" OK")
6169
6270 if errors_found:
63 raise SystemExit( "Storage-schemas configuration '%s' failed validation" % SCHEMAS_FILE)
71 raise SystemExit("Storage-schemas configuration '%s' failed validation" % SCHEMAS_FILE)
6472
65 print "Storage-schemas configuration '%s' is valid" % SCHEMAS_FILE
73 print("Storage-schemas configuration '%s' is valid" % SCHEMAS_FILE)
8484 # second.
8585 MIN_TIMESTAMP_RESOLUTION = 1
8686
87 # Set the minimum lag in seconds for a point to be written to the database
88 # in order to optimize batching. This means that each point will wait at least
89 # the duration of this lag before being written. Setting this to 0 disables the feature.
90 # This currently only works when using the timesorted write strategy.
91 # MIN_TIMESTAMP_LAG = 0
92
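As an illustration (not part of the shipped example config), enabling a small write lag together with the time-sorted strategy might look like this in the [cache] section; the 5-second value is arbitrary:

    CACHE_WRITE_STRATEGY = timesorted
    MIN_TIMESTAMP_LAG = 5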
8793 # Set the interface and port for the line (plain text) listener. Setting the
8894 # interface to 0.0.0.0 listens on all interfaces. Port can be set to 0 to
8995 # disable this listener if it is not required.
130136 USE_FLOW_CONTROL = True
131137
132138 # If enabled this setting is used to timeout metric client connection if no
133 # metrics have been sent in specified time in seconds
139 # metrics have been sent in specified time in seconds
134140 #METRIC_CLIENT_IDLE_TIMEOUT = None
135141
136142 # By default, carbon-cache will log every whisper update and cache hit.
200206
201207 # On systems which has a large number of metrics, an amount of Whisper write(2)'s
202208 # pageback sometimes cause disk thrashing due to memory shortage, so that abnormal
203 # disk reads occur. Enabling this option makes it possible to decrease useless
209 # disk reads occur. Enabling this option makes it possible to decrease useless
204210 # page cache memory by posix_fadvise(2) with POSIX_FADVISE_RANDOM option.
205211 # WHISPER_FADVISE_RANDOM = False
206212
295301 # Example: store everything
296302 # BIND_PATTERNS = #
297303
304 # URL of graphite-web instance, this is used to add incoming series to the tag database
305 # GRAPHITE_URL = http://127.0.0.1:80
306
307 # Tag support, when enabled carbon will make HTTP calls to graphite-web to update the tag index
308 # ENABLE_TAGS = True
309
310 # Tag update interval, this specifies how frequently updates to existing series will trigger
311 # an update to the tag index, the default setting is once every 100 updates
312 # TAG_UPDATE_INTERVAL = 100
313
314 # Tag hash filenames, this specifies whether tagged metric filenames should use the hash of the metric name
315 # or a human-readable name, using hashed names avoids issues with path length when using a large number of tags
316 # TAG_HASH_FILENAMES = True
317
318 # Tag batch size, this specifies the maximum number of series to be sent to graphite-web in a single batch
319 # TAG_BATCH_SIZE = 100
320
321 # Tag queue size, this specifies the maximum number of series to be queued for sending to graphite-web
322 # There are separate queues for new series and for updates to existing series
323 # TAG_QUEUE_SIZE = 10000
324
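For example, a minimal sketch of turning tag support on against a local graphite-web (the port is whatever your graphite-web instance actually listens on):

    GRAPHITE_URL = http://127.0.0.1:8080
    ENABLE_TAGS = True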
325 # Set to enable Sentry.io exception monitoring.
326 # RAVEN_DSN='YOUR_DSN_HERE'.
327
298328 # To configure special settings for the carbon-cache instance 'b', uncomment this:
299329 #[cache:b]
300330 #LINE_RECEIVER_PORT = 2103
340370
341371 # For REPLICATION_FACTOR >=2, set DIVERSE_REPLICAS to True to guarantee replicas
342372 # across distributed hosts. With this setting disabled, it's possible that replicas
343 # may be sent to different caches on the same host. This has been the default
373 # may be sent to different caches on the same host. This has been the default
344374 # behavior since introduction of 'consistent-hashing' relay method.
345375 # Note that enabling this on an existing pre-0.9.14 cluster will require rebalancing
346376 # your metrics across the cluster nodes using a tool like Carbonate.
368398 # set to one of "line", "pickle", "udp" and "protobuf". This list can be
369399 # extended with CarbonClientFactory plugins and defaults to "pickle".
370400 # DESTINATION_PROTOCOL = pickle
401
402 # This defines the wire transport, either none or ssl.
403 # If SSL is used any TCP connection will be upgraded to TLS1. The system's
404 # trust authority will be used unless DESTINATION_SSL_CA is specified in
405 # which case an alternative certificate authority chain will be used for
406 # verifying the remote certificate.
407 # To use SSL you'll need the cryptography, service_identity, and twisted >= 14
408 # DESTINATION_TRANSPORT = none
409 # DESTINATION_SSL_CA=/path/to/private-ca.crt
410
411 # This allows having multiple connections per destination: this will
412 # pool all the replicas of a single host in the same queue and distribute
413 # points across these replicas instead of replicating them.
414 # The following example will balance the load between :0 and :1.
415 ## DESTINATIONS = foo:2001:0, foo:2001:1
416 ## RELAY_METHOD = rules
417 # Note: this is currently incompatible with USE_RATIO_RESET which gets
418 # disabled if this option is enabled.
419 # DESTINATIONS_POOL_REPLICAS = False
420
421 # When using consistent hashing it sometimes makes sense to make
422 # the ring dynamic when you don't want to lose points when a
423 # single destination is down. Replication is an answer to that
424 # but it can be quite expensive.
425 # DYNAMIC_ROUTER = False
426
427 # Controls the number of connection attempts before marking a
428 # destination as down. We usually do one connection attempt per
429 # second.
430 # DYNAMIC_ROUTER_MAX_RETRIES = 5
371431
372432 # This is the maximum number of datapoints that can be queued up
373433 # for a single destination. Once this limit is hit, we will
412472 USE_FLOW_CONTROL = True
413473
414474 # If enabled this setting is used to timeout metric client connection if no
415 # metrics have been sent in specified time in seconds
475 # metrics have been sent in specified time in seconds
416476 #METRIC_CLIENT_IDLE_TIMEOUT = None
417477
418478 # Set this to True to enable whitelisting and blacklisting of metrics in
460520 # reset connections for no good reason.
461521 MIN_RESET_INTERVAL=121
462522
523 # Enable TCP Keep Alive (http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/overview.html).
524 # Default settings will send a probe every 30s. Default is False.
525 # TCP_KEEPALIVE=True
526 # The interval between the last data packet sent (simple ACKs are not
527 # considered data) and the first keepalive probe; after the connection is marked
528 # to need keepalive, this counter is not used any further.
529 # TCP_KEEPIDLE=10
530 # The interval between successive keepalive probes, regardless of what
531 # the connection has exchanged in the meantime.
532 # TCP_KEEPINTVL=30
533 # The number of unacknowledged probes to send before considering the connection
534 # dead and notifying the application layer.
535 # TCP_KEEPCNT=2
536
537
463538 [aggregator]
464539 LINE_RECEIVER_INTERFACE = 0.0.0.0
465540 LINE_RECEIVER_PORT = 2023
510585 USE_FLOW_CONTROL = True
511586
512587 # If enabled this setting is used to timeout metric client connection if no
513 # metrics have been sent in specified time in seconds
588 # metrics have been sent in specified time in seconds
514589 #METRIC_CLIENT_IDLE_TIMEOUT = None
515590
516591 # This defines the maximum "message size" between carbon daemons.
558633 # Specify the user to drop privileges to
559634 # If this is blank carbon-aggregator runs as the user that invokes it
560635 # USER =
636
637 # Part of the code, and particularly aggregator rules, needs
638 # to cache metric names. To avoid leaking too much memory you
639 # can tweak the size of this cache. The default allows for 1M
640 # different metrics per rule (~200MiB).
641 # CACHE_METRIC_NAMES_MAX=1000000
642
643 # You can optionally set a TTL on this cache.
644 # CACHE_METRIC_NAMES_TTL=600
2323 [sum]
2424 pattern = \.count$
2525 xFilesFactor = 0
26 aggregationMethod = sum
26 aggregationMethod = max
2727
2828 [default_average]
2929 pattern = .*
1313 # Valid: 60s:7d,300s:30d (300/60 = 5)
1414 # Invalid: 180s:7d,300s:30d (300/180 = 3.333)
1515 #
16 # This retention is set at the time the first metric is sent.
17 # Changing this file will not affect already-created .wsp files.
18 # Use whisper-resize.py to change existing data files.
1619
1720 # Carbon's internal metrics. This entry should match what is specified in
1821 # CARBON_METRIC_PREFIX and CARBON_METRIC_INTERVAL settings
33 from carbon import log
44
55
6 class BufferManager:
6 class _BufferManager:
77 def __init__(self):
88 self.buffers = {}
99
1212
1313 def get_buffer(self, metric_path):
1414 if metric_path not in self.buffers:
15 log.aggregator("Allocating new metric buffer for %s" % metric_path)
15 log.debug("Allocating new metric buffer for %s" % metric_path)
1616 self.buffers[metric_path] = MetricBuffer(metric_path)
1717
1818 return self.buffers[metric_path]
5050 self.aggregation_frequency = int(frequency)
5151 self.aggregation_func = func
5252 self.compute_task = LoopingCall(self.compute_value)
53 compute_frequency = min(settings['WRITE_BACK_FREQUENCY'], frequency) or frequency
53 if settings['WRITE_BACK_FREQUENCY'] is not None:
54 compute_frequency = min(settings['WRITE_BACK_FREQUENCY'], frequency)
55 else:
56 compute_frequency = frequency
5457 self.compute_task.start(compute_frequency, now=False)
5558 self.configured = True
5659
5760 def compute_value(self):
58 now = int( time.time() )
61 now = int(time.time())
5962 current_interval = now - (now % self.aggregation_frequency)
60 age_threshold = current_interval - (settings['MAX_AGGREGATION_INTERVALS'] * self.aggregation_frequency)
63 age_threshold = current_interval - (
64 settings['MAX_AGGREGATION_INTERVALS'] * self.aggregation_frequency)
6165
62 for buffer in self.interval_buffers.values():
66 for buffer in list(self.interval_buffers.values()):
6367 if buffer.active:
6468 value = self.aggregation_func(buffer.values)
6569 datapoint = (buffer.interval, value)
9296 self.active = True
9397
9498 def input(self, datapoint):
95 self.values.append( datapoint[1] )
99 self.values.append(datapoint[1])
96100 self.active = True
97101
98102 def mark_inactive(self):
100104
101105
102106 # Shared importable singleton
103 BufferManager = BufferManager()
107 BufferManager = _BufferManager()
104108
105109 # Avoid import circularity
106 from carbon import state
110 from carbon import state # NOQA
11 from carbon.aggregator.buffers import BufferManager
22 from carbon.instrumentation import increment
33 from carbon.pipeline import Processor
4 from carbon.rewrite import PRE, POST, RewriteRuleManager
54 from carbon.conf import settings
65 from carbon import log
76
1110
1211 def process(self, metric, datapoint):
1312 increment('datapointsReceived')
14
15 for rule in RewriteRuleManager.rules(PRE):
16 metric = rule.apply(metric)
1713
1814 aggregate_metrics = set()
1915
3228
3329 values_buffer.input(datapoint)
3430
35 for rule in RewriteRuleManager.rules(POST):
36 metric = rule.apply(metric)
37
3831 if settings.FORWARD_ALL and metric not in aggregate_metrics:
3932 if settings.LOG_AGGREGATOR_MISSES and len(aggregate_metrics) == 0:
40 log.msg("Couldn't match metric %s with any aggregation rule. Passing on un-aggregated." % metric)
33 log.msg(
34 "Couldn't match metric %s with any aggregation rule. Passing on un-aggregated." % metric)
4135 yield (metric, datapoint)
0 import time
10 import re
1
2 from math import floor, ceil
3
24 from os.path import exists, getmtime
35 from twisted.internet.task import LoopingCall
6 from cachetools import TTLCache, LRUCache
7
48 from carbon import log
9 from carbon.conf import settings
510 from carbon.aggregator.buffers import BufferManager
611
712
8 class RuleManager:
13 def get_cache():
14 ttl = settings.CACHE_METRIC_NAMES_TTL
15 size = settings.CACHE_METRIC_NAMES_MAX
16 if ttl > 0 and size > 0:
17 return TTLCache(size, ttl)
18 elif size > 0:
19 return LRUCache(size)
20 else:
21 return dict()
22
23
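A quick sketch of how the two new settings select a cache implementation in get_cache() (values here are hypothetical; both settings default to 0):

    # CACHE_METRIC_NAMES_MAX=1000000, CACHE_METRIC_NAMES_TTL=600 -> TTLCache(1000000, 600)
    # CACHE_METRIC_NAMES_MAX=1000000, CACHE_METRIC_NAMES_TTL=0   -> LRUCache(1000000)
    # CACHE_METRIC_NAMES_MAX=0 (the default)                     -> plain dict()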
24 class RuleManager(object):
925 def __init__(self):
1026 self.rules = []
1127 self.rules_file = None
5571 left_side, right_side = line.split('=', 1)
5672 output_pattern, frequency = left_side.split()
5773 method, input_pattern = right_side.split()
58 frequency = int( frequency.lstrip('(').rstrip(')') )
74 frequency = int(frequency.lstrip('(').rstrip(')'))
5975 return AggregationRule(input_pattern, output_pattern, method, frequency)
6076
6177 except ValueError:
6379 raise
6480
6581
66 class AggregationRule:
82 class AggregationRule(object):
6783 def __init__(self, input_pattern, output_pattern, method, frequency):
6884 self.input_pattern = input_pattern
6985 self.output_pattern = output_pattern
7692 self.aggregation_func = AGGREGATION_METHODS[method]
7793 self.build_regex()
7894 self.build_template()
79 self.cache = {}
95 self.cache = get_cache()
8096
8197 def get_aggregate_metric(self, metric_path):
8298 if metric_path in self.cache:
94110 try:
95111 result = self.output_template % extracted_fields
96112 except TypeError:
97 log.err("Failed to interpolate template %s with fields %s" % (self.output_template, extracted_fields))
113 log.err("Failed to interpolate template %s with fields %s" % (
114 self.output_template, extracted_fields))
98115
99 if result:
100 self.cache[metric_path] = result
116 self.cache[metric_path] = result
101117 return result
102118
103119 def build_regex(self):
109125 i = input_part.find('<<')
110126 j = input_part.find('>>')
111127 pre = input_part[:i]
112 post = input_part[j+2:]
113 field_name = input_part[i+2:j]
128 post = input_part[j + 2:]
129 field_name = input_part[i + 2:j]
114130 regex_part = '%s(?P<%s>.+)%s' % (pre, field_name, post)
115131
116132 else:
118134 j = input_part.find('>')
119135 if i > -1 and j > i:
120136 pre = input_part[:i]
121 post = input_part[j+1:]
122 field_name = input_part[i+1:j]
137 post = input_part[j + 1:]
138 field_name = input_part[i + 1:j]
123139 regex_part = '%s(?P<%s>[^.]+)%s' % (pre, field_name, post)
124140 elif input_part == '*':
125141 regex_part = '[^.]+'
137153
138154 def avg(values):
139155 if values:
140 return float( sum(values) ) / len(values)
156 return float(sum(values)) / len(values)
157
141158
142159 def count(values):
143160 if values:
144161 return len(values)
145162
163
164 def percentile(factor):
165 def func(values):
166 if values:
167 values = sorted(values)
168 rank = factor * (len(values) - 1)
169 rank_left = int(floor(rank))
170 rank_right = int(ceil(rank))
171
172 if rank_left == rank_right:
173 return values[rank_left]
174 else:
175 return values[rank_left] * (rank_right - rank) + values[rank_right] * (rank - rank_left)
176
177 return func
178
179
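A worked example of the new percentile aggregator (a sketch, not part of the module): ranks are interpolated linearly, so p95 of the values 1..10 falls between the 9th and 10th sorted values.

    p95 = percentile(0.95)
    p95(list(range(1, 11)))   # rank = 0.95 * 9 = 8.55 -> 9 * 0.45 + 10 * 0.55 = 9.55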
146180 AGGREGATION_METHODS = {
147 'sum' : sum,
148 'avg' : avg,
149 'min' : min,
150 'max' : max,
151 'count' : count
181 'sum': sum,
182 'avg': avg,
183 'min': min,
184 'max': max,
185 'p50': percentile(0.50),
186 'p75': percentile(0.75),
187 'p80': percentile(0.80),
188 'p90': percentile(0.90),
189 'p95': percentile(0.95),
190 'p99': percentile(0.99),
191 'p999': percentile(0.999),
192 'count': count,
152193 }
153194
154195 # Importable singleton
3030 (see example config file provided)
3131 """
3232 import sys
33
3334 import os
3435 import socket
3536 from optparse import OptionParser
3839 from twisted.internet import reactor
3940 from twisted.internet.protocol import ReconnectingClientFactory
4041 from twisted.application.internet import TCPClient
41 from txamqp.protocol import AMQClient
42 from txamqp.client import TwistedDelegate
43 import txamqp.spec
42
43 # txamqp is currently not ported to py3
44 try:
45 from txamqp.protocol import AMQClient
46 from txamqp.client import TwistedDelegate
47 import txamqp.spec
48 except ImportError:
49 raise ImportError
4450
4551 try:
4652 import carbon
4955 LIB_DIR = os.path.dirname(os.path.dirname(__file__))
5056 sys.path.insert(0, LIB_DIR)
5157
52 import carbon.protocols #satisfy import order requirements
58 import carbon.protocols # NOQA satisfy import order requirements
5359 from carbon.protocols import CarbonServerProtocol
5460 from carbon.conf import settings
55 from carbon import log, events, instrumentation
61 from carbon import log, events
5662
5763
5864 HOSTNAME = socket.gethostname().split('.')[0]
116122
117123 # bind each configured metric pattern
118124 for bind_pattern in settings.BIND_PATTERNS:
119 log.listener("binding exchange '%s' to queue '%s' with pattern %s" \
125 log.listener("binding exchange '%s' to queue '%s' with pattern %s"
120126 % (exchange, my_queue, bind_pattern))
121127 yield chan.queue_bind(exchange=exchange, queue=my_queue,
122128 routing_key=bind_pattern)
123129
124130 yield chan.basic_consume(queue=my_queue, no_ack=True,
125131 consumer_tag=self.consumer_tag)
132
126133 @inlineCallbacks
127134 def receive_loop(self):
128135 queue = yield self.queue(self.consumer_tag)
148155 metric, value, timestamp = line.split()
149156 else:
150157 value, timestamp = line.split()
151 datapoint = ( float(timestamp), float(value) )
158 datapoint = (float(timestamp), float(value))
152159 if datapoint[1] != datapoint[1]: # filter out NaN values
153160 continue
154161 except ValueError:
247254
248255 (options, args) = parser.parse_args()
249256
250
251257 startReceiver(options.host, options.port, options.username,
252258 options.password, vhost=options.vhost,
253259 exchange_name=options.exchange, verbose=options.verbose)
254260 reactor.run()
255261
262
256263 if __name__ == "__main__":
257264 main()
2020 from optparse import OptionParser
2121
2222 from twisted.internet.defer import inlineCallbacks
23 from twisted.internet import reactor, task
23 from twisted.internet import reactor
2424 from twisted.internet.protocol import ClientCreator
2525 from txamqp.protocol import AMQClient
2626 from txamqp.client import TwistedDelegate
5353 yield channel.exchange_declare(exchange=exchange, type="topic",
5454 durable=True, auto_delete=False)
5555
56 message = Content( "%f %d" % (value, timestamp) )
56 message = Content("%f %d" % (value, timestamp))
5757 message["delivery mode"] = 2
5858
5959 channel.basic_publish(exchange=exchange, content=message, routing_key=metric_path)
111111 d.addBoth(lambda _: reactor.stop())
112112 reactor.run()
113113
114
114115 if __name__ == "__main__":
115116 main()
2020 from carbon.conf import settings
2121 from carbon import events, log
2222 from carbon.pipeline import Processor
23
24
25 def by_timestamp((timestamp, value)): # useful sort key function
23 from carbon.util import TaggedSeries
24
25
26 def by_timestamp(t_v): # useful sort key function
27 (timestamp, _) = t_v
2628 return timestamp
2729
2830
3032 plugin_name = 'write'
3133
3234 def __init__(self, *args, **kwargs):
33 super(Processor, self).__init__(*args, **kwargs)
35 super(CacheFeedingProcessor, self).__init__(*args, **kwargs)
3436 self.cache = MetricCache()
3537
3638 def process(self, metric, datapoint):
39 # normalize metric name (reorder tags)
40 try:
41 metric = TaggedSeries.parse(metric).path
42 except Exception as err:
43 log.msg('Error parsing metric %s: %s' % (metric, err))
44
3745 self.cache.store(metric, datapoint)
3846 return Processor.NO_OUTPUT
3947
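For context, a sketch of what the normalization step does (the tag names and values below are hypothetical): TaggedSeries.parse(...).path re-serializes a tagged metric with its tags in canonical sorted order, so equivalent names map to the same cache entry.

    TaggedSeries.parse('disk.used;server=web01;datacenter=dc1').path
    # -> 'disk.used;datacenter=dc1;server=web01'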
4654 self.cache = cache
4755
4856 def choose_item(self):
49 raise NotImplemented
57 raise NotImplementedError()
5058
5159
5260 class NaiveStrategy(DrainStrategy):
5664
5765 def _generate_queue():
5866 while True:
59 metric_names = self.cache.keys()
67 metric_names = list(self.cache.keys())
6068 while metric_names:
6169 yield metric_names.pop()
6270
6371 self.queue = _generate_queue()
6472
6573 def choose_item(self):
66 return self.queue.next()
74 return next(self.queue)
6775
6876
6977 class MaxStrategy(DrainStrategy):
7280 that infrequently or irregularly updated metrics may not be written
7381 until shutdown """
7482 def choose_item(self):
75 metric_name, size = max(self.cache.items(), key=lambda x: len(itemgetter(1)(x)))
83 metric_name, _ = max(self.cache.items(), key=lambda x: len(itemgetter(1)(x)))
7684 return metric_name
7785
7886
7987 class RandomStrategy(DrainStrategy):
8088 """Pop points randomly"""
8189 def choose_item(self):
82 return choice(self.cache.keys())
90 return choice(list(self.cache.keys())) # nosec
8391
8492
8593 class SortedStrategy(DrainStrategy):
104112 self.queue = _generate_queue()
105113
106114 def choose_item(self):
107 return self.queue.next()
115 return next(self.queue)
108116
109117
110118 class TimeSortedStrategy(DrainStrategy):
118126 while True:
119127 t = time.time()
120128 metric_lw = sorted(self.cache.watermarks, key=lambda x: x[1], reverse=True)
129 if settings.MIN_TIMESTAMP_LAG:
130 metric_lw = [x for x in metric_lw if t - x[1] > settings.MIN_TIMESTAMP_LAG]
121131 size = len(metric_lw)
122132 if settings.LOG_CACHE_QUEUE_SORTS and size:
123133 log.msg("Sorted %d cache queues in %.6f seconds" % (size, time.time() - t))
134 if not metric_lw:
135 # If there is nothing to do give a chance to sleep to the reader.
136 yield None
124137 while metric_lw:
125138 yield itemgetter(0)(metric_lw.pop())
126139 if settings.LOG_CACHE_QUEUE_SORTS and size:
129142 self.queue = _generate_queue()
130143
131144 def choose_item(self):
132 return self.queue.next()
145 return next(self.queue)
133146
134147
135148 class _MetricCache(defaultdict):
173186 metric = self.strategy.choose_item()
174187 else:
175188 # Avoid .keys() as it dumps the whole list
176 metric = self.iterkeys().next()
189 metric = next(iter(self))
190 if metric is None:
191 return (None, [])
177192 return (metric, self.pop(metric))
178193
179194 def get_datapoints(self, metric):
206221
207222 _Cache = None
208223
224
209225 def MetricCache():
210226 global _Cache
211227 if _Cache is not None:
229245 return _Cache
230246
231247
232
233248 # Avoid import circularities
234 from carbon import state
249 from carbon import state # NOQA
11 # source: carbon.proto
22
33 import sys
4 _b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
54 from google.protobuf import descriptor as _descriptor
65 from google.protobuf import message as _message
76 from google.protobuf import reflection as _reflection
87 from google.protobuf import symbol_database as _symbol_database
9 from google.protobuf import descriptor_pb2
108 # @@protoc_insertion_point(imports)
9
1110
1211 _sym_db = _symbol_database.Default()
1312
14
15
13 _b = sys.version_info[0] < 3 and (lambda x: x) or (lambda x: x.encode('latin1'))
1614
1715 DESCRIPTOR = _descriptor.FileDescriptor(
1816 name='carbon.proto',
1917 package='carbon',
2018 syntax='proto3',
21 serialized_pb=_b('\n\x0c\x63\x61rbon.proto\x12\x06\x63\x61rbon\")\n\x05Point\x12\x11\n\ttimestamp\x18\x01 \x01(\r\x12\r\n\x05value\x18\x02 \x01(\x01\"7\n\x06Metric\x12\x0e\n\x06metric\x18\x01 \x01(\t\x12\x1d\n\x06points\x18\x02 \x03(\x0b\x32\r.carbon.Point\"*\n\x07Payload\x12\x1f\n\x07metrics\x18\x01 \x03(\x0b\x32\x0e.carbon.Metricb\x06proto3')
19 serialized_pb=_b('\n\x0c\x63\x61rbon.proto\x12\x06\x63\x61rbon\")\n\x05Point\x12\x11\n\ttimestamp'
20 '\x18\x01 \x01(\r\x12\r\n\x05value\x18\x02 \x01(\x01\"7\n\x06Metric\x12\x0e\n'
21 '\x06metric\x18\x01 \x01(\t\x12\x1d\n\x06points\x18\x02 \x03(\x0b\x32\r.carbon.'
22 'Point\"*\n\x07Payload\x12\x1f\n\x07metrics\x18\x01 \x03(\x0b\x32\x0e.carbon.'
23 'Metricb\x06proto3')
2224 )
2325 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
24
25
26
2726
2827 _POINT = _descriptor.Descriptor(
2928 name='Point',
138137 DESCRIPTOR.message_types_by_name['Payload'] = _PAYLOAD
139138
140139 Point = _reflection.GeneratedProtocolMessageType('Point', (_message.Message,), dict(
141 DESCRIPTOR = _POINT,
142 __module__ = 'carbon_pb2'
140 DESCRIPTOR=_POINT,
141 __module__='carbon_pb2'
143142 # @@protoc_insertion_point(class_scope:carbon.Point)
144 ))
143 ))
145144 _sym_db.RegisterMessage(Point)
146145
147146 Metric = _reflection.GeneratedProtocolMessageType('Metric', (_message.Message,), dict(
148 DESCRIPTOR = _METRIC,
149 __module__ = 'carbon_pb2'
147 DESCRIPTOR=_METRIC,
148 __module__='carbon_pb2'
150149 # @@protoc_insertion_point(class_scope:carbon.Metric)
151 ))
150 ))
152151 _sym_db.RegisterMessage(Metric)
153152
154153 Payload = _reflection.GeneratedProtocolMessageType('Payload', (_message.Message,), dict(
155 DESCRIPTOR = _PAYLOAD,
156 __module__ = 'carbon_pb2'
154 DESCRIPTOR=_PAYLOAD,
155 __module__='carbon_pb2'
157156 # @@protoc_insertion_point(class_scope:carbon.Payload)
158 ))
157 ))
159158 _sym_db.RegisterMessage(Payload)
160159
161
162160 # @@protoc_insertion_point(module_scope)
0 from collections import deque
0 from collections import deque, defaultdict
11 from time import time
2 from six import with_metaclass
23
34 from twisted.application.service import Service
45 from twisted.internet import reactor
56 from twisted.internet.defer import Deferred, DeferredList
67 from twisted.internet.protocol import ReconnectingClientFactory
78 from twisted.protocols.basic import LineOnlyReceiver, Int32StringReceiver
9
810 from carbon.conf import settings
911 from carbon.util import pickle
12 from carbon.util import PluginRegistrar
13 from carbon.util import enableTcpKeepAlive
1014 from carbon import instrumentation, log, pipeline, state
11 from carbon.util import PluginRegistrar
15
16 try:
17 from OpenSSL import SSL
18 except ImportError:
19 SSL = None
20 try:
21 from twisted.internet import ssl
22 except ImportError:
23 ssl = None
1224
1325 try:
1426 import signal
1527 except ImportError:
1628 log.debug("Couldn't import signal module")
29
30 try:
31 from carbon.resolver import setUpRandomResolver
32 except ImportError:
33 setUpRandomResolver = None
1734
1835
1936 SEND_QUEUE_LOW_WATERMARK = settings.MAX_QUEUE_SIZE * settings.QUEUE_LOW_WATERMARK_PCT
2845 self.transport.registerProducer(self, streaming=True)
2946 # Define internal metric names
3047 self.lastResetTime = time()
48 self.destination = self.factory.destination
3149 self.destinationName = self.factory.destinationName
3250 self.queuedUntilReady = 'destinations.%s.queuedUntilReady' % self.destinationName
3351 self.sent = 'destinations.%s.sent' % self.destinationName
3452 self.batchesSent = 'destinations.%s.batchesSent' % self.destinationName
3553
3654 self.slowConnectionReset = 'destinations.%s.slowConnectionReset' % self.destinationName
37
38 self.factory.connectionMade.callback(self)
55 enableTcpKeepAlive(self.transport, settings.TCP_KEEPALIVE, settings)
56
57 d = self.factory.connectionMade
58 # Setup a new deferred before calling the callback to allow callbacks
59 # to re-register themselves.
3960 self.factory.connectionMade = Deferred()
61 d.callback(self)
62
4063 self.sendQueued()
4164
4265 def connectionLost(self, reason):
103126 if not self.factory.hasQueuedDatapoints():
104127 return
105128
106 if settings.USE_RATIO_RESET is True:
107 if not self.connectionQualityMonitor():
108 self.resetConnectionForQualityReasons("Sent: {0}, Received: {1}".format(
109 instrumentation.prior_stats.get(self.sent, 0),
110 instrumentation.prior_stats.get('metricsReceived', 0)))
129 if not self.connectionQualityMonitor():
130 self.resetConnectionForQualityReasons("Sent: {0}, Received: {1}".format(
131 instrumentation.prior_stats.get(self.sent, 0),
132 instrumentation.prior_stats.get('metricsReceived', 0)))
111133
112134 self.sendDatapointsNow(self.factory.takeSomeFromQueue())
113 if (self.factory.queueFull.called and
114 queueSize < SEND_QUEUE_LOW_WATERMARK):
135 if (self.factory.queueFull.called and queueSize < SEND_QUEUE_LOW_WATERMARK):
115136 if not self.factory.queueHasSpace.called:
116137 self.factory.queueHasSpace.callback(queueSize)
117138 if self.factory.hasQueuedDatapoints():
118139 self.factory.scheduleSend()
119140
120
121141 def connectionQualityMonitor(self):
122142 """Checks to see if the connection for this factory appears to
123143 be delivering stats at a speed close to what we're receiving
131151 True means that the total received is less than settings.MIN_RESET_STAT_FLOW
132152
133153 False means that quality is bad
134
135154 """
155 if not settings.USE_RATIO_RESET:
156 return True
157
158 if settings.DESTINATION_POOL_REPLICAS:
159 received = self.factory.attemptedRelays
160 else:
161 received = 'metricsReceived'
162
136163 destination_sent = float(instrumentation.prior_stats.get(self.sent, 0))
137 total_received = float(instrumentation.prior_stats.get('metricsReceived', 0))
164 total_received = float(instrumentation.prior_stats.get(received, 0))
138165 instrumentation.increment(self.slowConnectionReset, 0)
139166 if total_received < settings.MIN_RESET_STAT_FLOW:
140167 return True
164191 __repr__ = __str__
165192
166193
167 class CarbonClientFactory(object, ReconnectingClientFactory):
168 __metaclass__ = PluginRegistrar
194 class CAReplaceClientContextFactory:
195 """A context factory for SSL clients needing a different CA chain."""
196
197 isClient = 1
198 # SSLv23_METHOD allows SSLv2, SSLv3, and TLSv1. We disable SSLv2 below,
199 # though.
200 method = SSL.SSLv23_METHOD if SSL else None
201
202 _cafile = None
203
204 def __init__(self, file=None):
205 self._cafile = file
206
207 def getContext(self):
208 ctx = SSL.Context(self.method)
209 ctx.set_options(SSL.OP_NO_SSLv2)
210 if self._cafile is not None:
211 ctx.use_certificate_chain_file(self._cafile)
212 return ctx
213
214
215 class CarbonClientFactory(with_metaclass(PluginRegistrar, ReconnectingClientFactory, object)):
169216 plugins = {}
170217 maxDelay = 5
171218
172 def __init__(self, destination):
219 def __init__(self, destination, router):
173220 self.destination = destination
221 self.router = router
174222 self.destinationName = ('%s:%d:%s' % destination).replace('.', '_')
175223 self.host, self.port, self.carbon_instance = destination
176224 self.addr = (self.host, self.port)
180228 self.connectedProtocol = None
181229 self.queueEmpty = Deferred()
182230 self.queueFull = Deferred()
183 self.queueFull.addCallback(self.queueFullCallback)
231 self.queueFull.addCallbacks(self.queueFullCallback, log.err)
184232 self.queueHasSpace = Deferred()
185 self.queueHasSpace.addCallback(self.queueSpaceCallback)
233 self.queueHasSpace.addCallbacks(self.queueSpaceCallback, log.err)
234 # Args: {'connector': connector, 'reason': reason}
186235 self.connectFailed = Deferred()
236 # Args: {'connector': connector, 'reason': reason}
237 self.connectionLost = Deferred()
238 # Args: protocol instance
187239 self.connectionMade = Deferred()
188 self.connectionLost = Deferred()
240 self.connectionMade.addCallbacks(self.clientConnectionMade, log.err)
189241 self.deferSendPending = None
190242 # Define internal metric names
191243 self.attemptedRelays = 'destinations.%s.attemptedRelays' % self.destinationName
213265 if self.queueFull.called:
214266 log.clients('%s send queue has space available' % self.connectedProtocol)
215267 self.queueFull = Deferred()
216 self.queueFull.addCallback(self.queueFullCallback)
268 self.queueFull.addCallbacks(self.queueFullCallback, log.err)
217269 state.events.cacheSpaceAvailable()
218270 self.queueHasSpace = Deferred()
219 self.queueHasSpace.addCallback(self.queueSpaceCallback)
271 self.queueHasSpace.addCallbacks(self.queueSpaceCallback, log.err)
220272
221273 def buildProtocol(self, addr):
222274 self.connectedProtocol = self.clientProtocol()
225277
226278 def startConnecting(self): # calling this startFactory yields recursion problems
227279 self.started = True
228 self.connector = reactor.connectTCP(self.host, self.port, self)
280
281 if settings['DESTINATION_TRANSPORT'] == "ssl":
282 if not SSL or not ssl:
283 print("SSL destination transport request, but no Python OpenSSL available.")
284 raise SystemExit(1)
285 authority = None
286 if settings['DESTINATION_SSL_CA']:
287 try:
288 with open(settings['DESTINATION_SSL_CA']) as f:
289 authority = ssl.Certificate.loadPEM(f.read())
290 except IOError:
291 print("Failed to read CA chain: %s" % settings['DESTINATION_SSL_CA'])
292 raise SystemExit(1)
293 # Twisted 14 introduced this function, it might not be around on older installs.
294 if hasattr(ssl, "optionsForClientTLS"):
295 from six import u
296 client = ssl.optionsForClientTLS(u(self.host), authority)
297 else:
298 client = CAReplaceClientContextFactory(settings['DESTINATION_SSL_CA'])
299 self.connector = reactor.connectSSL(self.host, self.port, self, client)
300 else:
301 self.connector = reactor.connectTCP(self.host, self.port, self)
229302
230303 def stopConnecting(self):
231304 self.started = False
246319 queue.
247320 """
248321 def yield_max_datapoints():
249 for count in range(settings.MAX_DATAPOINTS_PER_MESSAGE):
322 for _ in range(settings.MAX_DATAPOINTS_PER_MESSAGE):
250323 try:
251324 yield self.queue.popleft()
252325 except IndexError:
253 raise StopIteration
326 return
254327 return list(yield_max_datapoints())
255328
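The bare return replacing raise StopIteration matters under PEP 479: from Python 3.7 on, a StopIteration raised inside a generator body is converted into a RuntimeError, so returning is the portable way to end the generator early. A self-contained sketch (not the factory code itself):

    from collections import deque
    q = deque([('metric.a', (1500000000, 1.0))])

    def take(limit):
        for _ in range(limit):
            try:
                yield q.popleft()
            except IndexError:
                return   # stops the generator cleanly when the queue is empty

    list(take(5))   # [('metric.a', (1500000000, 1.0))]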
256329 def checkQueue(self):
305378 instrumentation.increment(self.queuedUntilConnected)
306379
307380 def startedConnecting(self, connector):
308 log.clients("%s::startedConnecting (%s:%d)" % (self, connector.host, connector.port))
381 log.clients("%s::startedConnecting (%s:%d)" % (
382 self, connector.host, connector.port))
383
384 def clientConnectionMade(self, client):
385 log.clients("%s::connectionMade (%s)" % (self, client))
386 self.resetDelay()
387 self.destinationUp(client.destination)
388 self.connectionMade.addCallbacks(self.clientConnectionMade, log.err)
389 return client
309390
310391 def clientConnectionLost(self, connector, reason):
311392 ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
312 log.clients("%s::clientConnectionLost (%s:%d) %s" % (self, connector.host, connector.port, reason.getErrorMessage()))
393 log.clients("%s::clientConnectionLost (%s:%d) %s" % (
394 self, connector.host, connector.port, reason.getErrorMessage()))
313395 self.connectedProtocol = None
314 self.connectionLost.callback(0)
396
397 self.destinationDown(self.destination)
398
399 args = dict(connector=connector, reason=reason)
400 d = self.connectionLost
315401 self.connectionLost = Deferred()
402 d.callback(args)
316403
317404 def clientConnectionFailed(self, connector, reason):
318405 ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
319 log.clients("%s::clientConnectionFailed (%s:%d) %s" % (self, connector.host, connector.port, reason.getErrorMessage()))
320 self.connectFailed.callback(dict(connector=connector, reason=reason))
406 log.clients("%s::clientConnectionFailed (%s:%d) %s" % (
407 self, connector.host, connector.port, reason.getErrorMessage()))
408
409 self.destinationDown(self.destination)
410
411 args = dict(connector=connector, reason=reason)
412 d = self.connectFailed
321413 self.connectFailed = Deferred()
414 d.callback(args)
415
416 def destinationUp(self, destination):
417 log.clients("Destination is up: %s:%d:%s" % destination)
418 if not self.router.hasDestination(destination):
419 log.clients("Adding client %s:%d:%s to router" % destination)
420 self.router.addDestination(destination)
421 state.events.resumeReceivingMetrics()
422
423 def destinationDown(self, destination):
424 # Only blacklist the destination if we tried a lot.
425 log.clients("Destination is down: %s:%d:%s (%d/%d)" % (
426 destination[0], destination[1], destination[2], self.retries,
427 settings.DYNAMIC_ROUTER_MAX_RETRIES))
428 # Retries comes from the ReconnectingClientFactory.
429 if self.retries < settings.DYNAMIC_ROUTER_MAX_RETRIES:
430 return
431
432 if settings.DYNAMIC_ROUTER and self.router.hasDestination(destination):
433 log.clients("Removing client %s:%d:%s to router" % destination)
434 self.router.removeDestination(destination)
435 # Do not receive more metrics if we don't have any usable destinations.
436 if not self.router.countDestinations():
437 state.events.pauseReceivingMetrics()
438 # Re-inject queued metrics.
439 metrics = list(self.queue)
440 log.clients("Re-injecting %d metrics from %s" % (len(metrics), self))
441 for metric, datapoint in metrics:
442 state.events.metricGenerated(metric, datapoint)
443 self.queue.clear()
322444
323445 def disconnect(self):
324 self.queueEmpty.addCallback(lambda result: self.stopConnecting())
446 self.queueEmpty.addCallbacks(lambda result: self.stopConnecting(), log.err)
325447 readyToStop = DeferredList(
326448 [self.connectionLost, self.connectFailed],
327449 fireOnOneCallback=True,
343465 class CarbonPickleClientProtocol(CarbonClientProtocol, Int32StringReceiver):
344466
345467 def _sendDatapointsNow(self, datapoints):
346 self.sendString(pickle.dumps(datapoints, protocol=-1))
468 self.sendString(pickle.dumps(datapoints, protocol=2))
347469
348470
349471 class CarbonPickleClientFactory(CarbonClientFactory):
357479
358480 def _sendDatapointsNow(self, datapoints):
359481 for metric, datapoint in datapoints:
360 self.sendLine("%s %s %d" % (metric, datapoint[1], datapoint[0]))
482 if isinstance(datapoint[1], float):
483 value = ("%.10f" % datapoint[1]).rstrip('0').rstrip('.')
484 else:
485 value = "%d" % datapoint[1]
486 self.sendLine("%s %s %d" % (metric, value, datapoint[0]))
361487
362488
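A quick illustration of the new value formatting (a sketch, not part of the diff): trailing zeros and a dangling decimal point are stripped from floats, while non-float values keep plain integer formatting.

    ("%.10f" % 3.14).rstrip('0').rstrip('.')   # '3.14'
    ("%.10f" % 10.0).rstrip('0').rstrip('.')   # '10'
    "%d" % 7                                   # '7'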
363489 class CarbonLineClientFactory(CarbonClientFactory):
367493 return CarbonLineClientProtocol()
368494
369495
496 class FakeClientFactory(object):
497 """Fake client factory that buffers points
498
499 This is used when all the destinations are down and before we
500 pause the reception of metrics to avoid losing points.
501 """
502
503 def __init__(self):
504 # This queue isn't explicitly bounded but will implicitly be. It receives
505 # only metrics when no destinations are available, and as soon as we detect
506 # that we don't have any destination we pause the producer: this means that
507 # it will contain only a few seconds of metrics.
508 self.queue = deque()
509 self.started = False
510
511 def startConnecting(self):
512 pass
513
514 def sendDatapoint(self, metric, datapoint):
515 self.queue.append((metric, datapoint))
516
517 def sendHighPriorityDatapoint(self, metric, datapoint):
518 self.queue.append((metric, datapoint))
519
520 def reinjectDatapoints(self):
521 metrics = list(self.queue)
522 log.clients("Re-injecting %d metrics from %s" % (len(metrics), self))
523 for metric, datapoint in metrics:
524 state.events.metricGenerated(metric, datapoint)
525 self.queue.clear()
526
527
370528 class CarbonClientManager(Service):
371529 def __init__(self, router):
530 if settings.DESTINATION_POOL_REPLICAS:
531 # If we decide to open multiple TCP connections to a replica, we probably
532 # want to try to also load-balance across hosts. In this case we need
533 # to make sure rfc3484 doesn't get in the way.
534 if setUpRandomResolver:
535 setUpRandomResolver(reactor)
536 else:
537 print("Import error, Twisted >= 17.1.0 needed for using DESTINATION_POOL_REPLICAS.")
538 raise SystemExit(1)
539
372540 self.router = router
373541 self.client_factories = {} # { destination : CarbonClientFactory() }
542 # { destination[0:2]: set(CarbonClientFactory()) }
543 self.pooled_factories = defaultdict(set)
544
545 # This fake factory will be used as a buffer when we did not manage
546 # to connect to any destination.
547 fake_factory = FakeClientFactory()
548 self.client_factories[None] = fake_factory
549 state.events.resumeReceivingMetrics.addHandler(fake_factory.reinjectDatapoints)
374550
375551 def createFactory(self, destination):
376 from carbon.conf import settings
377
378552 factory_name = settings["DESTINATION_PROTOCOL"]
379553 factory_class = CarbonClientFactory.plugins.get(factory_name)
380554
381555 if not factory_class:
382 print ("In carbon.conf, DESTINATION_PROTOCOL must be one of %s. "
383 "Invalid value: '%s'" % (', '.join(CarbonClientFactory.plugins), factory_name))
556 print("In carbon.conf, DESTINATION_PROTOCOL must be one of %s. "
557 "Invalid value: '%s'" % (', '.join(CarbonClientFactory.plugins), factory_name))
384558 raise SystemExit(1)
385559
386 return factory_class(destination)
560 return factory_class(destination, self.router)
387561
388562 def startService(self):
389563 if 'signal' in globals().keys():
403577 return
404578
405579 log.clients("connecting to carbon daemon at %s:%d:%s" % destination)
406 self.router.addDestination(destination)
580 if not settings.DYNAMIC_ROUTER:
581 # If not using a dynamic router we add the destination before
582 # it's known to be working.
583 self.router.addDestination(destination)
407584
408585 factory = self.createFactory(destination)
409586 self.client_factories[destination] = factory
587 self.pooled_factories[destination[0:2]].add(factory)
588
410589 connectAttempted = DeferredList(
411590 [factory.connectionMade, factory.connectFailed],
412591 fireOnOneCallback=True,
418597
419598 def stopClient(self, destination):
420599 factory = self.client_factories.get(destination)
421 if factory is None:
422 return
600 if factory is None or destination is None:
601 return None
423602
424603 self.router.removeDestination(destination)
425604 stopCompleted = factory.disconnect()
426 stopCompleted.addCallback(lambda result: self.disconnectClient(destination))
605 stopCompleted.addCallbacks(
606 lambda result: self.disconnectClient(destination), log.err
607 )
427608 return stopCompleted
428609
429610 def disconnectClient(self, destination):
430611 factory = self.client_factories.pop(destination)
612 self.pooled_factories[destination[0:2]].remove(factory)
431613 c = factory.connector
432614 if c and c.state == 'connecting' and not factory.hasQueuedDatapoints():
433615 c.stopConnecting()
435617 def stopAllClients(self):
436618 deferreds = []
437619 for destination in list(self.client_factories):
438 deferreds.append(self.stopClient(destination))
620 deferred = self.stopClient(destination)
621 if deferred:
622 deferreds.append(deferred)
439623 return DeferredList(deferreds)
440624
625 def getDestinations(self, metric):
626 destinations = list(self.router.getDestinations(metric))
627 # If we can't find any destination we just buffer the
628 # points. We will also pause the socket on the receiving side.
629 if not destinations:
630 return [None]
631 return destinations
632
633 def getFactories(self, metric):
634 destinations = self.getDestinations(metric)
635 factories = set()
636
637 if not settings.DESTINATION_POOL_REPLICAS:
638 # Simple case, with only one replica per destination.
639 for d in destinations:
640 # If we can't find it, we add to the 'fake' factory / buffer.
641 factories.add(self.client_factories.get(d))
642 else:
643 # Here we might have multiple replicas per destination.
644 for d in destinations:
645 if d is None:
646 # d == None means there are no destinations currently available, so
647 # we just put the data into our fake factory / buffer.
648 factories.add(self.client_factories[None])
649 else:
650 # Else we take the replica with the smallest queue size.
651 key = d[0:2] # Take only host:port, not instance.
652 factories.add(min(self.pooled_factories[key], key=lambda f: f.queueSize))
653 return factories
654
441655 def sendDatapoint(self, metric, datapoint):
442 for destination in self.router.getDestinations(metric):
443 self.client_factories[destination].sendDatapoint(metric, datapoint)
656 for factory in self.getFactories(metric):
657 factory.sendDatapoint(metric, datapoint)
444658
445659 def sendHighPriorityDatapoint(self, metric, datapoint):
446 for destination in self.router.getDestinations(metric):
447 self.client_factories[destination].sendHighPriorityDatapoint(metric, datapoint)
660 for factory in self.getFactories(metric):
661 factory.sendHighPriorityDatapoint(metric, datapoint)
448662
449663 def __str__(self):
450664 return "<%s[%x]>" % (self.__class__.__name__, id(self))
1818
1919 from os.path import join, dirname, normpath, exists, isdir
2020 from optparse import OptionParser
21 from ConfigParser import ConfigParser
21
22 try:
23 from ConfigParser import ConfigParser
24 # ConfigParser is renamed to configparser in py3
25 except ImportError:
26 from configparser import ConfigParser
2227
2328 from carbon import log, state
2429 from carbon.database import TimeSeriesDatabase
3439 MAX_UPDATES_PER_SECOND=500,
3540 MAX_CREATES_PER_MINUTE=float('inf'),
3641 MIN_TIMESTAMP_RESOLUTION=0,
42 MIN_TIMESTAMP_LAG=0,
3743 LINE_RECEIVER_INTERFACE='0.0.0.0',
3844 LINE_RECEIVER_PORT=2003,
3945 ENABLE_UDP_LISTENER=False,
6975 AMQP_VERBOSE=False,
7076 AMQP_SPEC=None,
7177 BIND_PATTERNS=['#'],
78 GRAPHITE_URL='http://127.0.0.1:80',
79 ENABLE_TAGS=True,
80 TAG_UPDATE_INTERVAL=100,
81 TAG_BATCH_SIZE=100,
82 TAG_QUEUE_SIZE=10000,
83 TAG_HASH_FILENAMES=True,
7284 ENABLE_MANHOLE=False,
7385 MANHOLE_INTERFACE='127.0.0.1',
7486 MANHOLE_PORT=7222,
7587 MANHOLE_USER="",
7688 MANHOLE_PUBLIC_KEY="",
7789 RELAY_METHOD='rules',
90 DYNAMIC_ROUTER=False,
91 DYNAMIC_ROUTER_MAX_RETRIES=5,
92 ROUTER_HASH_TYPE=None,
7893 REPLICATION_FACTOR=1,
7994 DIVERSE_REPLICAS=True,
8095 DESTINATIONS=[],
8196 DESTINATION_PROTOCOL="pickle",
97 DESTINATION_TRANSPORT="none",
98 DESTINATION_SSL_CA=None,
99 DESTINATION_POOL_REPLICAS=False,
82100 USE_FLOW_CONTROL=True,
83101 USE_INSECURE_UNPICKLER=False,
84102 USE_WHITELIST=False,
89107 MIN_RESET_STAT_FLOW=1000,
90108 MIN_RESET_RATIO=0.9,
91109 MIN_RESET_INTERVAL=121,
110 TCP_KEEPALIVE=True,
111 TCP_KEEPIDLE=10,
112 TCP_KEEPINTVL=30,
113 TCP_KEEPCNT=2,
92114 USE_RATIO_RESET=False,
93115 LOG_LISTENER_CONN_SUCCESS=True,
94116 LOG_AGGREGATOR_MISSES=True,
97119 RELAY_RULES='relay-rules.conf',
98120 ENABLE_LOGROTATION=True,
99121 METRIC_CLIENT_IDLE_TIMEOUT=None,
122 CACHE_METRIC_NAMES_MAX=0,
123 CACHE_METRIC_NAMES_TTL=0,
124 RAVEN_DSN=None,
100125 )
101126
102127
107132 try:
108133 os.kill(int(pid), 0)
109134 return True
110 except OSError, err:
135 except OSError as err:
111136 return err.errno == errno.EPERM
112137
113138
215240 self["pidfile"] = pidfile
216241
217242 # Enforce a default umask of '022' if none was set.
218 if not self.parent.has_key("umask") or self.parent["umask"] is None:
219 self.parent["umask"] = 022
243 if "umask" not in self.parent or self.parent["umask"] is None:
244 self.parent["umask"] = 0o022
220245
221246 # Read extra settings from the configuration file.
222247 program_settings = read_config(program, self)
224249 settings["program"] = program
225250
226251 # Normalize and expand paths
227 settings["STORAGE_DIR"] = os.path.normpath(os.path.expanduser(settings["STORAGE_DIR"]))
228 settings["LOCAL_DATA_DIR"] = os.path.normpath(os.path.expanduser(settings["LOCAL_DATA_DIR"]))
229 settings["WHITELISTS_DIR"] = os.path.normpath(os.path.expanduser(settings["WHITELISTS_DIR"]))
230 settings["PID_DIR"] = os.path.normpath(os.path.expanduser(settings["PID_DIR"]))
231 settings["LOG_DIR"] = os.path.normpath(os.path.expanduser(settings["LOG_DIR"]))
232 settings["pidfile"] = os.path.normpath(os.path.expanduser(settings["pidfile"]))
252 def cleanpath(path):
253 return os.path.normpath(os.path.expanduser(path))
254 settings["STORAGE_DIR"] = cleanpath(settings["STORAGE_DIR"])
255 settings["LOCAL_DATA_DIR"] = cleanpath(settings["LOCAL_DATA_DIR"])
256 settings["WHITELISTS_DIR"] = cleanpath(settings["WHITELISTS_DIR"])
257 settings["PID_DIR"] = cleanpath(settings["PID_DIR"])
258 settings["LOG_DIR"] = cleanpath(settings["LOG_DIR"])
259 settings["pidfile"] = cleanpath(settings["pidfile"])
233260
234261 # Set process uid/gid by changing the parent config, if a user was
235262 # provided in the configuration file.
243270
244271 storage_schemas = join(settings["CONF_DIR"], "storage-schemas.conf")
245272 if not exists(storage_schemas):
246 print "Error: missing required config %s" % storage_schemas
273 print("Error: missing required config %s" % storage_schemas)
247274 sys.exit(1)
248275
249276 if settings.CACHE_WRITE_STRATEGY not in ('timesorted', 'sorted', 'max', 'naive'):
255282 # Database-specific settings
256283 database = settings.DATABASE
257284 if database not in TimeSeriesDatabase.plugins:
258 print "No database plugin implemented for '%s'" % database
285 print("No database plugin implemented for '%s'" % database)
259286 raise SystemExit(1)
260287
261288 database_class = TimeSeriesDatabase.plugins[database]
263290
264291 settings.CACHE_SIZE_LOW_WATERMARK = settings.MAX_CACHE_SIZE * 0.95
265292
266 if not "action" in self:
293 if "action" not in self:
267294 self["action"] = "start"
268295 self.handleAction()
269296
314341
315342 if action == "stop":
316343 if not exists(pidfile):
317 print "Pidfile %s does not exist" % pidfile
344 print("Pidfile %s does not exist" % pidfile)
318345 raise SystemExit(0)
319346 pf = open(pidfile, 'r')
320347 try:
321348 pid = int(pf.read().strip())
322349 pf.close()
323350 except ValueError:
324 print "Failed to parse pid from pidfile %s" % pidfile
351 print("Failed to parse pid from pidfile %s" % pidfile)
325352 raise SystemExit(1)
326353 except IOError:
327 print "Could not read pidfile %s" % pidfile
354 print("Could not read pidfile %s" % pidfile)
328355 raise SystemExit(1)
329 print "Sending kill signal to pid %d" % pid
356 print("Sending kill signal to pid %d" % pid)
330357 try:
331358 os.kill(pid, 15)
332 except OSError, e:
359 except OSError as e:
333360 if e.errno == errno.ESRCH:
334 print "No process with pid %d running" % pid
361 print("No process with pid %d running" % pid)
335362 else:
336363 raise
337364
339366
340367 elif action == "status":
341368 if not exists(pidfile):
342 print "%s (instance %s) is not running" % (program, instance)
369 print("%s (instance %s) is not running" % (program, instance))
343370 raise SystemExit(1)
344371 pf = open(pidfile, "r")
345372 try:
346373 pid = int(pf.read().strip())
347374 pf.close()
348375 except ValueError:
349 print "Failed to parse pid from pidfile %s" % pidfile
376 print("Failed to parse pid from pidfile %s" % pidfile)
350377 raise SystemExit(1)
351378 except IOError:
352 print "Failed to read pid from %s" % pidfile
379 print("Failed to read pid from %s" % pidfile)
353380 raise SystemExit(1)
354381
355382 if _process_alive(pid):
356 print ("%s (instance %s) is running with pid %d" %
357 (program, instance, pid))
383 print("%s (instance %s) is running with pid %d" %
384 (program, instance, pid))
358385 raise SystemExit(0)
359386 else:
360 print "%s (instance %s) is not running" % (program, instance)
387 print("%s (instance %s) is not running" % (program, instance))
361388 raise SystemExit(1)
362389
363390 elif action == "start":
367394 pid = int(pf.read().strip())
368395 pf.close()
369396 except ValueError:
370 print "Failed to parse pid from pidfile %s" % pidfile
397 print("Failed to parse pid from pidfile %s" % pidfile)
371398 raise SystemExit(1)
372399 except IOError:
373 print "Could not read pidfile %s" % pidfile
400 print("Could not read pidfile %s" % pidfile)
374401 raise SystemExit(1)
375402 if _process_alive(pid):
376 print ("%s (instance %s) is already running with pid %d" %
377 (program, instance, pid))
403 print("%s (instance %s) is already running with pid %d" %
404 (program, instance, pid))
378405 raise SystemExit(1)
379406 else:
380 print "Removing stale pidfile %s" % pidfile
407 print("Removing stale pidfile %s" % pidfile)
381408 try:
382409 os.unlink(pidfile)
383410 except IOError:
384 print "Could not remove pidfile %s" % pidfile
411 print("Could not remove pidfile %s" % pidfile)
385412 # Try to create the PID directory
386413 else:
387414 if not os.path.exists(settings["PID_DIR"]):
388415 try:
389416 os.makedirs(settings["PID_DIR"])
390 except OSError as exc: # Python >2.5
417 except OSError as exc: # Python >2.5
391418 if exc.errno == errno.EEXIST and os.path.isdir(settings["PID_DIR"]):
392419 pass
393420 else:
394421 raise
395422
396
397
398 print "Starting %s (instance %s)" % (program, instance)
423 print("Starting %s (instance %s)" % (program, instance))
399424
400425 else:
401 print "Invalid action '%s'" % action
402 print "Valid actions: start stop status"
426 print("Invalid action '%s'" % action)
427 print("Valid actions: start stop status")
403428 raise SystemExit(1)
404429
405430
408433 optParameters = [
409434 ["rules", "", None, "Use the given aggregation rules file."],
410435 ["rewrite-rules", "", None, "Use the given rewrite rules file."],
411 ] + CarbonCacheOptions.optParameters
436 ] + CarbonCacheOptions.optParameters
412437
413438 def postOptions(self):
414439 CarbonCacheOptions.postOptions(self)
427452 optParameters = [
428453 ["rules", "", None, "Use the given relay rules file."],
429454 ["aggregation-rules", "", None, "Use the given aggregation rules file."],
430 ] + CarbonCacheOptions.optParameters
455 ] + CarbonCacheOptions.optParameters
431456
432457 def postOptions(self):
433458 CarbonCacheOptions.postOptions(self)
441466
442467 router = settings["RELAY_METHOD"]
443468 if router not in DatapointRouter.plugins:
444 print ("In carbon.conf, RELAY_METHOD must be one of %s. "
445 "Invalid value: '%s'" % (', '.join(DatapointRouter.plugins), router))
469 print("In carbon.conf, RELAY_METHOD must be one of %s. "
470 "Invalid value: '%s'" % (', '.join(DatapointRouter.plugins), router))
446471 raise SystemExit(1)
447472
448473
490515 "--instance",
491516 default='a',
492517 help="Manage a specific carbon instance")
493
518 parser.add_option(
519 "--logfile",
520 default=None,
521 help="Log to a specified file, - for stdout")
522 parser.add_option(
523 "--logger",
524 default=None,
525 help="A fully-qualified name to a log observer factory to use for the initial log "
526 "observer. Takes precedence over --logfile and --syslog (when available).")
494527 return parser
495528
496529
497530 def get_parser(name):
498531 parser = get_default_parser()
499 if name == "carbon-aggregator":
532 if "carbon-aggregator" in name:
500533 parser.add_option(
501534 "--rules",
502535 default=None,
548581 graphite_root = os.environ.get('GRAPHITE_ROOT')
549582 if graphite_root is None:
550583 raise CarbonConfigException("Either ROOT_DIR or GRAPHITE_ROOT "
551 "needs to be provided.")
584 "needs to be provided.")
552585
553586 # Default config directory to root-relative, unless overriden by the
554587 # 'GRAPHITE_CONF_DIR' environment variable.
580613 settings.setdefault(
581614 "WHITELISTS_DIR", join(settings["STORAGE_DIR"], "lists"))
582615
583
584
585616 # Read configuration options from program-specific section.
586617 section = program[len("carbon-"):]
587618 config = options["config"]
601632 "%s:%s" % (section, options["instance"]))
602633 settings["pidfile"] = (
603634 options["pidfile"] or
604 join(settings["PID_DIR"], "%s-%s.pid" %
605 (program, options["instance"])))
606 settings["LOG_DIR"] = (options["logdir"] or
607 join(settings["LOG_DIR"],
608 "%s-%s" % (program, options["instance"])))
635 join(settings["PID_DIR"], "%s-%s.pid" % (program, options["instance"])))
636 settings["LOG_DIR"] = (
637 options["logdir"] or
638 join(settings["LOG_DIR"], "%s-%s" % (program, options["instance"])))
609639 else:
610640 settings["pidfile"] = (
611 options["pidfile"] or
612 join(settings["PID_DIR"], '%s.pid' % program))
641 options["pidfile"] or join(settings["PID_DIR"], '%s.pid' % program))
613642 settings["LOG_DIR"] = (options["logdir"] or settings["LOG_DIR"])
614643
615644 update_STORAGE_DIR_deps()
1212 limitations under the License."""
1313
1414 import os
15 import time
16
1517 from os.path import exists, dirname, join, sep
16 from carbon.util import PluginRegistrar
18 from carbon.util import PluginRegistrar, TaggedSeries
1719 from carbon import log
18
19
20 class TimeSeriesDatabase(object):
20 from six import with_metaclass
21
22
23 class TimeSeriesDatabase(with_metaclass(PluginRegistrar, object)):
2124 "Abstract base class for Carbon database backends."
22 __metaclass__ = PluginRegistrar
2325 plugins = {}
2426
2527 "List of supported aggregation methods for the database."
2628 aggregationMethods = []
2729
30 def __init__(self, settings):
31 self.graphite_url = settings.GRAPHITE_URL
32
2833 def write(self, metric, datapoints):
2934 "Persist datapoints in the database for metric."
30 raise NotImplemented()
35 raise NotImplementedError()
3136
3237 def exists(self, metric):
3338 "Return True if the given metric path exists, False otherwise."
34 raise NotImplemented()
39 raise NotImplementedError()
3540
3641 def create(self, metric, retentions, xfilesfactor, aggregation_method):
3742 "Create an entry in the database for metric using options."
38 raise NotImplemented()
43 raise NotImplementedError()
3944
4045 def getMetadata(self, metric, key):
4146 "Lookup metric metadata."
42 raise NotImplemented()
47 raise NotImplementedError()
4348
4449 def setMetadata(self, metric, key, value):
4550 "Modify metric metadata."
46 raise NotImplemented()
51 raise NotImplementedError()
4752
4853 def getFilesystemPath(self, metric):
4954 "Return filesystem path for metric, defaults to None."
5257 def validateArchiveList(self, archiveList):
5358 "Validate that the database can handle the given archiveList."
5459 pass
60
61 def tag(self, *metrics):
62 from carbon.http import httpRequest
63
64 log.debug("Tagging %s" % ', '.join(metrics), type='tagdb')
65 t = time.time()
66
67 try:
68 httpRequest(
69 self.graphite_url + '/tags/tagMultiSeries',
70 [('path', metric) for metric in metrics]
71 )
72 log.debug("Tagged %s in %s" % (', '.join(metrics), time.time() - t), type='tagdb')
73 except Exception as err:
74 log.msg("Error tagging %s: %s" % (', '.join(metrics), err), type='tagdb')
5575
5676
5777 try:
6484 aggregationMethods = whisper.aggregationMethods
6585
6686 def __init__(self, settings):
87 super(WhisperDatabase, self).__init__(settings)
88
6789 self.data_dir = settings.LOCAL_DATA_DIR
90 self.tag_hash_filenames = settings.TAG_HASH_FILENAMES
6891 self.sparse_create = settings.WHISPER_SPARSE_CREATE
6992 self.fallocate_create = settings.WHISPER_FALLOCATE_CREATE
7093 if settings.WHISPER_AUTOFLUSH:
92115 else:
93116 log.err("WHISPER_FADVISE_RANDOM is enabled but import of ftools module failed.")
94117 except AttributeError:
95 log.err("WHISPER_FADVISE_RANDOM is enabled but skipped because it is not compatible with the version of Whisper.")
118 log.err("WHISPER_FADVISE_RANDOM is enabled but skipped because it is not compatible " +
119 "with the version of Whisper.")
96120
97121 def write(self, metric, datapoints):
98122 path = self.getFilesystemPath(metric)
99123 whisper.update_many(path, datapoints)
100124
101125 def exists(self, metric):
102 return exists(self.getFilesystemPath(metric))
126 if exists(self.getFilesystemPath(metric)):
127 return True
128 # if we're using hashed filenames and a non-hashed file exists then move it to the new name
129 if self.tag_hash_filenames and exists(self._getFilesystemPath(metric, False)):
130 os.rename(self._getFilesystemPath(metric, False), self.getFilesystemPath(metric))
131 return True
132 return False
103133
104134 def create(self, metric, retentions, xfilesfactor, aggregation_method):
105135 path = self.getFilesystemPath(metric)
107137 try:
108138 if not exists(directory):
109139 os.makedirs(directory)
110 except OSError, e:
140 except OSError as e:
111141 log.err("%s" % e)
112142
113143 whisper.create(path, retentions, xfilesfactor, aggregation_method,
128158 return whisper.setAggregationMethod(wsp_path, value)
129159
130160 def getFilesystemPath(self, metric):
131 metric_path = metric.replace('.', sep).lstrip(sep) + '.wsp'
132 return join(self.data_dir, metric_path)
161 return self._getFilesystemPath(metric, self.tag_hash_filenames)
162
163 def _getFilesystemPath(self, metric, tag_hash_filenames):
164 return join(
165 self.data_dir,
166 TaggedSeries.encode(metric, sep, hash_only=tag_hash_filenames) + '.wsp'
167 )
133168
134169 def validateArchiveList(self, archiveList):
135170 try:
136171 whisper.validateArchiveList(archiveList)
137 except whisper.InvalidConfiguration, e:
172 except whisper.InvalidConfiguration as e:
138173 raise ValueError("%s" % e)
139174
140175
145180 else:
146181 class CeresDatabase(TimeSeriesDatabase):
147182 plugin_name = 'ceres'
148 aggregationMethods = ['average','sum','last','max','min']
183 aggregationMethods = ['average', 'sum', 'last', 'max', 'min']
149184
150185 def __init__(self, settings):
186 super(CeresDatabase, self).__init__(settings)
187
151188 self.data_dir = settings.LOCAL_DATA_DIR
189 self.tag_hash_filenames = settings.TAG_HASH_FILENAMES
152190 ceres.setDefaultNodeCachingBehavior(settings.CERES_NODE_CACHING_BEHAVIOR)
153191 ceres.setDefaultSliceCachingBehavior(settings.CERES_SLICE_CACHING_BEHAVIOR)
154192 ceres.MAX_SLICE_GAP = int(settings.CERES_MAX_SLICE_GAP)
162200
163201 self.tree = ceres.CeresTree(self.data_dir)
164202
203 def encode(self, metric, tag_hash_filenames=None):
204 if tag_hash_filenames is None:
205 tag_hash_filenames = self.tag_hash_filenames
206 return TaggedSeries.encode(metric, hash_only=tag_hash_filenames)
207
165208 def write(self, metric, datapoints):
166 self.tree.store(metric, datapoints)
209 self.tree.store(self.encode(metric), datapoints)
167210
168211 def exists(self, metric):
169 return self.tree.hasNode(metric)
212 if self.tree.hasNode(self.encode(metric)):
213 return True
214 # if we're using hashed filenames and a non-hashed file exists then move it to the new name
215 if self.tag_hash_filenames and self.tree.hasNode(self.encode(metric, False)):
216 os.rename(self._getFilesystemPath(metric, False), self.getFilesystemPath(metric))
217 return True
218 return False
170219
171220 def create(self, metric, retentions, xfilesfactor, aggregation_method):
172 self.tree.createNode(metric, retentions=retentions,
221 self.tree.createNode(self.encode(metric),
222 retentions=retentions,
173223 timeStep=retentions[0][0],
174224 xFilesFactor=xfilesfactor,
175225 aggregationMethod=aggregation_method)
176226
177227 def getMetadata(self, metric, key):
178 return self.tree.getNode(metric).readMetadata()[key]
228 return self.tree.getNode(self.encode(metric)).readMetadata()[key]
179229
180230 def setMetadata(self, metric, key, value):
181 node = self.tree.getNode(metric)
231 node = self.tree.getNode(self.encode(metric))
182232 metadata = node.readMetadata()
183233 metadata[key] = value
184234 node.writeMetadata(metadata)
185235
186236 def getFilesystemPath(self, metric):
187 return self.tree.getFilesystemPath(metric)
237 return self._getFilesystemPath(metric, self.tag_hash_filenames)
238
239 def _getFilesystemPath(self, metric, tag_hash_filenames):
240 return self.tree.getFilesystemPath(self.encode(metric, tag_hash_filenames))
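A minimal sketch of the new filename handling, assuming carbon is importable: both backends now run the metric through TaggedSeries.encode, and with TAG_HASH_FILENAMES enabled a tagged series gets a stable hashed path on disk (the series name below is illustrative).

from os import sep
from carbon.util import TaggedSeries

metric = 'disk.used;datacenter=dc1;server=web01'
print(TaggedSeries.encode(metric, sep, hash_only=True) + '.wsp')   # hashed filename layout
print(TaggedSeries.encode(metric, sep, hash_only=False) + '.wsp')  # tags spelled out in the path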
0 from twisted.python.failure import Failure
1
2
30 class Event:
41 def __init__(self, name):
52 self.name = name
1815 try:
1916 handler(*args, **kwargs)
2017 except Exception:
21 log.err(None, "Exception in %s event handler: args=%s kwargs=%s" % (self.name, args, kwargs))
18 log.err(
19 None, "Exception in %s event handler: args=%s kwargs=%s" % (self.name, args, kwargs))
2220
2321
2422 metricReceived = Event('metricReceived')
2927 resumeReceivingMetrics = Event('resumeReceivingMetrics')
3028
3129 # Default handlers
32 metricReceived.addHandler(lambda metric, datapoint: state.instrumentation.increment('metricsReceived'))
30 metricReceived.addHandler(
31 lambda metric, datapoint: state.instrumentation.increment('metricsReceived'))
3332
3433
3534 cacheFull.addHandler(lambda: state.instrumentation.increment('cache.overflow'))
4140
4241
4342 # Avoid import circularities
44 from carbon import log, state
43 from carbon import log, state # NOQA
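The handlers above are plain callables attached to module-level Event objects; a small sketch of the hook mechanism (the handler name is illustrative, and inside the daemon the receiver protocols are what fire the event):

from carbon import events

def print_handler(metric, datapoint):
    print('received', metric, datapoint)

events.metricReceived.addHandler(print_handler)
events.metricReceived('servers.web01.load', (1234567890, 0.42))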
0 from hashlib import md5
1 import bisect
2 import sys
3
04 try:
1 from hashlib import md5
5 import mmh3
26 except ImportError:
3 from md5 import md5
4 import bisect
7 mmh3 = None
58
69 try:
710 import pyhash
811 hasher = pyhash.fnv1a_32()
9 def fnv32a(string, seed=0x811c9dc5):
10 return hasher(string, seed=seed)
12
13 def fnv32a(data, seed=0x811c9dc5):
14 return hasher(data, seed=seed)
1115 except ImportError:
12 def fnv32a(string, seed=0x811c9dc5):
16 def fnv32a(data, seed=0x811c9dc5):
1317 """
1418 FNV-1a Hash (http://isthe.com/chongo/tech/comp/fnv/) in Python.
1519 Taken from https://gist.github.com/vaiorabbit/5670985
1721 hval = seed
1822 fnv_32_prime = 0x01000193
1923 uint32_max = 2 ** 32
20 for s in string:
21 hval = hval ^ ord(s)
22 hval = (hval * fnv_32_prime) % uint32_max
24 if sys.version_info >= (3, 0):
25 # data is a bytes object, s is an integer
26 for s in data:
27 hval = hval ^ s
28 hval = (hval * fnv_32_prime) % uint32_max
29 else:
30 # data is an str object, s is a single character
31 for s in data:
32 hval = hval ^ ord(s)
33 hval = (hval * fnv_32_prime) % uint32_max
2334 return hval
35
36
37 def compactHash(string):
38 return md5(string.encode('utf-8')).hexdigest()
39
40
41 def carbonHash(key, hash_type):
42 if hash_type == 'fnv1a_ch':
43 big_hash = int(fnv32a(key.encode('utf-8')))
44 small_hash = (big_hash >> 16) ^ (big_hash & 0xffff)
45 elif hash_type == 'mmh3_ch':
46 if mmh3 is None:
47 raise Exception('Install "mmh3" to use this hashing function.')
48 small_hash = mmh3.hash(key)
49 else:
50 big_hash = compactHash(key)
51 small_hash = int(big_hash[:4], 16)
52 return small_hash
53
2454
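A quick sketch of the new carbonHash helper, assuming carbon is importable: it reduces a replica key to the small integer used as a ring position ('mmh3_ch' additionally requires the optional mmh3 module).

from carbon.hashing import carbonHash

for hash_type in ('carbon_ch', 'fnv1a_ch'):
    # replica keys look like "<node>:<i>" or "<i>-<instance>", as built in add_node() below
    print(hash_type, carbonHash('0-a', hash_type))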
2555 class ConsistentHashRing:
2656 def __init__(self, nodes, replica_count=100, hash_type='carbon_ch'):
2757 self.ring = []
58 self.ring_len = len(self.ring)
2859 self.nodes = set()
60 self.nodes_len = len(self.nodes)
2961 self.replica_count = replica_count
3062 self.hash_type = hash_type
3163 for node in nodes:
3264 self.add_node(node)
3365
3466 def compute_ring_position(self, key):
35 if self.hash_type == 'fnv1a_ch':
36 big_hash = '{:x}'.format(int(fnv32a( str(key) )))
37 small_hash = int(big_hash[:4], 16) ^ int(big_hash[4:], 16)
38 else:
39 big_hash = md5(str(key)).hexdigest()
40 small_hash = int(big_hash[:4], 16)
41 return small_hash
67 return carbonHash(key, self.hash_type)
4268
43 def add_node(self, node):
44 self.nodes.add(node)
69 def add_node(self, key):
70 self.nodes.add(key)
71 self.nodes_len = len(self.nodes)
4572 for i in range(self.replica_count):
4673 if self.hash_type == 'fnv1a_ch':
47 replica_key = "%d-%s" % (i, node[1])
74 replica_key = "%d-%s" % (i, key[1])
4875 else:
49 replica_key = "%s:%d" % (node, i)
76 replica_key = "%s:%d" % (key, i)
5077 position = self.compute_ring_position(replica_key)
5178 while position in [r[0] for r in self.ring]:
5279 position = position + 1
53 entry = (position, node)
80 entry = (position, key)
5481 bisect.insort(self.ring, entry)
82 self.ring_len = len(self.ring)
5583
56 def remove_node(self, node):
57 self.nodes.discard(node)
58 self.ring = [entry for entry in self.ring if entry[1] != node]
84 def remove_node(self, key):
85 self.nodes.discard(key)
86 self.nodes_len = len(self.nodes)
87 self.ring = [entry for entry in self.ring if entry[1] != key]
88 self.ring_len = len(self.ring)
5989
6090 def get_node(self, key):
6191 assert self.ring
62 node = None
63 node_iter = self.get_nodes(key)
64 node = node_iter.next()
65 node_iter.close()
66 return node
92 position = self.compute_ring_position(key)
93 search_entry = (position, ())
94 index = bisect.bisect_left(self.ring, search_entry) % self.ring_len
95 entry = self.ring[index]
96 return entry[1]
6797
6898 def get_nodes(self, key):
69 assert self.ring
70 if len(self.nodes) == 1:
71 # short circuit in simple 1-node case
99 nodes = set()
100 if not self.ring:
101 return
102 if self.nodes_len == 1:
72103 for node in self.nodes:
73104 yield node
74 return
75 nodes = set()
76105 position = self.compute_ring_position(key)
77 search_entry = (position, None)
78 index = bisect.bisect_left(self.ring, search_entry) % len(self.ring)
79 last_index = (index - 1) % len(self.ring)
80 while len(nodes) < len(self.nodes) and index != last_index:
106 search_entry = (position, ())
107 index = bisect.bisect_left(self.ring, search_entry) % self.ring_len
108 last_index = (index - 1) % self.ring_len
109 nodes_len = len(nodes)
110 while nodes_len < self.nodes_len and index != last_index:
81111 next_entry = self.ring[index]
82112 (position, next_node) = next_entry
83113 if next_node not in nodes:
84114 nodes.add(next_node)
115 nodes_len += 1
85116 yield next_node
86117
87 index = (index + 1) % len(self.ring)
118 index = (index + 1) % self.ring_len
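Putting the updated ring together, a usage sketch with illustrative node names: nodes are (server, instance) pairs, get_node returns the owner, and get_nodes yields candidates in ring order.

from carbon.hashing import ConsistentHashRing

ring = ConsistentHashRing([], replica_count=100, hash_type='carbon_ch')
ring.add_node(('graphite-1', 'a'))
ring.add_node(('graphite-2', 'a'))
print(ring.get_node('servers.web01.load'))          # single owning node
print(list(ring.get_nodes('servers.web01.load')))   # replica candidates, nearest first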
0 import urllib3
1
2 # async http client connection pool
3 http = urllib3.PoolManager()
4
5
6 def httpRequest(url, values=None, headers=None, method='POST', timeout=5):
7 try:
8 result = http.request(
9 method,
10 url,
11 fields=values,
12 headers=headers,
13 timeout=timeout)
14 except BaseException as err:
15 raise Exception("Error requesting %s: %s" % (url, err))
16
17 if result.status != 200:
18 raise Exception("Error response %d from %s" % (result.status, url))
19
20 return result.data
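This is the helper TimeSeriesDatabase.tag() calls; a hedged sketch of a direct call (the URL matches the GRAPHITE_URL default, the series name is illustrative, and a graphite-web listener must be running for it to succeed):

from carbon.http import httpRequest

httpRequest(
    'http://127.0.0.1:80/tags/tagMultiSeries',
    [('path', 'disk.used;datacenter=dc1;server=web01')],   # one ('path', series) pair per metric
)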
2020
2121 # TODO(chrismd) refactor the graphite metrics hierarchy to be cleaner,
2222 # more consistent, and make room for frontend metrics.
23 #metric_prefix = "Graphite.backend.%(program)s.%(instance)s." % settings
23 # metric_prefix = "Graphite.backend.%(program)s.%(instance)s." % settings
2424
2525
2626 def increment(stat, increase=1):
2828 stats[stat] += increase
2929 except KeyError:
3030 stats[stat] = increase
31
3132
3233 def max(stat, newval):
3334 try:
3637 except KeyError:
3738 stats[stat] = newval
3839
40
3941 def append(stat, value):
4042 try:
4143 stats[stat].append(value)
6567
6668
6769 def getMemUsage():
68 rss_pages = int(open('/proc/self/statm').read().split()[1])
70 with open('/proc/self/statm') as statm:
71 rss_pages = int(statm.read().split()[1])
6972 return rss_pages * PAGESIZE
7073
7174
7780 stats.clear()
7881
7982 # cache metrics
80 if settings.program == 'carbon-cache':
83 if 'cache' in settings.program:
8184 record = cache_record
8285 updateTimes = myStats.get('updateTimes', [])
8386 committedPoints = myStats.get('committedPoints', 0)
118121 record('cache.overflow', cacheOverflow)
119122
120123 # aggregator metrics
121 elif settings.program == 'carbon-aggregator':
124 elif 'aggregator' in settings.program:
122125 record = aggregator_record
123126 record('allocatedBuffers', len(BufferManager))
124127 record('bufferedDatapoints',
132135 # shared relay stats for relays & aggregators
133136 if settings.program in ['carbon-aggregator', 'carbon-relay']:
134137 prefix = 'destinations.'
135 relay_stats = [(k,v) for (k,v) in myStats.items() if k.startswith(prefix)]
138 relay_stats = [(k, v) for (k, v) in myStats.items() if k.startswith(prefix)]
136139 for stat_name, stat_value in relay_stats:
137140 record(stat_name, stat_value)
138141 # Preserve the count of sent metrics so that the ratio of
139142 # received : sent can be checked per-relay to determine the
140143 # health of the destination.
141 if stat_name.endswith('.sent'):
144 if stat_name.endswith('.sent') or stat_name.endswith('.attemptedRelays'):
142145 myPriorStats[stat_name] = stat_value
143146
144147 # common metrics
205208
206209
207210 # Avoid import circularities
208 from carbon import state, events, cache
209 from carbon.aggregator.buffers import BufferManager
211 from carbon import state, events, cache # NOQA
212 from carbon.aggregator.buffers import BufferManager # NOQA
00 import os
11 import time
2 from sys import stdout, stderr
3 from zope.interface import implements
4 from twisted.python.log import startLoggingWithObserver, textFromEventDict, msg, err, ILogObserver
2 from sys import stdout
3 from zope.interface import implementer
4 from twisted.python.log import startLoggingWithObserver, textFromEventDict, msg, err, ILogObserver # NOQA
55 from twisted.python.syslog import SyslogObserver
66 from twisted.python.logfile import DailyLogFile
77
1818 """
1919 Fix Umask Issue https://twistedmatrix.com/trac/ticket/7026
2020 """
21 openMode = self.defaultMode or 0777
21 openMode = self.defaultMode or 0o777
2222 self._file = os.fdopen(os.open(
23 self.path, os.O_CREAT|os.O_RDWR, openMode), 'r+', 1)
23 self.path, os.O_CREAT | os.O_RDWR, openMode), 'rb+', 1)
2424 self.closed = False
2525 # Try our best to update permissions for files which already exist.
2626 if self.defaultMode:
4747 else:
4848 path_stat = os.stat(self.path)
4949 fd_stat = os.fstat(self._file.fileno())
50 if not (path_stat.st_ino == fd_stat.st_ino
51 and path_stat.st_dev == fd_stat.st_dev):
50 if not (path_stat.st_ino == fd_stat.st_ino and path_stat.st_dev == fd_stat.st_dev):
5251 self.reopen()
5352 DailyLogFile.write(self, data)
5453
5857 self._openFile()
5958
6059
60 @implementer(ILogObserver)
6161 class CarbonLogObserver(object):
62 implements(ILogObserver)
62
63 def __init__(self):
64 self._raven_client = None
65
66 def raven_client(self):
67 if self._raven_client is not None:
68 return self._raven_client
69
70 # Import here to avoid dependency hell.
71 try:
72 import raven
73 except ImportError:
74 return None
75 from carbon.conf import settings
76
77 if settings.RAVEN_DSN is None:
78 return None
79 self._raven_client = raven.Client(dsn=settings.RAVEN_DSN)
80 return self._raven_client
81
82 def log_to_raven(self, event):
83 if not event.get('isError') or 'failure' not in event:
84 return
85 client = self.raven_client()
86 if client is None:
87 return
88 f = event['failure']
89 client.captureException(
90 (f.type, f.value, f.getTracebackObject())
91 )
6392
6493 def log_to_dir(self, logdir):
6594 self.logdir = logdir
76105 self.observer = syslog_observer
77106
78107 def __call__(self, event):
108 self.log_to_raven(event)
79109 return self.observer(event)
80110
81 def stdout_observer(self, event):
111 @staticmethod
112 def stdout_observer(event):
82113 stdout.write(formatEvent(event, includeType=True) + '\n')
83114 stdout.flush()
84115
95126
96127 # Default to stdout
97128 observer = stdout_observer
98
129
99130
100131 carbonLogObserver = CarbonLogObserver()
101132
166197 if debugEnabled:
167198 msg(message, **context)
168199
200
169201 debugEnabled = False
170202
171203
00 import traceback
11 from carbon import log, state
2
23
34 def getMetadata(metric, key):
45 try:
00 from carbon.util import PluginRegistrar
11 from carbon import state, log
2 from six import with_metaclass
23
34
4 class Processor(object):
5 __metaclass__ = PluginRegistrar
5 class Processor(with_metaclass(PluginRegistrar, object)):
66 plugins = {}
77 NO_OUTPUT = ()
88
00 import time
1 import socket
2 import sys
13
24 from twisted.internet.protocol import ServerFactory, DatagramProtocol
3 from twisted.application.internet import TCPServer, UDPServer
5 # from twisted.application.internet import TCPServer, UDPServer
6 from twisted.application import service
47 from twisted.internet.error import ConnectionDone
8 from twisted.internet import reactor, tcp, udp
59 from twisted.protocols.basic import LineOnlyReceiver, Int32StringReceiver
610 from twisted.protocols.policies import TimeoutMixin
711 from carbon import log, events, state, management
913 from carbon.regexlist import WhiteList, BlackList
1014 from carbon.util import pickle, get_unpickler
1115 from carbon.util import PluginRegistrar
16 from six import with_metaclass
17 from carbon.util import enableTcpKeepAlive
18
19
20 def checkIfAcceptingConnections():
21 clients = len(state.connectedMetricReceiverProtocols)
22 max_clients = settings.MAX_RECEIVER_CONNECTIONS
23
24 if clients < max_clients:
25 for port in state.listeningPorts:
26 if port.paused:
27 log.listener(
28 "Resuming %s (%d/%d connections)" % (port, clients, max_clients))
29 port.resumeProducing()
30 port.paused = False
31 else:
32 for port in state.listeningPorts:
33 if not port.paused:
34 log.listener(
35 "Pausing %s (%d/%d connections)" % (port, clients, max_clients))
36 port.pauseProducing()
37 port.paused = True
1238
1339
1440 class CarbonReceiverFactory(ServerFactory):
41
1542 def buildProtocol(self, addr):
16 from carbon.conf import settings
17
18 # Don't establish the connection if we have reached the limit.
19 if len(state.connectedMetricReceiverProtocols) < settings.MAX_RECEIVER_CONNECTIONS:
43 clients = len(state.connectedMetricReceiverProtocols)
44 max_clients = settings.MAX_RECEIVER_CONNECTIONS
45
46 if clients < max_clients:
2047 return ServerFactory.buildProtocol(self, addr)
2148 else:
2249 return None
2350
2451
25 class CarbonServerProtocol(object):
26 __metaclass__ = PluginRegistrar
52 class CarbonService(service.Service):
53 """Create our own socket to support SO_REUSEPORT.
54 To be removed when twisted supports it natively
55 See: https://github.com/twisted/twisted/pull/759.
56 """
57 factory = None
58 protocol = None
59
60 def __init__(self, interface, port, protocol, factory):
61 self.protocol = protocol
62 self.factory = factory
63 self.interface = interface
64 self.port = port
65
66 def startService(self):
67 # use socket creation from twisted to use the same options as before
68 if hasattr(self.protocol, 'datagramReceived'):
69 tmp_port = udp.Port(None, None, interface=self.interface)
70 else:
71 tmp_port = tcp.Port(None, None, interface=self.interface)
72 carbon_sock = tmp_port.createInternetSocket()
73 if hasattr(socket, 'SO_REUSEPORT'):
74 carbon_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
75 carbon_sock.bind((self.interface, self.port))
76
77 if hasattr(self.protocol, 'datagramReceived'):
78 self._port = reactor.adoptDatagramPort(
79 carbon_sock.fileno(), socket.AF_INET, self.protocol())
80 else:
81 carbon_sock.listen(tmp_port.backlog)
82 self._port = reactor.adoptStreamPort(
83 carbon_sock.fileno(), socket.AF_INET, self.factory)
84 state.listeningPorts.append(self._port)
85 self._port.paused = False
86 carbon_sock.close()
87
88 def stopService(self):
89 self._port.stopListening()
90
91
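For background, the SO_REUSEPORT trick CarbonService relies on amounts to the following plain-socket sketch (interface and port are illustrative), which is what lets several carbon instances share one listener port:

import socket

def reuseport_listener(interface='0.0.0.0', port=2003, backlog=50):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    if hasattr(socket, 'SO_REUSEPORT'):          # not available on every platform
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.bind((interface, port))
    sock.listen(backlog)
    return sock                                  # handed to the reactor via adoptStreamPort()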
92 class CarbonServerProtocol(with_metaclass(PluginRegistrar, object)):
2793 plugins = {}
2894
2995 @classmethod
37103 return
38104
39105 if hasattr(protocol, 'datagramReceived'):
40 service = UDPServer(port, protocol(), interface=interface)
41 service.setServiceParent(root_service)
106 service = CarbonService(interface, port, protocol, None)
42107 else:
43108 factory = CarbonReceiverFactory()
44109 factory.protocol = protocol
45 service = TCPServer(port, factory, interface=interface)
46 service.setServiceParent(root_service)
110 service = CarbonService(interface, port, protocol, factory)
111 service.setServiceParent(root_service)
47112
48113
49114 class MetricReceiver(CarbonServerProtocol, TimeoutMixin):
53118
54119 def connectionMade(self):
55120 self.setTimeout(settings.METRIC_CLIENT_IDLE_TIMEOUT)
121 enableTcpKeepAlive(self.transport, settings.TCP_KEEPALIVE, settings)
56122 self.peerName = self.getPeerName()
123
57124 if settings.LOG_LISTENER_CONN_SUCCESS:
58125 log.listener("%s connection with %s established" % (
59126 self.__class__.__name__, self.peerName))
62129 self.pauseReceiving()
63130
64131 state.connectedMetricReceiverProtocols.add(self)
132 checkIfAcceptingConnections()
65133 if settings.USE_FLOW_CONTROL:
66134 events.pauseReceivingMetrics.addHandler(self.pauseReceiving)
67135 events.resumeReceivingMetrics.addHandler(self.resumeReceiving)
82150 def connectionLost(self, reason):
83151 if reason.check(ConnectionDone):
84152 if settings.LOG_LISTENER_CONN_SUCCESS:
85 log.listener("%s connection with %s closed cleanly" % (self.__class__.__name__, self.peerName))
86
87 else:
88 log.listener("%s connection with %s lost: %s" % (self.__class__.__name__, self.peerName, reason.value))
153 log.listener(
154 "%s connection with %s closed cleanly" % (self.__class__.__name__, self.peerName))
155
156 else:
157 log.listener(
158 "%s connection with %s lost: %s" % (self.__class__.__name__, self.peerName, reason.value))
89159
90160 state.connectedMetricReceiverProtocols.remove(self)
161 checkIfAcceptingConnections()
91162 if settings.USE_FLOW_CONTROL:
92163 events.pauseReceivingMetrics.removeHandler(self.pauseReceiving)
93164 events.resumeReceivingMetrics.removeHandler(self.resumeReceiving)
101172 return
102173 if datapoint[1] != datapoint[1]: # filter out NaN values
103174 return
104 if int(datapoint[0]) == -1: # use current time if none given: https://github.com/graphite-project/carbon/issues/54
175 # use current time if none given: https://github.com/graphite-project/carbon/issues/54
176 if int(datapoint[0]) == -1:
105177 datapoint = (time.time(), datapoint[1])
106178 res = settings.MIN_TIMESTAMP_RESOLUTION
107179 if res:
112184
113185 class MetricLineReceiver(MetricReceiver, LineOnlyReceiver):
114186 plugin_name = "line"
115 delimiter = '\n'
187 delimiter = b'\n'
116188
117189 def lineReceived(self, line):
190 if sys.version_info >= (3, 0):
191 line = line.decode('utf-8')
192
118193 try:
119194 metric, value, timestamp = line.strip().split()
120195 datapoint = (float(timestamp), float(value))
121196 except ValueError:
122197 if len(line) > 400:
123198 line = line[:400] + '...'
124 log.listener('invalid line received from client %s, ignoring [%s]' % (self.peerName, line.strip().encode('string_escape')))
199 log.listener('invalid line received from client %s, ignoring [%s]' %
200 (self.peerName, repr(line.strip())[1:-1]))
125201 return
126202
127203 self.metricReceived(metric, datapoint)
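For reference, the format this receiver parses is one 'metric value timestamp' per line; a throwaway client sketch against the default line receiver port:

import socket
import time

sock = socket.create_connection(('127.0.0.1', 2003))    # LINE_RECEIVER_* defaults
sock.sendall(('servers.web01.load 0.42 %d\n' % int(time.time())).encode('utf-8'))
sock.close()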
137213
138214 super(MetricDatagramReceiver, cls).build(root_service)
139215
140 def datagramReceived(self, data, (host, port)):
216 def datagramReceived(self, data, addr):
217 (host, _) = addr
218 if sys.version_info >= (3, 0):
219 data = data.decode('utf-8')
220
141221 for line in data.splitlines():
142222 try:
143223 metric, value, timestamp = line.strip().split()
147227 except ValueError:
148228 if len(line) > 400:
149229 line = line[:400] + '...'
150 log.listener('invalid line received from %s, ignoring [%s]' % (host, line.strip().encode('string_escape')))
230 log.listener('invalid line received from %s, ignoring [%s]' %
231 (host, repr(line.strip())[1:-1]))
151232
152233
153234 class MetricPickleReceiver(MetricReceiver, Int32StringReceiver):
161242 def stringReceived(self, data):
162243 try:
163244 datapoints = self.unpickler.loads(data)
164 except pickle.UnpicklingError:
245 # Pickle can throw a wide range of exceptions
246 except (pickle.UnpicklingError, ValueError, IndexError, ImportError, KeyError):
165247 log.listener('invalid pickle received from %s, ignoring' % self.peerName)
166248 return
167249
168250 for raw in datapoints:
169251 try:
170252 (metric, (value, timestamp)) = raw
171 except Exception, e:
253 except Exception as e:
172254 log.listener('Error decoding pickle: %s' % e)
255 continue
173256
174257 try:
175258 datapoint = (float(value), float(timestamp)) # force proper types
176 except ValueError:
259 except (ValueError, TypeError):
177260 continue
178261
262 # convert python2 unicode objects to str/bytes
263 if not isinstance(metric, str):
264 metric = metric.encode('utf-8')
265
179266 self.metricReceived(metric, datapoint)
180267
181268
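The pickle receiver expects Int32StringReceiver framing: a 4-byte network-order length prefix followed by a pickled list of (metric, (timestamp, value)) tuples. A hedged client sketch against the default pickle receiver port:

import pickle
import socket
import struct
import time

payload = pickle.dumps([('servers.web01.load', (int(time.time()), 0.42))], protocol=2)
sock = socket.create_connection(('127.0.0.1', 2004))
sock.sendall(struct.pack('!L', len(payload)) + payload)
sock.close()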
182269 class CacheManagementHandler(Int32StringReceiver):
183 MAX_LENGTH = 1024 ** 3 # 1mb
270 MAX_LENGTH = 1024 ** 3 # 1mb
184271
185272 def connectionMade(self):
186273 peer = self.transport.getPeer()
199286 cache = MetricCache()
200287 if request['type'] == 'cache-query':
201288 metric = request['metric']
202 datapoints = cache.get(metric, {}).items()
289 datapoints = list(cache.get(metric, {}).items())
203290 result = dict(datapoints=datapoints)
204291 if settings.LOG_CACHE_HITS:
205 log.query('[%s] cache query for \"%s\" returned %d values' % (self.peerAddr, metric, len(datapoints)))
292 log.query('[%s] cache query for \"%s\" returned %d values' % (
293 self.peerAddr, metric, len(datapoints)
294 ))
206295 instrumentation.increment('cacheQueries')
207296
208297 elif request['type'] == 'cache-query-bulk':
209298 datapointsByMetric = {}
210299 metrics = request['metrics']
211300 for metric in metrics:
212 datapointsByMetric[metric] = cache.get(metric, {}).items()
301 datapointsByMetric[metric] = list(cache.get(metric, {}).items())
213302
214303 result = dict(datapointsByMetric=datapointsByMetric)
215304
216305 if settings.LOG_CACHE_HITS:
217 log.query('[%s] cache query bulk for \"%d\" metrics returned %d values' %
218 (self.peerAddr, len(metrics), sum([len(datapoints) for datapoints in datapointsByMetric.values()])))
306 log.query('[%s] cache query bulk for \"%d\" metrics returned %d values' % (
307 self.peerAddr,
308 len(metrics),
309 sum([len(datapoints) for datapoints in datapointsByMetric.values()])
310 ))
219311 instrumentation.increment('cacheBulkQueries')
220312 instrumentation.append('cacheBulkQuerySize', len(metrics))
221313
228320 else:
229321 result = dict(error="Invalid request type \"%s\"" % request['type'])
230322
231 response = pickle.dumps(result, protocol=-1)
323 response = pickle.dumps(result, protocol=2)
232324 self.sendString(response)
233325
234326
235327 # Avoid import circularities
236 from carbon.cache import MetricCache
237 from carbon import instrumentation
328 from carbon.cache import MetricCache # NOQA
329 from carbon import instrumentation # NOQA
0 import time
10 import re
21 import os.path
32 from carbon import log
5655 def __nonzero__(self):
5756 return bool(self.regex_list)
5857
58 __bool__ = __nonzero__ # py2/3 compatibility
59
5960
6061 WhiteList = RegexList()
6162 BlackList = RegexList()
2424 for section in parser.sections():
2525 if not parser.has_option(section, 'destinations'):
2626 raise CarbonConfigException("Rules file %s section %s does not define a "
27 "'destinations' list" % (path, section))
27 "'destinations' list" % (path, section))
2828
2929 destination_strings = parser.get(section, 'destinations').split(',')
3030 destinations = parseDestinations(destination_strings)
3232 if parser.has_option(section, 'pattern'):
3333 if parser.has_option(section, 'default'):
3434 raise CarbonConfigException("Section %s contains both 'pattern' and "
35 "'default'. You must use one or the other." % section)
35 "'default'. You must use one or the other." % section)
3636 pattern = parser.get(section, 'pattern')
3737 regex = re.compile(pattern, re.I)
3838
3939 continue_matching = False
4040 if parser.has_option(section, 'continue'):
4141 continue_matching = parser.getboolean(section, 'continue')
42 rule = RelayRule(condition=regex.search, destinations=destinations, continue_matching=continue_matching)
42 rule = RelayRule(
43 condition=regex.search, destinations=destinations, continue_matching=continue_matching)
4344 rules.append(rule)
4445 continue
4546
5354
5455 if not defaultRule:
5556 raise CarbonConfigException("No default rule defined. You must specify exactly one "
56 "rule with 'default = true' instead of a pattern.")
57 "rule with 'default = true' instead of a pattern.")
5758
5859 rules.append(defaultRule)
5960 return rules
0 import random
1
2 from zope.interface import implementer
3
4 from twisted.internet._resolver import GAIResolver
5 from twisted.internet.defer import Deferred
6 from twisted.internet.address import IPv4Address
7 from twisted.internet.interfaces import IResolverSimple, IResolutionReceiver
8 from twisted.internet.error import DNSLookupError
9
10
11 # Inspired from /twisted/internet/_resolver.py
12 @implementer(IResolutionReceiver)
13 class RandomWins(object):
14 """
15 An L{IResolutionReceiver} which fires a L{Deferred} with a random result.
16 """
17
18 def __init__(self, deferred):
19 """
20 @param deferred: The L{Deferred} to fire when one resolution
21 result arrives.
22 """
23 self._deferred = deferred
24 self._results = []
25
26 def resolutionBegan(self, resolution):
27 """
28 See L{IResolutionReceiver.resolutionBegan}
29 @param resolution: See L{IResolutionReceiver.resolutionBegan}
30 """
31 self._resolution = resolution
32
33 def addressResolved(self, address):
34 """
35 See L{IResolutionReceiver.addressResolved}
36 @param address: See L{IResolutionReceiver.addressResolved}
37 """
38 self._results.append(address.host)
39
40 def resolutionComplete(self):
41 """
42 See L{IResolutionReceiver.resolutionComplete}
43 """
44 if self._results:
45 random.shuffle(self._results)
46 self._deferred.callback(self._results[0])
47 else:
48 self._deferred.errback(DNSLookupError(self._resolution.name))
49
50
51 @implementer(IResolverSimple)
52 class ComplexResolverSimplifier(object):
53 """
54 A converter from L{IHostnameResolver} to L{IResolverSimple}
55 """
56 def __init__(self, nameResolver):
57 """
58 Create a L{ComplexResolverSimplifier} with an L{IHostnameResolver}.
59 @param nameResolver: The L{IHostnameResolver} to use.
60 """
61 self._nameResolver = nameResolver
62
63 def getHostByName(self, name, timeouts=()):
64 """
65 See L{IResolverSimple.getHostByName}
66 @param name: see L{IResolverSimple.getHostByName}
67 @param timeouts: see L{IResolverSimple.getHostByName}
68 @return: see L{IResolverSimple.getHostByName}
69 """
70 result = Deferred()
71 self._nameResolver.resolveHostName(RandomWins(result), name, 0,
72 [IPv4Address])
73 return result
74
75
76 def setUpRandomResolver(reactor):
77 resolver = GAIResolver(reactor, reactor.getThreadPool)
78 reactor.installResolver(ComplexResolverSimplifier(resolver))
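Installing the resolver is a single call; presumably the client-side code does this when destinations are given as hostnames, so repeated lookups spread across all A records (a sketch):

from twisted.internet import reactor
from carbon.resolver import setUpRandomResolver

setUpRandomResolver(reactor)   # each lookup now picks a random A record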
2121 yield (metric, datapoint)
2222
2323
24 class RewriteRuleManager:
24 class _RewriteRuleManager:
2525 def __init__(self):
2626 self.rulesets = defaultdict(list)
2727 self.rules_file = None
9393
9494
9595 # Ghetto singleton
96 RewriteRuleManager = RewriteRuleManager()
96 RewriteRuleManager = _RewriteRuleManager()
00 import imp
1 from carbon.hashing import ConsistentHashRing
1 from carbon.hashing import ConsistentHashRing, carbonHash
22 from carbon.util import PluginRegistrar
3
4
5 class DatapointRouter(object):
3 from six import with_metaclass
4 from six.moves import xrange
5
6
7 class DatapointRouter(with_metaclass(PluginRegistrar, object)):
68 "Abstract base class for datapoint routing logic implementations"
7 __metaclass__ = PluginRegistrar
89 plugins = {}
910
1011 def addDestination(self, destination):
1415 def removeDestination(self, destination):
1516 "destination is a (host, port, instance) triple"
1617 raise NotImplemented()
18
19 def hasDestination(self, destination):
20 "destination is a (host, port, instance) triple"
21 raise NotImplementedError
22
23 def countDestinations(self):
24 "return number of configured destinations"
25 raise NotImplementedError
1726
1827 def getDestinations(self, key):
1928 """Generate the destinations where the given routing key should map to. Only
4049
4150 def removeDestination(self, destination):
4251 self.destinations.discard(destination)
52
53 def hasDestination(self, destination):
54 return destination in self.destinations
55
56 def countDestinations(self):
57 return len(self.destinations)
4358
4459 def getDestinations(self, key):
4560 for rule in self.rules:
6176 self.replication_factor = int(replication_factor)
6277 self.diverse_replicas = diverse_replicas
6378 self.instance_ports = {} # { (server, instance) : port }
64 self.ring = ConsistentHashRing([])
79 hash_type = settings.ROUTER_HASH_TYPE or 'carbon_ch'
80 self.ring = ConsistentHashRing([], hash_type=hash_type)
6581
6682 def addDestination(self, destination):
6783 (server, port, instance) = destination
68 if (server, instance) in self.instance_ports:
84 if self.hasDestination(destination):
6985 raise Exception("destination instance (%s, %s) already configured" % (server, instance))
7086 self.instance_ports[(server, instance)] = port
7187 self.ring.add_node((server, instance))
7288
7389 def removeDestination(self, destination):
7490 (server, port, instance) = destination
75 if (server, instance) not in self.instance_ports:
91 if not self.hasDestination(destination):
7692 raise Exception("destination instance (%s, %s) not configured" % (server, instance))
7793 del self.instance_ports[(server, instance)]
7894 self.ring.remove_node((server, instance))
95
96 def hasDestination(self, destination):
97 (server, _, instance) = destination
98 return (server, instance) in self.instance_ports
99
100 def countDestinations(self):
101 return len(self.instance_ports)
79102
80103 def getDestinations(self, metric):
81104 key = self.getKey(metric)
112135 keyfunc = getattr(module, func_name)
113136 self.setKeyFunction(keyfunc)
114137
138
115139 class AggregatedConsistentHashingRouter(DatapointRouter):
116140 plugin_name = 'aggregated-consistent-hashing'
117141
129153
130154 def removeDestination(self, destination):
131155 self.hash_router.removeDestination(destination)
156
157 def hasDestination(self, destination):
158 return self.hash_router.hasDestination(destination)
159
160 def countDestinations(self):
161 return self.hash_router.countDestinations()
132162
133163 def getDestinations(self, key):
134164 # resolve metric to aggregate forms
154184 for destination in destinations:
155185 yield destination
156186
157 try:
158 import mmh3
159 except ImportError:
160 pass
161 else:
162 class FastHashRing(object):
163 """A very fast hash 'ring'.
164
165 Instead of trying to avoid rebalancing data when changing
166 the list of nodes we try to making routing as fast as we
167 can. It's good enough because the current rebalancing
168 tools performances depend on the total number of metrics
169 and not the number of metrics to rebalance.
170 """
171
172 def __init__(self):
173 self.nodes = set()
174 self.sorted_nodes = []
175
176 def _hash(self, key):
177 return mmh3.hash(key)
178
179 def _update_nodes(self):
180 self.sorted_nodes = sorted(
181 [(self._hash(str(n)), n) for n in self.nodes],
182 key=lambda v: v[0]
183 )
184
185 def add_node(self, node):
186 self.nodes.add(node)
187 self._update_nodes()
188
189 def remove_node(self, node):
190 self.nodes.discard(node)
191 self._update_nodes()
192
193 def get_nodes(self, key):
194 seed = self._hash(key) % len(self.nodes)
195
196 for n in xrange(seed, seed + len(self.nodes)):
197 yield self.sorted_nodes[n % len(self.sorted_nodes)][1]
198
199 class FastHashingRouter(ConsistentHashingRouter):
200 """Same as ConsistentHashingRouter but using FastHashRing."""
201 plugin_name = 'fast-hashing'
202
203 def __init__(self, settings):
204 super(FastHashingRouter, self).__init__(settings)
205 self.ring = FastHashRing()
206
207 class FastAggregatedHashingRouter(AggregatedConsistentHashingRouter):
208 """Same as AggregatedConsistentHashingRouter but using FastHashRing."""
209 plugin_name = 'fast-aggregated-hashing'
210
211 def __init__(self, settings):
212 super(FastAggregatedHashingRouter, self).__init__(settings)
213 self.hash_router.ring = FastHashRing()
187
188 class FastHashRing(object):
189 """A very fast hash 'ring'.
190
191 Instead of trying to avoid rebalancing data when changing
192 the list of nodes, we try to make routing as fast as we
193 can. That is good enough because the performance of the
194 current rebalancing tools depends on the total number of
195 metrics and not on the number of metrics to rebalance.
196 """
197
198 def __init__(self, settings):
199 self.nodes = set()
200 self.sorted_nodes = []
201 self.hash_type = settings.ROUTER_HASH_TYPE or 'mmh3_ch'
202
203 def _hash(self, key):
204 return carbonHash(key, self.hash_type)
205
206 def _update_nodes(self):
207 self.sorted_nodes = sorted(
208 [(self._hash(str(n)), n) for n in self.nodes],
209 key=lambda v: v[0]
210 )
211
212 def add_node(self, node):
213 self.nodes.add(node)
214 self._update_nodes()
215
216 def remove_node(self, node):
217 self.nodes.discard(node)
218 self._update_nodes()
219
220 def get_nodes(self, key):
221 if not self.nodes:
222 return
223
224 seed = self._hash(key) % len(self.nodes)
225
226 for n in xrange(seed, seed + len(self.nodes)):
227 yield self.sorted_nodes[n % len(self.sorted_nodes)][1]
228
229
230 class FastHashingRouter(ConsistentHashingRouter):
231 """Same as ConsistentHashingRouter but using FastHashRing."""
232 plugin_name = 'fast-hashing'
233
234 def __init__(self, settings):
235 super(FastHashingRouter, self).__init__(settings)
236 self.ring = FastHashRing(settings)
237
238
239 class FastAggregatedHashingRouter(AggregatedConsistentHashingRouter):
240 """Same as AggregatedConsistentHashingRouter but using FastHashRing."""
241 plugin_name = 'fast-aggregated-hashing'
242
243 def __init__(self, settings):
244 super(FastAggregatedHashingRouter, self).__init__(settings)
245 self.hash_router.ring = FastHashRing(settings)
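A rough illustration of the module-level FastHashRing above; any object exposing a ROUTER_HASH_TYPE attribute stands in for the real Settings here, and the node names are illustrative:

from types import SimpleNamespace      # py3; a small stub class works the same on py2
from carbon.routers import FastHashRing

ring = FastHashRing(SimpleNamespace(ROUTER_HASH_TYPE='fnv1a_ch'))
ring.add_node(('graphite-1', 'a'))
ring.add_node(('graphite-2', 'a'))
print(next(ring.get_nodes('servers.web01.load')))   # first node after hash(key) % len(nodes)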
1414 from os.path import exists
1515
1616 from twisted.application.service import MultiService
17 from twisted.application.internet import TCPServer, TCPClient
17 from twisted.application.internet import TCPServer
1818 from twisted.internet.protocol import ServerFactory
1919 from twisted.python.components import Componentized
2020 from twisted.python.log import ILogObserver
3636 except ImportError:
3737 pass
3838 try:
39 import carbon.protobuf
40 except ImportError, e:
39 import carbon.protobuf # NOQA
40 except ImportError:
4141 pass
4242
4343
7171
7272 def setupPipeline(pipeline, root_service, settings):
7373 state.pipeline_processors = []
74
7475 for processor in pipeline:
7576 args = []
7677 if ':' in processor:
9192 plugin_class = Processor.plugins[processor]
9293 state.pipeline_processors.append(plugin_class(*args))
9394
94 if processor == 'relay':
95 if processor in ['relay', 'write']:
9596 state.pipeline_processors_generated.append(plugin_class(*args))
96
9797
9898 events.metricReceived.addHandler(run_pipeline)
9999 events.metricGenerated.addHandler(run_pipeline_generated)
121121
122122 settings.RELAY_METHOD = 'consistent-hashing'
123123 root_service = createBaseService(config, settings)
124 setupPipeline(['rewrite:pre', 'aggregate', 'rewrite:post', 'relay'], root_service, settings)
124 setupPipeline(
125 ['rewrite:pre', 'aggregate', 'rewrite:post', 'relay'],
126 root_service, settings)
127 setupReceivers(root_service, settings)
128
129 return root_service
130
131
132 def createAggregatorCacheService(config):
133 from carbon.conf import settings
134
135 settings.RELAY_METHOD = 'consistent-hashing'
136 root_service = createBaseService(config, settings)
137 setupPipeline(
138 ['rewrite:pre', 'aggregate', 'rewrite:post', 'write'],
139 root_service, settings)
125140 setupReceivers(root_service, settings)
126141
127142 return root_service
140155 def setupReceivers(root_service, settings):
141156 from carbon.protocols import MetricReceiver
142157
143 for plugin_name, plugin_class in MetricReceiver.plugins.items():
158 for _, plugin_class in MetricReceiver.plugins.items():
144159 plugin_class.build(root_service)
145160
146161
147162 def setupAggregatorProcessor(root_service, settings):
148 from carbon.aggregator.processor import AggregationProcessor # Register the plugin class
163 from carbon.aggregator.processor import AggregationProcessor # NOQA Register the plugin class
149164 from carbon.aggregator.rules import RuleManager
150165
151166 aggregation_rules_path = settings["aggregation-rules"]
152167 if not exists(aggregation_rules_path):
153 raise CarbonConfigException("aggregation processor: file does not exist {0}".format(aggregation_rules_path))
168 raise CarbonConfigException(
169 "aggregation processor: file does not exist {0}".format(aggregation_rules_path))
154170 RuleManager.read_from(aggregation_rules_path)
155171
156172
176192
177193
178194 def setupWriterProcessor(root_service, settings):
179 from carbon import cache # Register CacheFeedingProcessor
195 from carbon import cache # NOQA Register CacheFeedingProcessor
180196 from carbon.protocols import CacheManagementHandler
181197 from carbon.writer import WriterService
182 from carbon import events
183198
184199 factory = ServerFactory()
185200 factory.protocol = CacheManagementHandler
99 pipeline_processors = []
1010 pipeline_processors_generated = []
1111 database = None
12 listeningPorts = []
1111 See the License for the specific language governing permissions and
1212 limitations under the License."""
1313
14 import os
1514 import re
1615
17 from os.path import join, exists
16 from os.path import join
1817 from carbon.conf import OrderedConfigParser, settings
1918 from carbon.exceptions import CarbonConfigException
20 from carbon.util import pickle, parseRetentionDef
19 from carbon.util import parseRetentionDef
2120 from carbon import log, state
2221
2322
6362 self.points = int(points)
6463
6564 def __str__(self):
66 return "Archive = (Seconds per point: %d, Datapoints to save: %d)" % (self.secondsPerPoint, self.points)
65 return "Archive = (Seconds per point: %d, Datapoints to save: %d)" % (
66 self.secondsPerPoint, self.points)
6767
6868 def getTuple(self):
6969 return (self.secondsPerPoint, self.points)
8585
8686 try:
8787 retentions = options['retentions'].split(',')
88 archives = [Archive.fromString(s) for s in retentions]
8988 except KeyError:
9089 log.err("Schema %s missing 'retentions', skipping" % section)
9190 continue
91
92 try:
93 archives = [Archive.fromString(s) for s in retentions]
94 except ValueError as exc:
95 log.err("{msg} in section [{section}] in {fn}".format(
96 msg=exc, section=section.title(), fn=STORAGE_SCHEMAS_CONFIG))
97 raise SystemExit(1)
9298
9399 if pattern:
94100 mySchema = PatternSchema(section, pattern, archives)
102108 if state.database is not None:
103109 state.database.validateArchiveList(archiveList)
104110 schemaList.append(mySchema)
105 except ValueError, e:
111 except ValueError as e:
106112 log.msg("Invalid schemas found in %s: %s" % (section, e))
107113
108114 schemaList.append(defaultSchema)
150156 schemaList.append(defaultAggregation)
151157 return schemaList
152158
153 defaultArchive = Archive(60, 60 * 24 * 7) # default retention for unclassified data (7 days of minutely data)
159
160 # default retention for unclassified data (7 days of minutely data)
161 defaultArchive = Archive(60, 60 * 24 * 7)
154162 defaultSchema = DefaultSchema('default', [defaultArchive])
155163 defaultAggregation = DefaultSchema('default', (None, None))
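For orientation, the retention strings read from storage-schemas.conf reduce to (secondsPerPoint, points) pairs through parseRetentionDef; a quick sketch showing how the default archive above would be written:

from carbon.util import parseRetentionDef

print(parseRetentionDef('60s:7d'))    # (60, 10080), i.e. the defaultArchive above
print(parseRetentionDef('10s:6h'))    # (10, 2160)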
00 import sys
11 import os
22 import pwd
3 import __builtin__
4
3 import re
4
5 try:
6 import builtins as __builtin__
7 except ImportError:
8 import __builtin__
9
10 from hashlib import sha256
511 from os.path import abspath, basename, dirname
12 import socket
13 from time import sleep, time
14 from twisted.python.util import initgroups
15 from twisted.scripts.twistd import runApp
16 from carbon.log import setDebugEnabled
617 try:
7 from cStringIO import StringIO
18 from OpenSSL import SSL
819 except ImportError:
9 from StringIO import StringIO
20 SSL = None
21
22
23 # BytesIO is needed on py3 as StringIO does not operate on byte input anymore
24 # We could use BytesIO on py2 as well but it is slower than StringIO
25 if sys.version_info >= (3, 0):
26 from io import BytesIO as StringIO
27 else:
28 try:
29 from cStringIO import StringIO
30 except ImportError:
31 from StringIO import StringIO
32
1033 try:
1134 import cPickle as pickle
1235 USING_CPICKLE = True
1437 import pickle
1538 USING_CPICKLE = False
1639
17 from time import sleep, time
18 from twisted.python.util import initgroups
19 from twisted.scripts.twistd import runApp
20
2140
2241 def dropprivs(user):
2342 uid, gid = pwd.getpwnam(user)[2:4]
2746 return (uid, gid)
2847
2948
49 def enableTcpKeepAlive(transport, enable, settings):
50 if not enable or not hasattr(transport, 'getHandle'):
51 return
52
53 fd = transport.getHandle()
54 if SSL:
55 if isinstance(fd, SSL.Connection):
56 return
57 if fd.type != socket.SOCK_STREAM:
58 return
59
60 transport.setTcpKeepAlive(1)
61 for attr in ['TCP_KEEPIDLE', 'TCP_KEEPINTVL', 'TCP_KEEPCNT']:
62 flag = getattr(socket, attr, None)
63 value = getattr(settings, attr, None)
64 if not flag or value is None:
65 continue
66 fd.setsockopt(socket.SOL_TCP, flag, value)
67
68
3069 def run_twistd_plugin(filename):
3170 from carbon.conf import get_parser
3271 from twisted.scripts.twistd import ServerOptions
5695 # If no reactor was selected yet, try to use the epoll reactor if
5796 # available.
5897 try:
59 from twisted.internet import epollreactor
98 from twisted.internet import epollreactor # noqa: F401
6099 twistd_options.append("--reactor=epoll")
61100 except ImportError:
62101 pass
71110 twistd_options.extend(["--pidfile", options.pidfile])
72111 if options.umask:
73112 twistd_options.extend(["--umask", options.umask])
113 if options.logger:
114 twistd_options.extend(["--logger", options.logger])
115 if options.logfile:
116 twistd_options.extend(["--logfile", options.logfile])
74117 if options.syslog:
75118 twistd_options.append("--syslog")
76119
79122
80123 if options.debug:
81124 twistd_options.append("--debug")
125 setDebugEnabled(True)
82126
83127 for option_name, option_value in vars(options).items():
84 if (option_value is not None and
85 option_name not in ("debug", "profile", "profiler", "pidfile", "umask", "nodaemon", "syslog")):
86 twistd_options.extend(["--%s" % option_name.replace("_", "-"),
87 option_value])
128 if (option_value is not None and option_name not in (
129 "debug", "profile", "profiler", "pidfile", "umask", "nodaemon", "syslog",
130 "logger", "logfile")):
131 twistd_options.extend(["--%s" % option_name.replace("_", "-"), option_value])
88132
89133 # Finally, append extra args so that twistd has a chance to process them.
90134 twistd_options.extend(args)
125	169	# a dependency on whisper especially as carbon moves toward being a more
126170 # generic storage service that can use various backends.
127171 UnitMultipliers = {
128 's' : 1,
129 'm' : 60,
130 'h' : 60 * 60,
131 'd' : 60 * 60 * 24,
132 'w' : 60 * 60 * 24 * 7,
133 'y' : 60 * 60 * 24 * 365,
172 's': 1,
173 'm': 60,
174 'h': 60 * 60,
175 'd': 60 * 60 * 24,
176 'w': 60 * 60 * 24 * 7,
177 'y': 60 * 60 * 24 * 365,
134178 }
135179
136180
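A short worked example (a sketch, not carbon's own parser) of how a retention element such as '1m:7d' is turned into seconds using the multipliers above:

UnitMultipliers = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400, 'w': 604800, 'y': 31536000}

def to_seconds(value):
    # e.g. '7d' -> 7 * 86400 = 604800
    return int(value[:-1]) * UnitMultipliers[value[-1]]

precision, retention = [to_seconds(part) for part in '1m:7d'.split(':')]
points = retention // precision   # 10080 one-minute datapoints, i.e. one week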
181225
182226 @classmethod
183227 def find_class(cls, module, name):
184 if not module in cls.PICKLE_SAFE:
228 if module not in cls.PICKLE_SAFE:
185229 raise pickle.UnpicklingError('Attempting to unpickle unsafe module %s' % module)
186230 __import__(module)
187231 mod = sys.modules[module]
188 if not name in cls.PICKLE_SAFE[module]:
232 if name not in cls.PICKLE_SAFE[module]:
189233 raise pickle.UnpicklingError('Attempting to unpickle unsafe class %s' % name)
190234 return getattr(mod, name)
191235
203247 }
204248
205249 def find_class(self, module, name):
206 if not module in self.PICKLE_SAFE:
250 if module not in self.PICKLE_SAFE:
207251 raise pickle.UnpicklingError('Attempting to unpickle unsafe module %s' % module)
208252 __import__(module)
209253 mod = sys.modules[module]
210 if not name in self.PICKLE_SAFE[module]:
254 if name not in self.PICKLE_SAFE[module]:
211255 raise pickle.UnpicklingError('Attempting to unpickle unsafe class %s' % name)
212256 return getattr(mod, name)
213257
214258 @classmethod
215259 def loads(cls, pickle_string):
216 return cls(StringIO(pickle_string)).load()
260 if sys.version_info >= (3, 0):
261 return cls(StringIO(pickle_string), encoding='utf-8').load()
262 else:
263 return cls(StringIO(pickle_string)).load()
217264
218265
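The PICKLE_SAFE whitelist above is the standard stdlib technique for restricting what an incoming pickle may instantiate; a minimal self-contained sketch (with a hypothetical whitelist, not carbon's) looks like this:

import importlib
import io
import pickle

SAFE = {'collections': {'OrderedDict'}}   # hypothetical whitelist

class RestrictedUnpickler(pickle.Unpickler):
    def find_class(self, module, name):
        # Refuse to resolve anything not explicitly whitelisted,
        # e.g. os.system smuggled in via a __reduce__ payload.
        if module in SAFE and name in SAFE[module]:
            return getattr(importlib.import_module(module), name)
        raise pickle.UnpicklingError('forbidden: %s.%s' % (module, name))

def safe_loads(data):
    return RestrictedUnpickler(io.BytesIO(data)).load()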
219266 def get_unpickler(insecure=False):
242289 if cost <= self.tokens:
243290 self._tokens -= cost
244291 return True
245 else:
246 if blocking:
247 tokens_needed = cost - self._tokens
248 seconds_per_token = 1 / self.fill_rate
249 seconds_left = seconds_per_token * tokens_needed
250 time_to_sleep = self.timestamp + seconds_left - time()
251 if time_to_sleep > 0:
252 sleep(time_to_sleep)
253 self._tokens -= cost
254 return True
292
293 if not blocking:
255294 return False
295
296 tokens_needed = cost - self._tokens
297 seconds_per_token = 1 / self.fill_rate
298 seconds_left = seconds_per_token * tokens_needed
299 time_to_sleep = self.timestamp + seconds_left - time()
300 if time_to_sleep > 0:
301 sleep(time_to_sleep)
302
303 self._tokens -= cost
304 return True
256305
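To make the blocking branch concrete, here is the arithmetic with illustrative numbers, for the kind of bucket that settings such as MAX_UPDATES_PER_SECOND or MAX_CREATES_PER_MINUTE end up building:

# Illustrative numbers: a bucket that refills at roughly 50 tokens per minute.
capacity = 50
fill_rate = 50.0 / 60          # tokens added per second
tokens = 10                    # currently available
cost = 25                      # tokens the caller wants to drain

tokens_needed = cost - tokens              # 15 tokens short
seconds_per_token = 1.0 / fill_rate        # 1.2 seconds to earn one token
time_to_sleep = tokens_needed * seconds_per_token   # 18 seconds
# drain() additionally credits time already elapsed since the last refill,
# so the actual sleep can be shorter than this upper bound.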
257306 def setCapacityAndFillRate(self, new_capacity, new_fill_rate):
258307 delta = float(new_capacity) - self.capacity
283332 super(PluginRegistrar, classObj).__init__(name, bases, members)
284333 if hasattr(classObj, 'plugin_name'):
285334 classObj.plugins[classObj.plugin_name] = classObj
335
336
337 class TaggedSeries(object):
338 @classmethod
339 def parse(cls, path):
340 # if path is in openmetrics format: metric{tag="value",...}
341 if path[-2:] == '"}' and '{' in path:
342 return cls.parse_openmetrics(path)
343
344 # path is a carbon path with optional tags: metric;tag=value;...
345 return cls.parse_carbon(path)
346
347 @classmethod
348 def parse_openmetrics(cls, path):
349 """parse a path in openmetrics format: metric{tag="value",...}
350
351 https://github.com/RichiH/OpenMetrics
352 """
353 (metric, rawtags) = path[0:-1].split('{', 2)
354 if not metric:
355 raise Exception('Cannot parse path %s, no metric found' % path)
356
357 tags = {}
358
359 while len(rawtags) > 0:
 360	      m = re.match(r'([^=]+)="((?:[\\]["\\]|[^"\\])+)"(?:,|$)', rawtags)
361 if not m:
362 raise Exception('Cannot parse path %s, invalid segment %s' % (path, rawtags))
363
364 tags[m.group(1)] = m.group(2).replace(r'\"', '"').replace(r'\\', '\\')
365 rawtags = rawtags[len(m.group(0)):]
366
367 tags['name'] = metric
368 return cls(metric, tags)
369
370 @classmethod
371 def parse_carbon(cls, path):
372 """parse a carbon path with optional tags: metric;tag=value;..."""
373 segments = path.split(';')
374
375 metric = segments[0]
376 if not metric:
377 raise Exception('Cannot parse path %s, no metric found' % path)
378
379 tags = {}
380
381 for segment in segments[1:]:
382 tag = segment.split('=', 1)
383 if len(tag) != 2 or not tag[0]:
384 raise Exception('Cannot parse path %s, invalid segment %s' % (path, segment))
385
386 tags[tag[0]] = tag[1]
387
388 tags['name'] = metric
389 return cls(metric, tags)
390
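Both parsers produce the same result; a usage sketch (assuming carbon is importable):

from carbon.util import TaggedSeries

s1 = TaggedSeries.parse('disk.used;datacenter=dc1;server=web01')       # carbon format
s2 = TaggedSeries.parse('disk.used{datacenter="dc1",server="web01"}')  # openmetrics format

# both yield the metric name plus a tag dict that always includes 'name'
assert s1.tags == s2.tags == {
    'name': 'disk.used', 'datacenter': 'dc1', 'server': 'web01'}
# and format() renders the canonical carbon path with tags sorted by tag name
assert s1.path == 'disk.used;datacenter=dc1;server=web01'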
391 @staticmethod
392 def format(tags):
393 return tags.get('name', '') + ''.join(sorted([
394 ';%s=%s' % (tag, value)
395 for tag, value in tags.items()
396 if tag != 'name'
397 ]))
398
399 @staticmethod
400 def encode(metric, sep='.', hash_only=False):
401 """
402 Helper function to encode tagged series for storage in whisper etc
403
404 When tagged series are detected, they are stored in a separate hierarchy of folders under a
405 top-level _tagged folder, where subfolders are created by using the first 3 hex digits of the
406 sha256 hash of the tagged metric path (4096 possible folders), and second-level subfolders are
407 based on the following 3 hex digits (another 4096 possible folders) for a total of 4096^2
408 possible subfolders. The metric files themselves are created with any . in the metric path
 409	    replaced with _DOT_, to avoid any issues where metrics, tags or values containing a '.' would end
410 up creating further subfolders. This helper is used by both whisper and ceres, but by design
411 each carbon database and graphite-web finder is responsible for handling its own encoding so
412 that different backends can create their own schemes if desired.
413
414 The hash_only parameter can be set to True to use the hash as the filename instead of a
415 human-readable name. This avoids issues with filename length restrictions, at the expense of
416 being unable to decode the filename and determine the original metric name.
417
418 A concrete example:
419
420 .. code-block:: none
421
422 some.metric;tag1=value2;tag2=value.2
423
424 with sha256 hash starting effaae would be stored in:
425
 426	      _tagged/eff/aae/some_DOT_metric;tag1=value2;tag2=value_DOT_2.wsp (whisper)
 427	      _tagged/eff/aae/some_DOT_metric;tag1=value2;tag2=value_DOT_2 (ceres)
428
429 """
430 if ';' in metric:
431 metric_hash = sha256(metric.encode('utf8')).hexdigest()
432 return sep.join([
433 '_tagged',
434 metric_hash[0:3],
435 metric_hash[3:6],
436 metric_hash if hash_only else metric.replace('.', '_DOT_')
437 ])
438
439 # metric isn't tagged, just replace dots with the separator and trim any leading separator
440 return metric.replace('.', sep).lstrip(sep)
441
442 @staticmethod
443 def decode(path, sep='.'):
444 """
445 Helper function to decode tagged series from storage in whisper etc
446 """
447 if path.startswith('_tagged'):
448 return path.split(sep, 3)[-1].replace('_DOT_', '.')
449
450 # metric isn't tagged, just replace the separator with dots
451 return path.replace(sep, '.')
452
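A standalone sketch of the resulting on-disk layout, mirroring the logic above rather than importing carbon (sep='/' is used here only for readability; the default separator is '.'):

from hashlib import sha256

def encode_path(metric, sep='/'):
    # Tagged series: _tagged/<3 hex>/<3 hex>/<metric with '.' -> '_DOT_'>
    if ';' in metric:
        h = sha256(metric.encode('utf8')).hexdigest()
        return sep.join(['_tagged', h[0:3], h[3:6], metric.replace('.', '_DOT_')])
    # Untagged series: dots simply become path separators.
    return metric.replace('.', sep).lstrip(sep)

print(encode_path('some.metric;tag1=value2;tag2=value.2'))
# _tagged/xxx/yyy/some_DOT_metric;tag1=value2;tag2=value_DOT_2  (hash prefix computed)
print(encode_path('carbon.agents.host-a.cpuUsage'))
# carbon/agents/host-a/cpuUsage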
453 def __init__(self, metric, tags, series_id=None):
454 self.metric = metric
455 self.tags = tags
456 self.id = series_id
457
458 @property
459 def path(self):
460 return self.__class__.format(self.tags)
1212 limitations under the License."""
1313
1414 import time
15 from six.moves import queue
1516
1617 from carbon import state
1718 from carbon.cache import MetricCache
1819 from carbon.storage import loadStorageSchemas, loadAggregationSchemas
1920 from carbon.conf import settings
20 from carbon import log, events, instrumentation
21 from carbon import log, instrumentation
2122 from carbon.util import TokenBucket
2223
2324 from twisted.internet import reactor
4950 UPDATE_BUCKET = TokenBucket(capacity, fill_rate)
5051
5152
52 def optimalWriteOrder():
53 """Generates metrics with the most cached values first and applies a soft
54 rate limit on new metrics"""
53 class TagQueue(object):
54 def __init__(self, maxsize=0, update_interval=1):
55 self.add_queue = queue.Queue(maxsize)
56 self.update_queue = queue.Queue(maxsize)
57 self.update_interval = update_interval
58 self.update_counter = 0
59
60 def add(self, metric):
61 try:
62 self.add_queue.put_nowait(metric)
63 except queue.Full:
64 pass
65
66 def update(self, metric):
67 self.update_counter = self.update_counter % self.update_interval + 1
68 if self.update_counter == 1:
69 try:
70 self.update_queue.put_nowait(metric)
71 except queue.Full:
72 pass
73
74 def getbatch(self, maxsize=1):
75 batch = []
76 while len(batch) < maxsize:
77 try:
78 batch.append(self.add_queue.get_nowait())
79 except queue.Empty:
80 break
81 while len(batch) < maxsize:
82 try:
83 batch.append(self.update_queue.get_nowait())
84 except queue.Empty:
85 break
86 return batch
87
88
89 tagQueue = TagQueue(maxsize=settings.TAG_QUEUE_SIZE, update_interval=settings.TAG_UPDATE_INTERVAL)
90
91
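In isolation the queue behaves as follows (a usage sketch with small, illustrative sizes):

q = TagQueue(maxsize=100, update_interval=3)

q.add('disk.used;server=web01')          # newly created series: always queued
for _ in range(6):
    q.update('disk.used;server=web01')   # updates: only one in every 3 calls is queued

batch = q.getbatch(maxsize=10)
# creations are drained before updates, so batch holds the one add
# followed by the two throttled updates (len(batch) == 3)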
92 def writeCachedDataPoints():
93 "Write datapoints until the MetricCache is completely empty"
94
5595 cache = MetricCache()
5696 while cache:
5797 (metric, datapoints) = cache.drain_metric()
98 if metric is None:
99 # end the loop
100 break
101
58102 dbFileExists = state.database.exists(metric)
59103
60 if not dbFileExists and CREATE_BUCKET:
61 # If our tokenbucket has enough tokens available to create a new metric
62 # file then yield the metric data to complete that operation. Otherwise
63 # we'll just drop the metric on the ground and move on to the next
64 # metric.
65 # XXX This behavior should probably be configurable to no tdrop metrics
66 # when rate limitng unless our cache is too big or some other legit
67 # reason.
68 if CREATE_BUCKET.drain(1):
69 yield (metric, datapoints, dbFileExists)
70 continue
71
72 yield (metric, datapoints, dbFileExists)
73
74
75 def writeCachedDataPoints():
76 "Write datapoints until the MetricCache is completely empty"
77
78 cache = MetricCache()
79 while cache:
80 dataWritten = False
81
82 for (metric, datapoints, dbFileExists) in optimalWriteOrder():
83 dataWritten = True
84
85 if not dbFileExists:
86 archiveConfig = None
87 xFilesFactor, aggregationMethod = None, None
88
89 for schema in SCHEMAS:
90 if schema.matches(metric):
91 if settings.LOG_CREATES:
92 log.creates('new metric %s matched schema %s' % (metric, schema.name))
93 archiveConfig = [archive.getTuple() for archive in schema.archives]
94 break
95
96 for schema in AGGREGATION_SCHEMAS:
97 if schema.matches(metric):
98 if settings.LOG_CREATES:
99 log.creates('new metric %s matched aggregation schema %s'
100 % (metric, schema.name))
101 xFilesFactor, aggregationMethod = schema.archives
102 break
103
104 if not archiveConfig:
105 raise Exception("No storage schema matched the metric '%s', check your storage-schemas.conf file." % metric)
106
107 if settings.LOG_CREATES:
108 log.creates("creating database metric %s (archive=%s xff=%s agg=%s)" %
109 (metric, archiveConfig, xFilesFactor, aggregationMethod))
110 try:
111 state.database.create(metric, archiveConfig, xFilesFactor, aggregationMethod)
112 instrumentation.increment('creates')
113 except Exception, e:
114 log.err()
115 log.msg("Error creating %s: %s" % (metric, e))
116 instrumentation.increment('errors')
117 continue
118 # If we've got a rate limit configured lets makes sure we enforce it
119 if UPDATE_BUCKET:
120 UPDATE_BUCKET.drain(1, blocking=True)
121 try:
122 t1 = time.time()
123 # If we have duplicated points, always pick the last. update_many()
124 # has no guaranted behavior for that, and in fact the current implementation
125 # will keep the first point in the list.
126 datapoints = dict(datapoints).items()
127 state.database.write(metric, datapoints)
128 updateTime = time.time() - t1
129 except Exception, e:
104 if not dbFileExists:
105 if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
106 # If our tokenbucket doesn't have enough tokens available to create a new metric
107 # file then we'll just drop the metric on the ground and move on to the next
108 # metric.
 109	      # XXX This behavior should probably be configurable to not drop metrics
 110	      # when rate limiting unless our cache is too big or some other legit
111 # reason.
112 instrumentation.increment('droppedCreates')
113 continue
114
115 archiveConfig = None
116 xFilesFactor, aggregationMethod = None, None
117
118 for schema in SCHEMAS:
119 if schema.matches(metric):
120 if settings.LOG_CREATES:
121 log.creates('new metric %s matched schema %s' % (metric, schema.name))
122 archiveConfig = [archive.getTuple() for archive in schema.archives]
123 break
124
125 for schema in AGGREGATION_SCHEMAS:
126 if schema.matches(metric):
127 if settings.LOG_CREATES:
128 log.creates('new metric %s matched aggregation schema %s'
129 % (metric, schema.name))
130 xFilesFactor, aggregationMethod = schema.archives
131 break
132
133 if not archiveConfig:
134 raise Exception(("No storage schema matched the metric '%s',"
135 " check your storage-schemas.conf file.") % metric)
136
137 if settings.LOG_CREATES:
138 log.creates("creating database metric %s (archive=%s xff=%s agg=%s)" %
139 (metric, archiveConfig, xFilesFactor, aggregationMethod))
140 try:
141 state.database.create(metric, archiveConfig, xFilesFactor, aggregationMethod)
142 if settings.ENABLE_TAGS:
143 tagQueue.add(metric)
144 instrumentation.increment('creates')
145 except Exception as e:
130146 log.err()
131 log.msg("Error writing to %s: %s" % (metric, e))
147 log.msg("Error creating %s: %s" % (metric, e))
132148 instrumentation.increment('errors')
133 else:
134 pointCount = len(datapoints)
135 instrumentation.increment('committedPoints', pointCount)
136 instrumentation.append('updateTimes', updateTime)
137 if settings.LOG_UPDATES:
138 log.updates("wrote %d datapoints for %s in %.5f seconds" % (pointCount, metric, updateTime))
139
140 # Avoid churning CPU when only new metrics are in the cache
141 if not dataWritten:
142 time.sleep(0.1)
149 continue
150
 151	    # If we've got a rate limit configured let's make sure we enforce it
152 waitTime = 0
153 if UPDATE_BUCKET:
154 t1 = time.time()
155 UPDATE_BUCKET.drain(1, blocking=True)
156 waitTime = time.time() - t1
157
158 try:
159 t1 = time.time()
160 # If we have duplicated points, always pick the last. update_many()
 161	      # has no guaranteed behavior for that, and in fact the current implementation
162 # will keep the first point in the list.
163 datapoints = dict(datapoints).items()
164 state.database.write(metric, datapoints)
165 if settings.ENABLE_TAGS:
166 tagQueue.update(metric)
167 updateTime = time.time() - t1
168 except Exception as e:
169 log.err()
170 log.msg("Error writing to %s: %s" % (metric, e))
171 instrumentation.increment('errors')
172 else:
173 pointCount = len(datapoints)
174 instrumentation.increment('committedPoints', pointCount)
175 instrumentation.append('updateTimes', updateTime)
176 if settings.LOG_UPDATES:
177 if waitTime > 0.001:
178 log.updates("wrote %d datapoints for %s in %.5f seconds after waiting %.5f seconds" % (
179 pointCount, metric, updateTime, waitTime))
180 else:
181 log.updates("wrote %d datapoints for %s in %.5f seconds" % (
182 pointCount, metric, updateTime))
143183
144184
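For reference, the archiveConfig handed to state.database.create() above is just a list of (secondsPerPoint, points) tuples derived from the matching schema; an illustrative storage-schemas.conf entry and its expansion:

# [default_1min_for_7days]         (illustrative entry, not shipped config)
# pattern = .*
# retentions = 60s:7d,10m:2y
archiveConfig = [
    (60, 60 * 24 * 7),      # 60s:7d -> 10080 one-minute points
    (600, 6 * 24 * 730),    # 10m:2y -> 105120 ten-minute points
]
# xFilesFactor and aggregationMethod come from the matching entry in
# storage-aggregation.conf, or stay None and fall back to the database defaults.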
145185 def writeForever():
148188 writeCachedDataPoints()
149189 except Exception:
150190 log.err()
151 time.sleep(0.1) # The writer thread only sleeps when the cache is empty or an error occurs
191 # Back-off on error to give the backend time to recover.
192 time.sleep(0.1)
193 else:
 194	      # Avoid churning CPU when there are no metrics in the cache
195 time.sleep(1)
196
197
198 def writeTags():
199 while True:
200 tags = tagQueue.getbatch(settings.TAG_BATCH_SIZE)
201 if not tags:
202 break
203 state.database.tag(*tags)
204
205
206 def writeTagsForever():
207 while reactor.running:
208 try:
209 writeTags()
210 except Exception:
211 log.err()
212 # Back-off on error to give the backend time to recover.
213 time.sleep(0.1)
214 else:
215 # Avoid churning CPU when there are no series in the queue
216 time.sleep(0.2)
152217
153218
154219 def reloadStorageSchemas():
155220 global SCHEMAS
156221 try:
157222 SCHEMAS = loadStorageSchemas()
158 except Exception, e:
223 except Exception as e:
159224 log.msg("Failed to reload storage SCHEMAS: %s" % (e))
160225
161226
163228 global AGGREGATION_SCHEMAS
164229 try:
165230 AGGREGATION_SCHEMAS = loadAggregationSchemas()
166 except Exception, e:
231 except Exception as e:
167232 log.msg("Failed to reload aggregation SCHEMAS: %s" % (e))
168233
169234
171236 try:
172237 shut = settings.MAX_UPDATES_PER_SECOND_ON_SHUTDOWN
173238 if UPDATE_BUCKET:
174 UPDATE_BUCKET.setCapacityAndFillRate(shut,shut)
239 UPDATE_BUCKET.setCapacityAndFillRate(shut, shut)
175240 if CREATE_BUCKET:
176 CREATE_BUCKET.setCapacityAndFillRate(shut,shut)
177 log.msg("Carbon shutting down. Changed the update rate to: " + str(settings.MAX_UPDATES_PER_SECOND_ON_SHUTDOWN))
241 CREATE_BUCKET.setCapacityAndFillRate(shut, shut)
242 log.msg("Carbon shutting down. Changed the update rate to: " +
243 str(settings.MAX_UPDATES_PER_SECOND_ON_SHUTDOWN))
178244 except KeyError:
179245 log.msg("Carbon shutting down. Update rate not changed")
246
 247	  # Also set MIN_TIMESTAMP_LAG to 0 to avoid pointless waiting during shutdown.
248 settings.MIN_TIMESTAMP_LAG = 0
180249
181250
182251 class WriterService(Service):
193262 self.aggregation_reload_task.start(60, False)
194263 reactor.addSystemEventTrigger('before', 'shutdown', shutdownModifyUpdateSpeed)
195264 reactor.callInThread(writeForever)
265 if settings.ENABLE_TAGS:
266 reactor.callInThread(writeTagsForever)
196267 Service.startService(self)
197268
198269 def stopService(self):
0 from zope.interface import implementer
1
2 from twisted.plugin import IPlugin
3 from twisted.application.service import IServiceMaker
4
5 from carbon import conf
6
7
8 @implementer(IServiceMaker, IPlugin)
9 class CarbonAggregatorCacheServiceMaker(object):
10
11 tapname = "carbon-aggregator-cache"
12 description = "Aggregate and write stats for graphite."
13 options = conf.CarbonAggregatorOptions
14
15 def makeService(self, options):
16 """
17 Construct a C{carbon-aggregator-cache} service.
18 """
19 from carbon import service
20 return service.createAggregatorCacheService(options)
21
22
23 # Now construct an object which *provides* the relevant interfaces
24 serviceMaker = CarbonAggregatorCacheServiceMaker()
0 from zope.interface import implements
0 from zope.interface import implementer
11
22 from twisted.plugin import IPlugin
33 from twisted.application.service import IServiceMaker
55 from carbon import conf
66
77
8 @implementer(IServiceMaker, IPlugin)
89 class CarbonAggregatorServiceMaker(object):
910
10 implements(IServiceMaker, IPlugin)
1111 tapname = "carbon-aggregator"
1212 description = "Aggregate stats for graphite."
1313 options = conf.CarbonAggregatorOptions
0 from zope.interface import implements
0 from zope.interface import implementer
11
22 from twisted.plugin import IPlugin
33 from twisted.application.service import IServiceMaker
55 from carbon import conf
66
77
8 @implementer(IServiceMaker, IPlugin)
89 class CarbonCacheServiceMaker(object):
910
10 implements(IServiceMaker, IPlugin)
1111 tapname = "carbon-cache"
1212 description = "Collect stats for graphite."
1313 options = conf.CarbonCacheOptions
0 from zope.interface import implements
0 from zope.interface import implementer
11
22 from twisted.plugin import IPlugin
33 from twisted.application.service import IServiceMaker
55 from carbon import conf
66
77
8 @implementer(IServiceMaker, IPlugin)
89 class CarbonRelayServiceMaker(object):
910
10 implements(IServiceMaker, IPlugin)
1111 tapname = "carbon-relay"
1212 description = "Relay stats for graphite."
1313 options = conf.CarbonRelayOptions
22 from __future__ import with_statement
33
44 import os
5 import ConfigParser
5 from glob import glob
6 try:
7 from ConfigParser import ConfigParser, DuplicateSectionError # Python 2
8 except ImportError:
9 from configparser import ConfigParser, DuplicateSectionError # Python 3
610
7 import platform
8 from glob import glob
9
10 try:
11 from io import BytesIO
12 except ImportError:
13 from StringIO import StringIO as BytesIO
1411
1512 # Graphite historically has an install prefix set in setup.cfg. Being in a
1613 # configuration file, it's not easy to override it or unset it (for installing
2118 # required for installations from a source tarball because running
2219 # ``python setup.py sdist`` will re-add the prefix to the tarball's
2320 # ``setup.cfg``.
21 cf = ConfigParser()
22
2423 with open('setup.cfg', 'r') as f:
2524 orig_setup_cfg = f.read()
26 cf = ConfigParser.ConfigParser()
27 cf.readfp(BytesIO(orig_setup_cfg), 'setup.cfg')
25 f.seek(0)
26 cf.readfp(f, 'setup.cfg')
2827
2928 if os.environ.get('GRAPHITE_NO_PREFIX'):
3029 cf.remove_section('install')
3130 else:
31 print('#' * 80)
32 print('')
33 print('Carbon\'s default installation prefix is "/opt/graphite".')
34 print('')
35 print('To install Carbon in the Python\'s default location run:')
36 print('$ GRAPHITE_NO_PREFIX=True python setup.py install')
37 print('')
38 print('#' * 80)
3239 try:
3340 cf.add_section('install')
34 except ConfigParser.DuplicateSectionError:
41 except DuplicateSectionError:
3542 pass
3643 if not cf.has_option('install', 'prefix'):
3744 cf.set('install', 'prefix', '/opt/graphite')
3845 if not cf.has_option('install', 'install-lib'):
3946 cf.set('install', 'install-lib', '%(prefix)s/lib')
4047
41 with open('setup.cfg', 'wb') as f:
48 with open('setup.cfg', 'w') as f:
4249 cf.write(f)
50
4351
4452 if os.environ.get('USE_SETUPTOOLS'):
4553 from setuptools import setup
4654 setup_kwargs = dict(zip_safe=0)
47
4855 else:
4956 from distutils.core import setup
5057 setup_kwargs = dict()
5158
5259
53 storage_dirs = [ ('storage/ceres', []), ('storage/whisper',[]),
54 ('storage/lists',[]), ('storage/log',[]),
55 ('storage/rrd',[]) ]
60 storage_dirs = [ ('storage/ceres/dummy.txt', []), ('storage/whisper/dummy.txt',[]),
61 ('storage/lists',[]), ('storage/log/dummy.txt',[]),
62 ('storage/rrd/dummy.txt',[]) ]
5663 conf_files = [ ('conf', glob('conf/*.example')) ]
5764
5865 install_files = storage_dirs + conf_files
6471 'distro/redhat/init.d/carbon-aggregator']) ]
6572 install_files += init_scripts
6673
74
6775 try:
6876 setup(
69 name='carbon',
70 version='1.0.2',
71 url='http://graphiteapp.org/',
72 author='Chris Davis',
73 author_email='chrismd@gmail.com',
74 license='Apache Software License 2.0',
75 description='Backend data caching and persistence daemon for Graphite',
76 long_description='Backend data caching and persistence daemon for Graphite',
77 packages=['carbon', 'carbon.aggregator', 'twisted.plugins'],
78 package_dir={'' : 'lib'},
79 scripts=glob('bin/*'),
80 package_data={ 'carbon' : ['*.xml'] },
81 data_files=install_files,
82 install_requires=['Twisted', 'txAMQP'],
83 classifiers=(
84 'Intended Audience :: Developers',
85 'Natural Language :: English',
86 'License :: OSI Approved :: Apache Software License',
87 'Programming Language :: Python',
88 'Programming Language :: Python :: 2',
89 'Programming Language :: Python :: 2.7',
90 'Programming Language :: Python :: 2 :: Only',
91 ),
92
93 **setup_kwargs
77 name='carbon',
78 version='1.1.4',
79 url='http://graphiteapp.org/',
80 author='Chris Davis',
81 author_email='chrismd@gmail.com',
82 license='Apache Software License 2.0',
83 description='Backend data caching and persistence daemon for Graphite',
84 long_description='Backend data caching and persistence daemon for Graphite',
85 packages=['carbon', 'carbon.aggregator', 'twisted.plugins'],
86 package_dir={'' : 'lib'},
87 scripts=glob('bin/*'),
88 package_data={ 'carbon' : ['*.xml'] },
89 data_files=install_files,
90 install_requires=['Twisted', 'txAMQP', 'cachetools', 'urllib3'],
91 classifiers=(
92 'Intended Audience :: Developers',
93 'Natural Language :: English',
94 'License :: OSI Approved :: Apache Software License',
95 'Programming Language :: Python',
96 'Programming Language :: Python :: 2',
97 'Programming Language :: Python :: 2.7',
98 'Programming Language :: Python :: 3',
99 'Programming Language :: Python :: 3.4',
100 'Programming Language :: Python :: 3.5',
101 'Programming Language :: Python :: 3.6',
102 'Programming Language :: Python :: 3.7',
103 'Programming Language :: Python :: Implementation :: CPython',
104 'Programming Language :: Python :: Implementation :: PyPy',
105 ),
106 **setup_kwargs
94107 )
95108 finally:
96109 with open('setup.cfg', 'w') as f: