New upstream version 1.1.4
Thomas Goirand
0 | 0 | Metadata-Version: 1.1 |
1 | 1 | Name: carbon |
2 | Version: 1.0.2 | |
2 | Version: 1.1.4 | |
3 | 3 | Summary: Backend data caching and persistence daemon for Graphite |
4 | 4 | Home-page: http://graphiteapp.org/ |
5 | 5 | Author: Chris Davis |
13 | 13 | Classifier: Programming Language :: Python |
14 | 14 | Classifier: Programming Language :: Python :: 2 |
15 | 15 | Classifier: Programming Language :: Python :: 2.7 |
16 | Classifier: Programming Language :: Python :: 2 :: Only | |
16 | Classifier: Programming Language :: Python :: 3 | |
17 | Classifier: Programming Language :: Python :: 3.4 | |
18 | Classifier: Programming Language :: Python :: 3.5 | |
19 | Classifier: Programming Language :: Python :: 3.6 | |
20 | Classifier: Programming Language :: Python :: 3.7 | |
21 | Classifier: Programming Language :: Python :: Implementation :: CPython | |
22 | Classifier: Programming Language :: Python :: Implementation :: PyPy |
0 | #!/usr/bin/env python | |
1 | """Copyright 2009 Chris Davis | |
2 | ||
3 | Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | you may not use this file except in compliance with the License. | |
5 | You may obtain a copy of the License at | |
6 | ||
7 | http://www.apache.org/licenses/LICENSE-2.0 | |
8 | ||
9 | Unless required by applicable law or agreed to in writing, software | |
10 | distributed under the License is distributed on an "AS IS" BASIS, | |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | See the License for the specific language governing permissions and | |
13 | limitations under the License.""" | |
14 | ||
15 | import sys | |
16 | import os.path | |
17 | ||
18 | # Figure out where we're installed | |
19 | BIN_DIR = os.path.dirname(os.path.abspath(__file__)) | |
20 | ROOT_DIR = os.path.dirname(BIN_DIR) | |
21 | ||
22 | # Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from | |
23 | # source. | |
24 | LIB_DIR = os.path.join(ROOT_DIR, "lib") | |
25 | sys.path.insert(0, LIB_DIR) | |
26 | ||
27 | from carbon.util import run_twistd_plugin # noqa | |
28 | from carbon.exceptions import CarbonConfigException # noqa | |
29 | ||
30 | try: | |
31 | run_twistd_plugin(__file__) | |
32 | except CarbonConfigException as exc: | |
33 | raise SystemExit(str(exc)) |
24 | 24 | LIB_DIR = os.path.join(ROOT_DIR, "lib") |
25 | 25 | sys.path.insert(0, LIB_DIR) |
26 | 26 | |
27 | from carbon.util import run_twistd_plugin | |
28 | from carbon.exceptions import CarbonConfigException | |
27 | from carbon.util import run_twistd_plugin # noqa | |
28 | from carbon.exceptions import CarbonConfigException # noqa | |
29 | 29 | |
30 | 30 | try: |
31 | 31 | run_twistd_plugin(__file__) |
32 | except CarbonConfigException, exc: | |
32 | except CarbonConfigException as exc: | |
33 | 33 | raise SystemExit(str(exc)) |
24 | 24 | LIB_DIR = os.path.join(ROOT_DIR, "lib") |
25 | 25 | sys.path.insert(0, LIB_DIR) |
26 | 26 | |
27 | from carbon.util import run_twistd_plugin | |
28 | from carbon.exceptions import CarbonConfigException | |
27 | from carbon.util import run_twistd_plugin # noqa | |
28 | from carbon.exceptions import CarbonConfigException # noqa | |
29 | 29 | |
30 | 30 | try: |
31 | 31 | run_twistd_plugin(__file__) |
32 | except CarbonConfigException, exc: | |
32 | except CarbonConfigException as exc: | |
33 | 33 | raise SystemExit(str(exc)) |
13 | 13 | limitations under the License.""" |
14 | 14 | |
15 | 15 | import sys |
16 | import imp | |
17 | 16 | from os.path import dirname, join, abspath, exists |
18 | 17 | from optparse import OptionParser |
19 | 18 | |
34 | 33 | except ImportError: |
35 | 34 | pass |
36 | 35 | |
37 | from twisted.internet import stdio, reactor, defer | |
38 | from twisted.protocols.basic import LineReceiver | |
39 | from carbon.routers import ConsistentHashingRouter, RelayRulesRouter | |
40 | from carbon.client import CarbonClientManager | |
41 | from carbon import log, events | |
36 | from twisted.internet import stdio, reactor, defer # noqa | |
37 | from twisted.protocols.basic import LineReceiver # noqa | |
38 | from carbon.routers import ConsistentHashingRouter, RelayRulesRouter # noqa | |
39 | from carbon.client import CarbonClientManager # noqa | |
40 | from carbon import log, events # noqa | |
42 | 41 | |
43 | 42 | |
44 | 43 | option_parser = OptionParser(usage="%prog [options] <host:port:instance> <host:port:instance> ...") |
45 | 44 | option_parser.add_option('--debug', action='store_true', help="Log debug info to stdout") |
46 | 45 | option_parser.add_option('--keyfunc', help="Use a custom key function (path/to/module.py:myFunc)") |
47 | 46 | option_parser.add_option('--replication', type='int', default=1, help='Replication factor') |
48 | option_parser.add_option('--routing', default='consistent-hashing', | |
47 | option_parser.add_option( | |
48 | '--routing', default='consistent-hashing', | |
49 | 49 | help='Routing method: "consistent-hashing" (default) or "relay"') |
50 | option_parser.add_option('--diverse-replicas', action='store_true', help="Spread replicas across diff. servers") | |
51 | option_parser.add_option('--relayrules', default=default_relayrules, | |
52 | help='relay-rules.conf file to use for relay routing') | |
50 | option_parser.add_option( | |
51 | '--diverse-replicas', action='store_true', help="Spread replicas across diff. servers") | |
52 | option_parser.add_option( | |
53 | '--relayrules', default=default_relayrules, help='relay-rules.conf file to use for relay routing') | |
53 | 54 | |
54 | 55 | options, args = option_parser.parse_args() |
55 | 56 | |
56 | 57 | if not args: |
57 | print 'At least one host:port destination required\n' | |
58 | print('At least one host:port destination required\n') | |
58 | 59 | option_parser.print_usage() |
59 | 60 | raise SystemExit(1) |
60 | 61 | |
61 | 62 | if options.routing not in ('consistent-hashing', 'relay'): |
62 | print "Invalid --routing value, must be one of:" | |
63 | print " consistent-hashing" | |
64 | print " relay" | |
63 | print("Invalid --routing value, must be one of:") | |
64 | print(" consistent-hashing") | |
65 | print(" relay") | |
65 | 66 | raise SystemExit(1) |
66 | 67 | |
67 | 68 | destinations = [] |
73 | 74 | instance = parts[2] |
74 | 75 | else: |
75 | 76 | instance = None |
76 | destinations.append( (host, port, instance) ) | |
77 | destinations.append((host, port, instance)) | |
77 | 78 | |
78 | 79 | if options.debug: |
79 | 80 | log.logToStdout() |
86 | 87 | if exists(options.relayrules): |
87 | 88 | router = RelayRulesRouter(options.relayrules) |
88 | 89 | else: |
89 | print "relay rules file %s does not exist" % options.relayrules | |
90 | print("relay rules file %s does not exist" % options.relayrules) | |
90 | 91 | raise SystemExit(1) |
91 | 92 | |
92 | 93 | client_manager = CarbonClientManager(router) |
103 | 104 | delimiter = '\n' |
104 | 105 | |
105 | 106 | def lineReceived(self, line): |
106 | #log.msg("[DEBUG] lineReceived(): %s" % line) | |
107 | # log.msg("[DEBUG] lineReceived(): %s" % line) | |
107 | 108 | try: |
108 | 109 | (metric, value, timestamp) = line.split() |
109 | 110 | datapoint = (float(timestamp), float(value)) |
110 | assert datapoint[1] == datapoint[1] # filter out NaNs | |
111 | assert datapoint[1] == datapoint[1] # filter out NaNs | |
111 | 112 | client_manager.sendDatapoint(metric, datapoint) |
112 | 113 | except ValueError: |
113 | 114 | log.err(None, 'Dropping invalid line: %s' % line) |
114 | 115 | |
115 | 116 | def connectionLost(self, reason): |
116 | 117 | log.msg('stdin disconnected') |
118 | ||
117 | 119 | def startShutdown(results): |
118 | 120 | log.msg("startShutdown(%s)" % str(results)) |
119 | 121 | allStopped = client_manager.stopAllClients() |
120 | 122 | allStopped.addCallback(shutdown) |
123 | ||
121 | 124 | firstConnectsAttempted.addCallback(startShutdown) |
122 | 125 | |
123 | stdio.StandardIO( StdinMetricsReader() ) | |
126 | ||
127 | stdio.StandardIO(StdinMetricsReader()) | |
124 | 128 | |
125 | 129 | exitCode = 0 |
130 | ||
131 | ||
126 | 132 | def shutdown(results): |
127 | 133 | global exitCode |
128 | 134 | for success, result in results: |
132 | 138 | if reactor.running: |
133 | 139 | reactor.stop() |
134 | 140 | |
141 | ||
135 | 142 | reactor.run() |
136 | 143 | raise SystemExit(exitCode) |
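The lineReceived() handler above parses carbon's plain-text line protocol: one "metric value timestamp" triple per newline-terminated line, with NaN values filtered via the self-inequality trick. A minimal feeder sketch for that protocol; the host and port are assumptions (2003 is only carbon's conventional LINE_RECEIVER_PORT):

    # Send one datapoint in the plain-text line protocol parsed above.
    import socket
    import time

    sock = socket.create_connection(('127.0.0.1', 2003))
    sock.sendall(('example.metric 42.0 %d\n' % int(time.time())).encode('ascii'))
    sock.close()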
24 | 24 | LIB_DIR = os.path.join(ROOT_DIR, "lib") |
25 | 25 | sys.path.insert(0, LIB_DIR) |
26 | 26 | |
27 | from carbon.util import run_twistd_plugin | |
28 | from carbon.exceptions import CarbonConfigException | |
27 | from carbon.util import run_twistd_plugin # noqa | |
28 | from carbon.exceptions import CarbonConfigException # noqa | |
29 | 29 | |
30 | 30 | try: |
31 | 31 | run_twistd_plugin(__file__) |
32 | except CarbonConfigException, exc: | |
32 | except CarbonConfigException as exc: | |
33 | 33 | raise SystemExit(str(exc)) |
14 | 14 | |
15 | 15 | import sys |
16 | 16 | import whisper |
17 | from os.path import dirname, exists, join, realpath | |
18 | from ConfigParser import ConfigParser | |
17 | from os.path import dirname, join, realpath | |
18 | ||
19 | try: | |
20 | from ConfigParser import ConfigParser | |
21 | except ImportError: | |
22 | from configparser import ConfigParser | |
19 | 23 | |
20 | 24 | if len(sys.argv) == 2: |
21 | 25 | SCHEMAS_FILE = sys.argv[1] |
22 | print "Loading storage-schemas configuration from: '%s'" % SCHEMAS_FILE | |
26 | print("Loading storage-schemas configuration from: '%s'" % SCHEMAS_FILE) | |
23 | 27 | else: |
24 | 28 | SCHEMAS_FILE = realpath(join(dirname(__file__), '..', 'conf', 'storage-schemas.conf')) |
25 | print "Loading storage-schemas configuration from default location at: '%s'" % SCHEMAS_FILE | |
29 | print("Loading storage-schemas configuration from default location at: '%s'" % SCHEMAS_FILE) | |
26 | 30 | |
27 | 31 | config_parser = ConfigParser() |
28 | 32 | if not config_parser.read(SCHEMAS_FILE): |
31 | 35 | errors_found = 0 |
32 | 36 | |
33 | 37 | for section in config_parser.sections(): |
34 | print "Section '%s':" % section | |
38 | print("Section '%s':" % section) | |
35 | 39 | options = dict(config_parser.items(section)) |
36 | 40 | retentions = options['retentions'].split(',') |
37 | 41 | |
40 | 44 | for retention in retentions: |
41 | 45 | try: |
42 | 46 | archives.append(whisper.parseRetentionDef(retention)) |
43 | except ValueError, e: | |
44 | print " - Error: Section '%s' contains an invalid item in its retention definition ('%s')" % \ | |
47 | except ValueError as e: | |
48 | print( | |
49 | " - Error: Section '%s' contains an invalid item in its retention definition ('%s')" % | |
45 | 50 | (section, retention) |
46 | print " %s" % e.message | |
51 | ) | |
52 | print(" %s" % e) | |
47 | 53 | section_failed = True |
48 | 54 | |
49 | 55 | if not section_failed: |
50 | 56 | try: |
51 | 57 | whisper.validateArchiveList(archives) |
52 | except whisper.InvalidConfiguration, e: | |
53 | print " - Error: Section '%s' contains an invalid retention definition ('%s')" % \ | |
58 | except whisper.InvalidConfiguration as e: | |
59 | print( | |
60 | " - Error: Section '%s' contains an invalid retention definition ('%s')" % | |
54 | 61 | (section, ','.join(retentions)) |
55 | print " %s" % e.message | |
62 | ) | |
63 | print(" %s" % e) | |
56 | 64 | |
57 | 65 | if section_failed: |
58 | 66 | errors_found += 1 |
59 | 67 | else: |
60 | print " OK" | |
68 | print(" OK") | |
61 | 69 | |
62 | 70 | if errors_found: |
63 | raise SystemExit( "Storage-schemas configuration '%s' failed validation" % SCHEMAS_FILE) | |
71 | raise SystemExit("Storage-schemas configuration '%s' failed validation" % SCHEMAS_FILE) | |
64 | 72 | |
65 | print "Storage-schemas configuration '%s' is valid" % SCHEMAS_FILE | |
73 | print("Storage-schemas configuration '%s' is valid" % SCHEMAS_FILE) |
84 | 84 | # second. |
85 | 85 | MIN_TIMESTAMP_RESOLUTION = 1 |
86 | 86 | |
87 | # Set the minimum lag in seconds for a point to be written to the database | |
88 | # in order to optimize batching. This means that each point will wait at least | |
89 | # the duration of this lag before being written. Setting this to 0 disables the feature. | |
90 | # This currently only works when using the timesorted write strategy. | |
91 | # MIN_TIMESTAMP_LAG = 0 | |
92 | ||
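A rough sketch of what this lag buys; the real filter lands in TimeSortedStrategy further down, operating on cache watermarks (MIN_TIMESTAMP_LAG = 5 is an assumed example value):

    # Points cached less than MIN_TIMESTAMP_LAG seconds ago stay in the cache,
    # so they can accumulate into a larger batch before being written.
    import time

    MIN_TIMESTAMP_LAG = 5  # assumed example value
    now = time.time()
    watermarks = [('metric.a', now - 10), ('metric.b', now - 1)]
    writable = [m for m, ts in watermarks if now - ts > MIN_TIMESTAMP_LAG]
    assert writable == ['metric.a']  # metric.b waits until it has aged past the lag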
87 | 93 | # Set the interface and port for the line (plain text) listener. Setting the |
88 | 94 | # interface to 0.0.0.0 listens on all interfaces. Port can be set to 0 to |
89 | 95 | # disable this listener if it is not required. |
130 | 136 | USE_FLOW_CONTROL = True |
131 | 137 | |
132 | 138 | # If enabled this setting is used to timeout metric client connection if no |
133 | # metrics have been sent in specified time in seconds | |
139 | # metrics have been sent within the specified time in seconds | |
134 | 140 | #METRIC_CLIENT_IDLE_TIMEOUT = None |
135 | 141 | |
136 | 142 | # By default, carbon-cache will log every whisper update and cache hit. |
200 | 206 | |
201 | 207 | # On systems which has a large number of metrics, an amount of Whisper write(2)'s |
202 | 208 | # pageback sometimes cause disk thrashing due to memory shortage, so that abnormal |
203 | # disk reads occur. Enabling this option makes it possible to decrease useless | |
209 | # disk reads occur. Enabling this option makes it possible to decrease useless | |
204 | 210 | # page cache memory by posix_fadvise(2) with POSIX_FADVISE_RANDOM option. |
205 | 211 | # WHISPER_FADVISE_RANDOM = False |
206 | 212 | |
295 | 301 | # Example: store everything |
296 | 302 | # BIND_PATTERNS = # |
297 | 303 | |
304 | # URL of the graphite-web instance; this is used to add incoming series to the tag database | |
305 | # GRAPHITE_URL = http://127.0.0.1:80 | |
306 | ||
307 | # Tag support; when enabled carbon will make HTTP calls to graphite-web to update the tag index | |
308 | # ENABLE_TAGS = True | |
309 | ||
310 | # Tag update interval; this specifies how frequently updates to existing series will trigger | |
311 | # an update to the tag index. The default setting is once every 100 updates | |
312 | # TAG_UPDATE_INTERVAL = 100 | |
313 | ||
314 | # Tag hash filenames; this specifies whether tagged metric filenames should use the hash of the metric name | |
315 | # or a human-readable name. Using hashed names avoids issues with path length when using a large number of tags | |
316 | # TAG_HASH_FILENAMES = True | |
317 | ||
318 | # Tag batch size; this specifies the maximum number of series to be sent to graphite-web in a single batch | |
319 | # TAG_BATCH_SIZE = 100 | |
320 | ||
321 | # Tag queue size; this specifies the maximum number of series to be queued for sending to graphite-web. | |
322 | # There are separate queues for new series and for updates to existing series | |
323 | # TAG_QUEUE_SIZE = 10000 | |
324 | ||
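For context, a tagged series is an ordinary metric name with ;tag=value pairs appended, and carbon forwards batches of these names to the graphite-web tag index. A hedged sketch of the series format and the HTTP call involved (the /tags/tagMultiSeries endpoint and 'path' field follow graphite-web's tag API; verify against your version):

    # Tagged series name as it arrives on the wire: name;tag1=val1;tag2=val2
    series = 'disk.used;datacenter=dc1;server=web01'
    # Roughly what a tag-queue flush amounts to (Python 3 stdlib sketch):
    from urllib.parse import urlencode
    from urllib.request import urlopen

    data = urlencode([('path', series)]).encode('ascii')
    urlopen('http://127.0.0.1:80/tags/tagMultiSeries', data)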
325 | # Set to enable Sentry.io exception monitoring. | |
326 | # RAVEN_DSN='YOUR_DSN_HERE'. | |
327 | ||
298 | 328 | # To configure special settings for the carbon-cache instance 'b', uncomment this: |
299 | 329 | #[cache:b] |
300 | 330 | #LINE_RECEIVER_PORT = 2103 |
340 | 370 | |
341 | 371 | # For REPLICATION_FACTOR >=2, set DIVERSE_REPLICAS to True to guarantee replicas |
342 | 372 | # across distributed hosts. With this setting disabled, it's possible that replicas |
343 | # may be sent to different caches on the same host. This has been the default | |
373 | # may be sent to different caches on the same host. This has been the default | |
344 | 374 | # behavior since introduction of 'consistent-hashing' relay method. |
345 | 375 | # Note that enabling this on an existing pre-0.9.14 cluster will require rebalancing |
346 | 376 | # your metrics across the cluster nodes using a tool like Carbonate. |
368 | 398 | # set to one of "line", "pickle", "udp" and "protobuf". This list can be |
369 | 399 | # extended with CarbonClientFactory plugins and defaults to "pickle". |
370 | 400 | # DESTINATION_PROTOCOL = pickle |
401 | ||
402 | # This defines the wire transport, either none or ssl. | |
403 | # If SSL is used any TCP connection will be upgraded to TLS1. The system's | |
404 | # trust authority will be used unless DESTINATION_SSL_CA is specified in | |
405 | # which case an alternative certificate authority chain will be used for | |
406 | # verifying the remote certificate. | |
407 | # To use SSL you'll need the cryptography, service_identity, and twisted >= 14 | |
408 | # DESTINATION_TRANSPORT = none | |
409 | # DESTINATION_SSL_CA=/path/to/private-ca.crt | |
410 | ||
411 | # This allows multiple connections per destination; this will | |
412 | # pool all the replicas of a single host in the same queue and distribute | |
413 | # points across these replicas instead of replicating them. | |
414 | # The following example will balance the load between :0 and :1. | |
415 | ## DESTINATIONS = foo:2001:0, foo:2001:1 | |
416 | ## RELAY_METHOD = rules | |
417 | # Note: this is currently incompatible with USE_RATIO_RESET which gets | |
418 | # disabled if this option is enabled. | |
419 | # DESTINATIONS_POOL_REPLICAS = False | |
420 | ||
421 | # When using consistent hashing it sometimes makes sense to make | |
422 | # the ring dynamic when you don't want to lose points when a | |
423 | # single destination is down. Replication is an answer to that | |
424 | # but it can be quite expensive. | |
425 | # DYNAMIC_ROUTER = False | |
426 | ||
427 | # Controls the number of connection attempts before marking a | |
428 | # destination as down. We usually do one connection attempt per | |
429 | # second. | |
430 | # DYNAMIC_ROUTER_MAX_RETRIES = 5 | |
371 | 431 | |
372 | 432 | # This is the maximum number of datapoints that can be queued up |
373 | 433 | # for a single destination. Once this limit is hit, we will |
412 | 472 | USE_FLOW_CONTROL = True |
413 | 473 | |
414 | 474 | # If enabled this setting is used to timeout metric client connection if no |
415 | # metrics have been sent in specified time in seconds | |
475 | # metrics have been sent within the specified time in seconds | |
416 | 476 | #METRIC_CLIENT_IDLE_TIMEOUT = None |
417 | 477 | |
418 | 478 | # Set this to True to enable whitelisting and blacklisting of metrics in |
460 | 520 | # reset connections for no good reason. |
461 | 521 | MIN_RESET_INTERVAL=121 |
462 | 522 | |
523 | # Enable TCP Keep Alive (http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/overview.html). | |
524 | # Default settings will send a probe every 30s. Default is False. | |
525 | # TCP_KEEPALIVE=True | |
526 | # The interval between the last data packet sent (simple ACKs are not | |
527 | # considered data) and the first keepalive probe; after the connection is marked | |
528 | # to need keepalive, this counter is not used any further. | |
529 | # TCP_KEEPIDLE=10 | |
530 | # The interval between subsequent keepalive probes, regardless of what | |
531 | # the connection has exchanged in the meantime. | |
532 | # TCP_KEEPINTVL=30 | |
533 | # The number of unacknowledged probes to send before considering the connection | |
534 | # dead and notifying the application layer. | |
535 | # TCP_KEEPCNT=2 | |
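These four settings map onto standard socket options, which carbon applies to the Twisted transport via the enableTcpKeepAlive helper in carbon.util. A minimal raw-socket equivalent (Linux option names):

    import socket

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)     # TCP_KEEPALIVE = True
    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)   # TCP_KEEPIDLE
    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30)  # TCP_KEEPINTVL
    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 2)     # TCP_KEEPCNT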
536 | ||
537 | ||
463 | 538 | [aggregator] |
464 | 539 | LINE_RECEIVER_INTERFACE = 0.0.0.0 |
465 | 540 | LINE_RECEIVER_PORT = 2023 |
510 | 585 | USE_FLOW_CONTROL = True |
511 | 586 | |
512 | 587 | # If enabled this setting is used to timeout metric client connection if no |
513 | # metrics have been sent in specified time in seconds | |
588 | # metrics have been sent within the specified time in seconds | |
514 | 589 | #METRIC_CLIENT_IDLE_TIMEOUT = None |
515 | 590 | |
516 | 591 | # This defines the maximum "message size" between carbon daemons. |
558 | 633 | # Specify the user to drop privileges to |
559 | 634 | # If this is blank carbon-aggregator runs as the user that invokes it |
560 | 635 | # USER = |
636 | ||
637 | # Part of the code, and particularly aggregator rules, needs | |
638 | # to cache metric names. To avoid leaking too much memory you | |
639 | # can tweak the size of this cache. The default allows for 1M | |
640 | # different metrics per rule (~200MiB). | |
641 | # CACHE_METRIC_NAMES_MAX=1000000 | |
642 | ||
643 | # You can optionally set a ttl to this cache. | |
644 | # CACHE_METRIC_NAMES_TTL=600 |
23 | 23 | [sum] |
24 | 24 | pattern = \.count$ |
25 | 25 | xFilesFactor = 0 |
26 | aggregationMethod = sum | |
26 | aggregationMethod = max | |
27 | 27 | |
28 | 28 | [default_average] |
29 | 29 | pattern = .* |
13 | 13 | # Valid: 60s:7d,300s:30d (300/60 = 5) |
14 | 14 | # Invalid: 180s:7d,300s:30d (300/180 = 3.333) |
15 | 15 | # |
16 | # This retention is set at the time the first metric is sent. | |
17 | # Changing this file will not affect already-created .wsp files. | |
18 | # Use whisper-resize.py to change existing data files. | |
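Since changes here never touch existing .wsp files, resizing is a separate manual step; a hedged example of the whisper-resize.py invocation (the tool ships with the whisper package; the path is illustrative):

    whisper-resize.py /var/lib/graphite/whisper/foo/bar.wsp 60s:7d 300s:30d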
16 | 19 | |
17 | 20 | # Carbon's internal metrics. This entry should match what is specified in |
18 | 21 | # CARBON_METRIC_PREFIX and CARBON_METRIC_INTERVAL settings |
3 | 3 | from carbon import log |
4 | 4 | |
5 | 5 | |
6 | class BufferManager: | |
6 | class _BufferManager: | |
7 | 7 | def __init__(self): |
8 | 8 | self.buffers = {} |
9 | 9 | |
12 | 12 | |
13 | 13 | def get_buffer(self, metric_path): |
14 | 14 | if metric_path not in self.buffers: |
15 | log.aggregator("Allocating new metric buffer for %s" % metric_path) | |
15 | log.debug("Allocating new metric buffer for %s" % metric_path) | |
16 | 16 | self.buffers[metric_path] = MetricBuffer(metric_path) |
17 | 17 | |
18 | 18 | return self.buffers[metric_path] |
50 | 50 | self.aggregation_frequency = int(frequency) |
51 | 51 | self.aggregation_func = func |
52 | 52 | self.compute_task = LoopingCall(self.compute_value) |
53 | compute_frequency = min(settings['WRITE_BACK_FREQUENCY'], frequency) or frequency | |
53 | if settings['WRITE_BACK_FREQUENCY'] is not None: | |
54 | compute_frequency = min(settings['WRITE_BACK_FREQUENCY'], frequency) | |
55 | else: | |
56 | compute_frequency = frequency | |
54 | 57 | self.compute_task.start(compute_frequency, now=False) |
55 | 58 | self.configured = True |
56 | 59 | |
57 | 60 | def compute_value(self): |
58 | now = int( time.time() ) | |
61 | now = int(time.time()) | |
59 | 62 | current_interval = now - (now % self.aggregation_frequency) |
60 | age_threshold = current_interval - (settings['MAX_AGGREGATION_INTERVALS'] * self.aggregation_frequency) | |
63 | age_threshold = current_interval - ( | |
64 | settings['MAX_AGGREGATION_INTERVALS'] * self.aggregation_frequency) | |
61 | 65 | |
62 | for buffer in self.interval_buffers.values(): | |
66 | for buffer in list(self.interval_buffers.values()): | |
63 | 67 | if buffer.active: |
64 | 68 | value = self.aggregation_func(buffer.values) |
65 | 69 | datapoint = (buffer.interval, value) |
92 | 96 | self.active = True |
93 | 97 | |
94 | 98 | def input(self, datapoint): |
95 | self.values.append( datapoint[1] ) | |
99 | self.values.append(datapoint[1]) | |
96 | 100 | self.active = True |
97 | 101 | |
98 | 102 | def mark_inactive(self): |
100 | 104 | |
101 | 105 | |
102 | 106 | # Shared importable singleton |
103 | BufferManager = BufferManager() | |
107 | BufferManager = _BufferManager() | |
104 | 108 | |
105 | 109 | # Avoid import circularity |
106 | from carbon import state | |
110 | from carbon import state # NOQA |
1 | 1 | from carbon.aggregator.buffers import BufferManager |
2 | 2 | from carbon.instrumentation import increment |
3 | 3 | from carbon.pipeline import Processor |
4 | from carbon.rewrite import PRE, POST, RewriteRuleManager | |
5 | 4 | from carbon.conf import settings |
6 | 5 | from carbon import log |
7 | 6 | |
11 | 10 | |
12 | 11 | def process(self, metric, datapoint): |
13 | 12 | increment('datapointsReceived') |
14 | ||
15 | for rule in RewriteRuleManager.rules(PRE): | |
16 | metric = rule.apply(metric) | |
17 | 13 | |
18 | 14 | aggregate_metrics = set() |
19 | 15 | |
32 | 28 | |
33 | 29 | values_buffer.input(datapoint) |
34 | 30 | |
35 | for rule in RewriteRuleManager.rules(POST): | |
36 | metric = rule.apply(metric) | |
37 | ||
38 | 31 | if settings.FORWARD_ALL and metric not in aggregate_metrics: |
39 | 32 | if settings.LOG_AGGREGATOR_MISSES and len(aggregate_metrics) == 0: |
40 | log.msg("Couldn't match metric %s with any aggregation rule. Passing on un-aggregated." % metric) | |
33 | log.msg( | |
34 | "Couldn't match metric %s with any aggregation rule. Passing on un-aggregated." % metric) | |
41 | 35 | yield (metric, datapoint) |
0 | import time | |
1 | 0 | import re |
1 | ||
2 | from math import floor, ceil | |
3 | ||
2 | 4 | from os.path import exists, getmtime |
3 | 5 | from twisted.internet.task import LoopingCall |
6 | from cachetools import TTLCache, LRUCache | |
7 | ||
4 | 8 | from carbon import log |
9 | from carbon.conf import settings | |
5 | 10 | from carbon.aggregator.buffers import BufferManager |
6 | 11 | |
7 | 12 | |
8 | class RuleManager: | |
13 | def get_cache(): | |
14 | ttl = settings.CACHE_METRIC_NAMES_TTL | |
15 | size = settings.CACHE_METRIC_NAMES_MAX | |
16 | if ttl > 0 and size > 0: | |
17 | return TTLCache(size, ttl) | |
18 | elif size > 0: | |
19 | return LRUCache(size) | |
20 | else: | |
21 | return dict() | |
22 | ||
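A small illustration of the two cachetools behaviors get_cache() chooses between; the sizes mirror the CACHE_METRIC_NAMES_* defaults documented in carbon.conf above:

    from cachetools import LRUCache, TTLCache

    # TTL + size bound: entries expire CACHE_METRIC_NAMES_TTL seconds after insertion.
    cache = TTLCache(maxsize=1000000, ttl=600)
    cache['stats.web01.cpu'] = 'stats.all.cpu'  # rule input -> aggregate metric name
    # Size bound only: least-recently-used names are evicted once maxsize is reached.
    cache = LRUCache(maxsize=1000000)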
23 | ||
24 | class RuleManager(object): | |
9 | 25 | def __init__(self): |
10 | 26 | self.rules = [] |
11 | 27 | self.rules_file = None |
55 | 71 | left_side, right_side = line.split('=', 1) |
56 | 72 | output_pattern, frequency = left_side.split() |
57 | 73 | method, input_pattern = right_side.split() |
58 | frequency = int( frequency.lstrip('(').rstrip(')') ) | |
74 | frequency = int(frequency.lstrip('(').rstrip(')')) | |
59 | 75 | return AggregationRule(input_pattern, output_pattern, method, frequency) |
60 | 76 | |
61 | 77 | except ValueError: |
63 | 79 | raise |
64 | 80 | |
65 | 81 | |
66 | class AggregationRule: | |
82 | class AggregationRule(object): | |
67 | 83 | def __init__(self, input_pattern, output_pattern, method, frequency): |
68 | 84 | self.input_pattern = input_pattern |
69 | 85 | self.output_pattern = output_pattern |
76 | 92 | self.aggregation_func = AGGREGATION_METHODS[method] |
77 | 93 | self.build_regex() |
78 | 94 | self.build_template() |
79 | self.cache = {} | |
95 | self.cache = get_cache() | |
80 | 96 | |
81 | 97 | def get_aggregate_metric(self, metric_path): |
82 | 98 | if metric_path in self.cache: |
94 | 110 | try: |
95 | 111 | result = self.output_template % extracted_fields |
96 | 112 | except TypeError: |
97 | log.err("Failed to interpolate template %s with fields %s" % (self.output_template, extracted_fields)) | |
113 | log.err("Failed to interpolate template %s with fields %s" % ( | |
114 | self.output_template, extracted_fields)) | |
98 | 115 | |
99 | if result: | |
100 | self.cache[metric_path] = result | |
116 | self.cache[metric_path] = result | |
101 | 117 | return result |
102 | 118 | |
103 | 119 | def build_regex(self): |
109 | 125 | i = input_part.find('<<') |
110 | 126 | j = input_part.find('>>') |
111 | 127 | pre = input_part[:i] |
112 | post = input_part[j+2:] | |
113 | field_name = input_part[i+2:j] | |
128 | post = input_part[j + 2:] | |
129 | field_name = input_part[i + 2:j] | |
114 | 130 | regex_part = '%s(?P<%s>.+)%s' % (pre, field_name, post) |
115 | 131 | |
116 | 132 | else: |
118 | 134 | j = input_part.find('>') |
119 | 135 | if i > -1 and j > i: |
120 | 136 | pre = input_part[:i] |
121 | post = input_part[j+1:] | |
122 | field_name = input_part[i+1:j] | |
137 | post = input_part[j + 1:] | |
138 | field_name = input_part[i + 1:j] | |
123 | 139 | regex_part = '%s(?P<%s>[^.]+)%s' % (pre, field_name, post) |
124 | 140 | elif input_part == '*': |
125 | 141 | regex_part = '[^.]+' |
137 | 153 | |
138 | 154 | def avg(values): |
139 | 155 | if values: |
140 | return float( sum(values) ) / len(values) | |
156 | return float(sum(values)) / len(values) | |
157 | ||
141 | 158 | |
142 | 159 | def count(values): |
143 | 160 | if values: |
144 | 161 | return len(values) |
145 | 162 | |
163 | ||
164 | def percentile(factor): | |
165 | def func(values): | |
166 | if values: | |
167 | values = sorted(values) | |
168 | rank = factor * (len(values) - 1) | |
169 | rank_left = int(floor(rank)) | |
170 | rank_right = int(ceil(rank)) | |
171 | ||
172 | if rank_left == rank_right: | |
173 | return values[rank_left] | |
174 | else: | |
175 | return values[rank_left] * (rank_right - rank) + values[rank_right] * (rank - rank_left) | |
176 | ||
177 | return func | |
178 | ||
179 | ||
146 | 180 | AGGREGATION_METHODS = { |
147 | 'sum' : sum, | |
148 | 'avg' : avg, | |
149 | 'min' : min, | |
150 | 'max' : max, | |
151 | 'count' : count | |
181 | 'sum': sum, | |
182 | 'avg': avg, | |
183 | 'min': min, | |
184 | 'max': max, | |
185 | 'p50': percentile(0.50), | |
186 | 'p75': percentile(0.75), | |
187 | 'p80': percentile(0.80), | |
188 | 'p90': percentile(0.90), | |
189 | 'p95': percentile(0.95), | |
190 | 'p99': percentile(0.99), | |
191 | 'p999': percentile(0.999), | |
192 | 'count': count, | |
152 | 193 | } |
153 | 194 | |
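A worked check of the linear interpolation in percentile() above, as a usage sketch (assumes this module is importable as carbon.aggregator.rules):

    from carbon.aggregator.rules import AGGREGATION_METHODS

    # p50 over 4 values: rank = 0.5 * (4 - 1) = 1.5, interpolating values[1] = 2
    # and values[2] = 3: 2 * (2 - 1.5) + 3 * (1.5 - 1) = 2.5
    assert AGGREGATION_METHODS['p50']([4, 2, 1, 3]) == 2.5
    assert AGGREGATION_METHODS['p999']([7]) == 7  # single value: ranks coincide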
154 | 195 | # Importable singleton |
30 | 30 | (see example config file provided) |
31 | 31 | """ |
32 | 32 | import sys |
33 | ||
33 | 34 | import os |
34 | 35 | import socket |
35 | 36 | from optparse import OptionParser |
38 | 39 | from twisted.internet import reactor |
39 | 40 | from twisted.internet.protocol import ReconnectingClientFactory |
40 | 41 | from twisted.application.internet import TCPClient |
41 | from txamqp.protocol import AMQClient | |
42 | from txamqp.client import TwistedDelegate | |
43 | import txamqp.spec | |
42 | ||
43 | # txamqp is currently not ported to py3 | |
44 | try: | |
45 | from txamqp.protocol import AMQClient | |
46 | from txamqp.client import TwistedDelegate | |
47 | import txamqp.spec | |
48 | except ImportError: | |
49 | raise | |
44 | 50 | |
45 | 51 | try: |
46 | 52 | import carbon |
49 | 55 | LIB_DIR = os.path.dirname(os.path.dirname(__file__)) |
50 | 56 | sys.path.insert(0, LIB_DIR) |
51 | 57 | |
52 | import carbon.protocols #satisfy import order requirements | |
58 | import carbon.protocols # NOQA satisfy import order requirements | |
53 | 59 | from carbon.protocols import CarbonServerProtocol |
54 | 60 | from carbon.conf import settings |
55 | from carbon import log, events, instrumentation | |
61 | from carbon import log, events | |
56 | 62 | |
57 | 63 | |
58 | 64 | HOSTNAME = socket.gethostname().split('.')[0] |
116 | 122 | |
117 | 123 | # bind each configured metric pattern |
118 | 124 | for bind_pattern in settings.BIND_PATTERNS: |
119 | log.listener("binding exchange '%s' to queue '%s' with pattern %s" \ | |
125 | log.listener("binding exchange '%s' to queue '%s' with pattern %s" | |
120 | 126 | % (exchange, my_queue, bind_pattern)) |
121 | 127 | yield chan.queue_bind(exchange=exchange, queue=my_queue, |
122 | 128 | routing_key=bind_pattern) |
123 | 129 | |
124 | 130 | yield chan.basic_consume(queue=my_queue, no_ack=True, |
125 | 131 | consumer_tag=self.consumer_tag) |
132 | ||
126 | 133 | @inlineCallbacks |
127 | 134 | def receive_loop(self): |
128 | 135 | queue = yield self.queue(self.consumer_tag) |
148 | 155 | metric, value, timestamp = line.split() |
149 | 156 | else: |
150 | 157 | value, timestamp = line.split() |
151 | datapoint = ( float(timestamp), float(value) ) | |
158 | datapoint = (float(timestamp), float(value)) | |
152 | 159 | if datapoint[1] != datapoint[1]: # filter out NaN values |
153 | 160 | continue |
154 | 161 | except ValueError: |
247 | 254 | |
248 | 255 | (options, args) = parser.parse_args() |
249 | 256 | |
250 | ||
251 | 257 | startReceiver(options.host, options.port, options.username, |
252 | 258 | options.password, vhost=options.vhost, |
253 | 259 | exchange_name=options.exchange, verbose=options.verbose) |
254 | 260 | reactor.run() |
255 | 261 | |
262 | ||
256 | 263 | if __name__ == "__main__": |
257 | 264 | main() |
20 | 20 | from optparse import OptionParser |
21 | 21 | |
22 | 22 | from twisted.internet.defer import inlineCallbacks |
23 | from twisted.internet import reactor, task | |
23 | from twisted.internet import reactor | |
24 | 24 | from twisted.internet.protocol import ClientCreator |
25 | 25 | from txamqp.protocol import AMQClient |
26 | 26 | from txamqp.client import TwistedDelegate |
53 | 53 | yield channel.exchange_declare(exchange=exchange, type="topic", |
54 | 54 | durable=True, auto_delete=False) |
55 | 55 | |
56 | message = Content( "%f %d" % (value, timestamp) ) | |
56 | message = Content("%f %d" % (value, timestamp)) | |
57 | 57 | message["delivery mode"] = 2 |
58 | 58 | |
59 | 59 | channel.basic_publish(exchange=exchange, content=message, routing_key=metric_path) |
111 | 111 | d.addBoth(lambda _: reactor.stop()) |
112 | 112 | reactor.run() |
113 | 113 | |
114 | ||
114 | 115 | if __name__ == "__main__": |
115 | 116 | main() |
20 | 20 | from carbon.conf import settings |
21 | 21 | from carbon import events, log |
22 | 22 | from carbon.pipeline import Processor |
23 | ||
24 | ||
25 | def by_timestamp((timestamp, value)): # useful sort key function | |
23 | from carbon.util import TaggedSeries | |
24 | ||
25 | ||
26 | def by_timestamp(t_v): # useful sort key function | |
27 | (timestamp, _) = t_v | |
26 | 28 | return timestamp |
27 | 29 | |
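Python 3 removed tuple parameters (PEP 3113), hence the explicit unpacking above; behavior as a sort key is unchanged:

    points = [(1510000060, 2.0), (1510000000, 1.0)]
    # Oldest datapoint sorts first when ordering (timestamp, value) pairs.
    assert sorted(points, key=by_timestamp)[0] == (1510000000, 1.0)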
28 | 30 | |
30 | 32 | plugin_name = 'write' |
31 | 33 | |
32 | 34 | def __init__(self, *args, **kwargs): |
33 | super(Processor, self).__init__(*args, **kwargs) | |
35 | super(CacheFeedingProcessor, self).__init__(*args, **kwargs) | |
34 | 36 | self.cache = MetricCache() |
35 | 37 | |
36 | 38 | def process(self, metric, datapoint): |
39 | # normalize metric name (reorder tags) | |
40 | try: | |
41 | metric = TaggedSeries.parse(metric).path | |
42 | except Exception as err: | |
43 | log.msg('Error parsing metric %s: %s' % (metric, err)) | |
44 | ||
37 | 45 | self.cache.store(metric, datapoint) |
38 | 46 | return Processor.NO_OUTPUT |
39 | 47 | |
46 | 54 | self.cache = cache |
47 | 55 | |
48 | 56 | def choose_item(self): |
49 | raise NotImplemented | |
57 | raise NotImplementedError() | |
50 | 58 | |
51 | 59 | |
52 | 60 | class NaiveStrategy(DrainStrategy): |
56 | 64 | |
57 | 65 | def _generate_queue(): |
58 | 66 | while True: |
59 | metric_names = self.cache.keys() | |
67 | metric_names = list(self.cache.keys()) | |
60 | 68 | while metric_names: |
61 | 69 | yield metric_names.pop() |
62 | 70 | |
63 | 71 | self.queue = _generate_queue() |
64 | 72 | |
65 | 73 | def choose_item(self): |
66 | return self.queue.next() | |
74 | return next(self.queue) | |
67 | 75 | |
68 | 76 | |
69 | 77 | class MaxStrategy(DrainStrategy): |
72 | 80 | that infrequently or irregularly updated metrics may not be written |
73 | 81 | until shutdown """ |
74 | 82 | def choose_item(self): |
75 | metric_name, size = max(self.cache.items(), key=lambda x: len(itemgetter(1)(x))) | |
83 | metric_name, _ = max(self.cache.items(), key=lambda x: len(itemgetter(1)(x))) | |
76 | 84 | return metric_name |
77 | 85 | |
78 | 86 | |
79 | 87 | class RandomStrategy(DrainStrategy): |
80 | 88 | """Pop points randomly""" |
81 | 89 | def choose_item(self): |
82 | return choice(self.cache.keys()) | |
90 | return choice(list(self.cache.keys())) # nosec | |
83 | 91 | |
84 | 92 | |
85 | 93 | class SortedStrategy(DrainStrategy): |
104 | 112 | self.queue = _generate_queue() |
105 | 113 | |
106 | 114 | def choose_item(self): |
107 | return self.queue.next() | |
115 | return next(self.queue) | |
108 | 116 | |
109 | 117 | |
110 | 118 | class TimeSortedStrategy(DrainStrategy): |
118 | 126 | while True: |
119 | 127 | t = time.time() |
120 | 128 | metric_lw = sorted(self.cache.watermarks, key=lambda x: x[1], reverse=True) |
129 | if settings.MIN_TIMESTAMP_LAG: | |
130 | metric_lw = [x for x in metric_lw if t - x[1] > settings.MIN_TIMESTAMP_LAG] | |
121 | 131 | size = len(metric_lw) |
122 | 132 | if settings.LOG_CACHE_QUEUE_SORTS and size: |
123 | 133 | log.msg("Sorted %d cache queues in %.6f seconds" % (size, time.time() - t)) |
134 | if not metric_lw: | |
135 | # If there is nothing to do, give the reader a chance to sleep. | |
136 | yield None | |
124 | 137 | while metric_lw: |
125 | 138 | yield itemgetter(0)(metric_lw.pop()) |
126 | 139 | if settings.LOG_CACHE_QUEUE_SORTS and size: |
129 | 142 | self.queue = _generate_queue() |
130 | 143 | |
131 | 144 | def choose_item(self): |
132 | return self.queue.next() | |
145 | return next(self.queue) | |
133 | 146 | |
134 | 147 | |
135 | 148 | class _MetricCache(defaultdict): |
173 | 186 | metric = self.strategy.choose_item() |
174 | 187 | else: |
175 | 188 | # Avoid .keys() as it dumps the whole list |
176 | metric = self.iterkeys().next() | |
189 | metric = next(iter(self)) | |
190 | if metric is None: | |
191 | return (None, []) | |
177 | 192 | return (metric, self.pop(metric)) |
178 | 193 | |
179 | 194 | def get_datapoints(self, metric): |
206 | 221 | |
207 | 222 | _Cache = None |
208 | 223 | |
224 | ||
209 | 225 | def MetricCache(): |
210 | 226 | global _Cache |
211 | 227 | if _Cache is not None: |
229 | 245 | return _Cache |
230 | 246 | |
231 | 247 | |
232 | ||
233 | 248 | # Avoid import circularities |
234 | from carbon import state | |
249 | from carbon import state # NOQA |
1 | 1 | # source: carbon.proto |
2 | 2 | |
3 | 3 | import sys |
4 | _b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) | |
5 | 4 | from google.protobuf import descriptor as _descriptor |
6 | 5 | from google.protobuf import message as _message |
7 | 6 | from google.protobuf import reflection as _reflection |
8 | 7 | from google.protobuf import symbol_database as _symbol_database |
9 | from google.protobuf import descriptor_pb2 | |
10 | 8 | # @@protoc_insertion_point(imports) |
9 | ||
11 | 10 | |
12 | 11 | _sym_db = _symbol_database.Default() |
13 | 12 | |
14 | ||
15 | ||
13 | _b = sys.version_info[0] < 3 and (lambda x: x) or (lambda x: x.encode('latin1')) | |
16 | 14 | |
17 | 15 | DESCRIPTOR = _descriptor.FileDescriptor( |
18 | 16 | name='carbon.proto', |
19 | 17 | package='carbon', |
20 | 18 | syntax='proto3', |
21 | serialized_pb=_b('\n\x0c\x63\x61rbon.proto\x12\x06\x63\x61rbon\")\n\x05Point\x12\x11\n\ttimestamp\x18\x01 \x01(\r\x12\r\n\x05value\x18\x02 \x01(\x01\"7\n\x06Metric\x12\x0e\n\x06metric\x18\x01 \x01(\t\x12\x1d\n\x06points\x18\x02 \x03(\x0b\x32\r.carbon.Point\"*\n\x07Payload\x12\x1f\n\x07metrics\x18\x01 \x03(\x0b\x32\x0e.carbon.Metricb\x06proto3') | |
19 | serialized_pb=_b('\n\x0c\x63\x61rbon.proto\x12\x06\x63\x61rbon\")\n\x05Point\x12\x11\n\ttimestamp' | |
20 | '\x18\x01 \x01(\r\x12\r\n\x05value\x18\x02 \x01(\x01\"7\n\x06Metric\x12\x0e\n' | |
21 | '\x06metric\x18\x01 \x01(\t\x12\x1d\n\x06points\x18\x02 \x03(\x0b\x32\r.carbon.' | |
22 | 'Point\"*\n\x07Payload\x12\x1f\n\x07metrics\x18\x01 \x03(\x0b\x32\x0e.carbon.' | |
23 | 'Metricb\x06proto3') | |
22 | 24 | ) |
23 | 25 | _sym_db.RegisterFileDescriptor(DESCRIPTOR) |
24 | ||
25 | ||
26 | ||
27 | 26 | |
28 | 27 | _POINT = _descriptor.Descriptor( |
29 | 28 | name='Point', |
138 | 137 | DESCRIPTOR.message_types_by_name['Payload'] = _PAYLOAD |
139 | 138 | |
140 | 139 | Point = _reflection.GeneratedProtocolMessageType('Point', (_message.Message,), dict( |
141 | DESCRIPTOR = _POINT, | |
142 | __module__ = 'carbon_pb2' | |
140 | DESCRIPTOR=_POINT, | |
141 | __module__='carbon_pb2' | |
143 | 142 | # @@protoc_insertion_point(class_scope:carbon.Point) |
144 | )) | |
143 | )) | |
145 | 144 | _sym_db.RegisterMessage(Point) |
146 | 145 | |
147 | 146 | Metric = _reflection.GeneratedProtocolMessageType('Metric', (_message.Message,), dict( |
148 | DESCRIPTOR = _METRIC, | |
149 | __module__ = 'carbon_pb2' | |
147 | DESCRIPTOR=_METRIC, | |
148 | __module__='carbon_pb2' | |
150 | 149 | # @@protoc_insertion_point(class_scope:carbon.Metric) |
151 | )) | |
150 | )) | |
152 | 151 | _sym_db.RegisterMessage(Metric) |
153 | 152 | |
154 | 153 | Payload = _reflection.GeneratedProtocolMessageType('Payload', (_message.Message,), dict( |
155 | DESCRIPTOR = _PAYLOAD, | |
156 | __module__ = 'carbon_pb2' | |
154 | DESCRIPTOR=_PAYLOAD, | |
155 | __module__='carbon_pb2' | |
157 | 156 | # @@protoc_insertion_point(class_scope:carbon.Payload) |
158 | )) | |
157 | )) | |
159 | 158 | _sym_db.RegisterMessage(Payload) |
160 | 159 | |
161 | ||
162 | 160 | # @@protoc_insertion_point(module_scope) |
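A hedged usage sketch of the generated classes, matching the Point/Metric/Payload messages declared in the descriptor above (the carbon.carbon_pb2 module path is assumed, as used by carbon's protobuf client code):

    from carbon.carbon_pb2 import Payload

    payload = Payload()
    metric = payload.metrics.add()
    metric.metric = 'example.metric'
    point = metric.points.add()
    point.timestamp = 1510000000  # uint32 per carbon.proto
    point.value = 42.0            # double per carbon.proto
    wire = payload.SerializeToString()  # bytes sent by the protobuf client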
0 | from collections import deque | |
0 | from collections import deque, defaultdict | |
1 | 1 | from time import time |
2 | from six import with_metaclass | |
2 | 3 | |
3 | 4 | from twisted.application.service import Service |
4 | 5 | from twisted.internet import reactor |
5 | 6 | from twisted.internet.defer import Deferred, DeferredList |
6 | 7 | from twisted.internet.protocol import ReconnectingClientFactory |
7 | 8 | from twisted.protocols.basic import LineOnlyReceiver, Int32StringReceiver |
9 | ||
8 | 10 | from carbon.conf import settings |
9 | 11 | from carbon.util import pickle |
12 | from carbon.util import PluginRegistrar | |
13 | from carbon.util import enableTcpKeepAlive | |
10 | 14 | from carbon import instrumentation, log, pipeline, state |
11 | from carbon.util import PluginRegistrar | |
15 | ||
16 | try: | |
17 | from OpenSSL import SSL | |
18 | except ImportError: | |
19 | SSL = None | |
20 | try: | |
21 | from twisted.internet import ssl | |
22 | except ImportError: | |
23 | ssl = None | |
12 | 24 | |
13 | 25 | try: |
14 | 26 | import signal |
15 | 27 | except ImportError: |
16 | 28 | log.debug("Couldn't import signal module") |
29 | ||
30 | try: | |
31 | from carbon.resolver import setUpRandomResolver | |
32 | except ImportError: | |
33 | setUpRandomResolver = None | |
17 | 34 | |
18 | 35 | |
19 | 36 | SEND_QUEUE_LOW_WATERMARK = settings.MAX_QUEUE_SIZE * settings.QUEUE_LOW_WATERMARK_PCT |
28 | 45 | self.transport.registerProducer(self, streaming=True) |
29 | 46 | # Define internal metric names |
30 | 47 | self.lastResetTime = time() |
48 | self.destination = self.factory.destination | |
31 | 49 | self.destinationName = self.factory.destinationName |
32 | 50 | self.queuedUntilReady = 'destinations.%s.queuedUntilReady' % self.destinationName |
33 | 51 | self.sent = 'destinations.%s.sent' % self.destinationName |
34 | 52 | self.batchesSent = 'destinations.%s.batchesSent' % self.destinationName |
35 | 53 | |
36 | 54 | self.slowConnectionReset = 'destinations.%s.slowConnectionReset' % self.destinationName |
37 | ||
38 | self.factory.connectionMade.callback(self) | |
55 | enableTcpKeepAlive(self.transport, settings.TCP_KEEPALIVE, settings) | |
56 | ||
57 | d = self.factory.connectionMade | |
58 | # Setup a new deferred before calling the callback to allow callbacks | |
59 | # to re-register themselves. | |
39 | 60 | self.factory.connectionMade = Deferred() |
61 | d.callback(self) | |
62 | ||
40 | 63 | self.sendQueued() |
41 | 64 | |
42 | 65 | def connectionLost(self, reason): |
103 | 126 | if not self.factory.hasQueuedDatapoints(): |
104 | 127 | return |
105 | 128 | |
106 | if settings.USE_RATIO_RESET is True: | |
107 | if not self.connectionQualityMonitor(): | |
108 | self.resetConnectionForQualityReasons("Sent: {0}, Received: {1}".format( | |
109 | instrumentation.prior_stats.get(self.sent, 0), | |
110 | instrumentation.prior_stats.get('metricsReceived', 0))) | |
129 | if not self.connectionQualityMonitor(): | |
130 | self.resetConnectionForQualityReasons("Sent: {0}, Received: {1}".format( | |
131 | instrumentation.prior_stats.get(self.sent, 0), | |
132 | instrumentation.prior_stats.get('metricsReceived', 0))) | |
111 | 133 | |
112 | 134 | self.sendDatapointsNow(self.factory.takeSomeFromQueue()) |
113 | if (self.factory.queueFull.called and | |
114 | queueSize < SEND_QUEUE_LOW_WATERMARK): | |
135 | if (self.factory.queueFull.called and queueSize < SEND_QUEUE_LOW_WATERMARK): | |
115 | 136 | if not self.factory.queueHasSpace.called: |
116 | 137 | self.factory.queueHasSpace.callback(queueSize) |
117 | 138 | if self.factory.hasQueuedDatapoints(): |
118 | 139 | self.factory.scheduleSend() |
119 | 140 | |
120 | ||
121 | 141 | def connectionQualityMonitor(self): |
122 | 142 | """Checks to see if the connection for this factory appears to |
123 | 143 | be delivering stats at a speed close to what we're receiving |
131 | 151 | True means that the total received is less than settings.MIN_RESET_STAT_FLOW |
132 | 152 | |
133 | 153 | False means that quality is bad |
134 | ||
135 | 154 | """ |
155 | if not settings.USE_RATIO_RESET: | |
156 | return True | |
157 | ||
158 | if settings.DESTINATION_POOL_REPLICAS: | |
159 | received = self.factory.attemptedRelays | |
160 | else: | |
161 | received = 'metricsReceived' | |
162 | ||
136 | 163 | destination_sent = float(instrumentation.prior_stats.get(self.sent, 0)) |
137 | total_received = float(instrumentation.prior_stats.get('metricsReceived', 0)) | |
164 | total_received = float(instrumentation.prior_stats.get(received, 0)) | |
138 | 165 | instrumentation.increment(self.slowConnectionReset, 0) |
139 | 166 | if total_received < settings.MIN_RESET_STAT_FLOW: |
140 | 167 | return True |
164 | 191 | __repr__ = __str__ |
165 | 192 | |
166 | 193 | |
167 | class CarbonClientFactory(object, ReconnectingClientFactory): | |
168 | __metaclass__ = PluginRegistrar | |
194 | class CAReplaceClientContextFactory: | |
195 | """A context factory for SSL clients needing a different CA chain.""" | |
196 | ||
197 | isClient = 1 | |
198 | # SSLv23_METHOD allows SSLv2, SSLv3, and TLSv1. We disable SSLv2 below, | |
199 | # though. | |
200 | method = SSL.SSLv23_METHOD if SSL else None | |
201 | ||
202 | _cafile = None | |
203 | ||
204 | def __init__(self, file=None): | |
205 | self._cafile = file | |
206 | ||
207 | def getContext(self): | |
208 | ctx = SSL.Context(self.method) | |
209 | ctx.set_options(SSL.OP_NO_SSLv2) | |
210 | if self._cafile is not None: | |
211 | ctx.use_certificate_chain_file(self._cafile) | |
212 | return ctx | |
213 | ||
214 | ||
215 | class CarbonClientFactory(with_metaclass(PluginRegistrar, ReconnectingClientFactory, object)): | |
169 | 216 | plugins = {} |
170 | 217 | maxDelay = 5 |
171 | 218 | |
172 | def __init__(self, destination): | |
219 | def __init__(self, destination, router): | |
173 | 220 | self.destination = destination |
221 | self.router = router | |
174 | 222 | self.destinationName = ('%s:%d:%s' % destination).replace('.', '_') |
175 | 223 | self.host, self.port, self.carbon_instance = destination |
176 | 224 | self.addr = (self.host, self.port) |
180 | 228 | self.connectedProtocol = None |
181 | 229 | self.queueEmpty = Deferred() |
182 | 230 | self.queueFull = Deferred() |
183 | self.queueFull.addCallback(self.queueFullCallback) | |
231 | self.queueFull.addCallbacks(self.queueFullCallback, log.err) | |
184 | 232 | self.queueHasSpace = Deferred() |
185 | self.queueHasSpace.addCallback(self.queueSpaceCallback) | |
233 | self.queueHasSpace.addCallbacks(self.queueSpaceCallback, log.err) | |
234 | # Args: {'connector': connector, 'reason': reason} | |
186 | 235 | self.connectFailed = Deferred() |
236 | # Args: {'connector': connector, 'reason': reason} | |
237 | self.connectionLost = Deferred() | |
238 | # Args: protocol instance | |
187 | 239 | self.connectionMade = Deferred() |
188 | self.connectionLost = Deferred() | |
240 | self.connectionMade.addCallbacks(self.clientConnectionMade, log.err) | |
189 | 241 | self.deferSendPending = None |
190 | 242 | # Define internal metric names |
191 | 243 | self.attemptedRelays = 'destinations.%s.attemptedRelays' % self.destinationName |
213 | 265 | if self.queueFull.called: |
214 | 266 | log.clients('%s send queue has space available' % self.connectedProtocol) |
215 | 267 | self.queueFull = Deferred() |
216 | self.queueFull.addCallback(self.queueFullCallback) | |
268 | self.queueFull.addCallbacks(self.queueFullCallback, log.err) | |
217 | 269 | state.events.cacheSpaceAvailable() |
218 | 270 | self.queueHasSpace = Deferred() |
219 | self.queueHasSpace.addCallback(self.queueSpaceCallback) | |
271 | self.queueHasSpace.addCallbacks(self.queueSpaceCallback, log.err) | |
220 | 272 | |
221 | 273 | def buildProtocol(self, addr): |
222 | 274 | self.connectedProtocol = self.clientProtocol() |
225 | 277 | |
226 | 278 | def startConnecting(self): # calling this startFactory yields recursion problems |
227 | 279 | self.started = True |
228 | self.connector = reactor.connectTCP(self.host, self.port, self) | |
280 | ||
281 | if settings['DESTINATION_TRANSPORT'] == "ssl": | |
282 | if not SSL or not ssl: | |
283 | print("SSL destination transport requested, but no Python OpenSSL available.") | |
284 | raise SystemExit(1) | |
285 | authority = None | |
286 | if settings['DESTINATION_SSL_CA']: | |
287 | try: | |
288 | with open(settings['DESTINATION_SSL_CA']) as f: | |
289 | authority = ssl.Certificate.loadPEM(f.read()) | |
290 | except IOError: | |
291 | print("Failed to read CA chain: %s" % settings['DESTINATION_SSL_CA']) | |
292 | raise SystemExit(1) | |
293 | # Twisted 14 introduced this function; it might not be around on older installs. | |
294 | if hasattr(ssl, "optionsForClientTLS"): | |
295 | from six import u | |
296 | client = ssl.optionsForClientTLS(u(self.host), authority) | |
297 | else: | |
298 | client = CAReplaceClientContextFactory(settings['DESTINATION_SSL_CA']) | |
299 | self.connector = reactor.connectSSL(self.host, self.port, self, client) | |
300 | else: | |
301 | self.connector = reactor.connectTCP(self.host, self.port, self) | |
229 | 302 | |
230 | 303 | def stopConnecting(self): |
231 | 304 | self.started = False |
246 | 319 | queue. |
247 | 320 | """ |
248 | 321 | def yield_max_datapoints(): |
249 | for count in range(settings.MAX_DATAPOINTS_PER_MESSAGE): | |
322 | for _ in range(settings.MAX_DATAPOINTS_PER_MESSAGE): | |
250 | 323 | try: |
251 | 324 | yield self.queue.popleft() |
252 | 325 | except IndexError: |
253 | raise StopIteration | |
326 | return | |
254 | 327 | return list(yield_max_datapoints()) |
255 | 328 | |
256 | 329 | def checkQueue(self): |
305 | 378 | instrumentation.increment(self.queuedUntilConnected) |
306 | 379 | |
307 | 380 | def startedConnecting(self, connector): |
308 | log.clients("%s::startedConnecting (%s:%d)" % (self, connector.host, connector.port)) | |
381 | log.clients("%s::startedConnecting (%s:%d)" % ( | |
382 | self, connector.host, connector.port)) | |
383 | ||
384 | def clientConnectionMade(self, client): | |
385 | log.clients("%s::connectionMade (%s)" % (self, client)) | |
386 | self.resetDelay() | |
387 | self.destinationUp(client.destination) | |
388 | self.connectionMade.addCallbacks(self.clientConnectionMade, log.err) | |
389 | return client | |
309 | 390 | |
310 | 391 | def clientConnectionLost(self, connector, reason): |
311 | 392 | ReconnectingClientFactory.clientConnectionLost(self, connector, reason) |
312 | log.clients("%s::clientConnectionLost (%s:%d) %s" % (self, connector.host, connector.port, reason.getErrorMessage())) | |
393 | log.clients("%s::clientConnectionLost (%s:%d) %s" % ( | |
394 | self, connector.host, connector.port, reason.getErrorMessage())) | |
313 | 395 | self.connectedProtocol = None |
314 | self.connectionLost.callback(0) | |
396 | ||
397 | self.destinationDown(self.destination) | |
398 | ||
399 | args = dict(connector=connector, reason=reason) | |
400 | d = self.connectionLost | |
315 | 401 | self.connectionLost = Deferred() |
402 | d.callback(args) | |
316 | 403 | |
317 | 404 | def clientConnectionFailed(self, connector, reason): |
318 | 405 | ReconnectingClientFactory.clientConnectionFailed(self, connector, reason) |
319 | log.clients("%s::clientConnectionFailed (%s:%d) %s" % (self, connector.host, connector.port, reason.getErrorMessage())) | |
320 | self.connectFailed.callback(dict(connector=connector, reason=reason)) | |
406 | log.clients("%s::clientConnectionFailed (%s:%d) %s" % ( | |
407 | self, connector.host, connector.port, reason.getErrorMessage())) | |
408 | ||
409 | self.destinationDown(self.destination) | |
410 | ||
411 | args = dict(connector=connector, reason=reason) | |
412 | d = self.connectFailed | |
321 | 413 | self.connectFailed = Deferred() |
414 | d.callback(args) | |
415 | ||
416 | def destinationUp(self, destination): | |
417 | log.clients("Destination is up: %s:%d:%s" % destination) | |
418 | if not self.router.hasDestination(destination): | |
419 | log.clients("Adding client %s:%d:%s to router" % destination) | |
420 | self.router.addDestination(destination) | |
421 | state.events.resumeReceivingMetrics() | |
422 | ||
423 | def destinationDown(self, destination): | |
424 | # Only blacklist the destination if we tried a lot. | |
425 | log.clients("Destination is down: %s:%d:%s (%d/%d)" % ( | |
426 | destination[0], destination[1], destination[2], self.retries, | |
427 | settings.DYNAMIC_ROUTER_MAX_RETRIES)) | |
428 | # Retries comes from the ReconnectingClientFactory. | |
429 | if self.retries < settings.DYNAMIC_ROUTER_MAX_RETRIES: | |
430 | return | |
431 | ||
432 | if settings.DYNAMIC_ROUTER and self.router.hasDestination(destination): | |
433 | log.clients("Removing client %s:%d:%s from router" % destination) | |
434 | self.router.removeDestination(destination) | |
435 | # Do not receive more metrics if we don't have any usable destinations. | |
436 | if not self.router.countDestinations(): | |
437 | state.events.pauseReceivingMetrics() | |
438 | # Re-inject queued metrics. | |
439 | metrics = list(self.queue) | |
440 | log.clients("Re-injecting %d metrics from %s" % (len(metrics), self)) | |
441 | for metric, datapoint in metrics: | |
442 | state.events.metricGenerated(metric, datapoint) | |
443 | self.queue.clear() | |
322 | 444 | |
323 | 445 | def disconnect(self): |
324 | self.queueEmpty.addCallback(lambda result: self.stopConnecting()) | |
446 | self.queueEmpty.addCallbacks(lambda result: self.stopConnecting(), log.err) | |
325 | 447 | readyToStop = DeferredList( |
326 | 448 | [self.connectionLost, self.connectFailed], |
327 | 449 | fireOnOneCallback=True, |
343 | 465 | class CarbonPickleClientProtocol(CarbonClientProtocol, Int32StringReceiver): |
344 | 466 | |
345 | 467 | def _sendDatapointsNow(self, datapoints): |
346 | self.sendString(pickle.dumps(datapoints, protocol=-1)) | |
468 | self.sendString(pickle.dumps(datapoints, protocol=2)) | |
347 | 469 | |
348 | 470 | |
349 | 471 | class CarbonPickleClientFactory(CarbonClientFactory): |
357 | 479 | |
358 | 480 | def _sendDatapointsNow(self, datapoints): |
359 | 481 | for metric, datapoint in datapoints: |
360 | self.sendLine("%s %s %d" % (metric, datapoint[1], datapoint[0])) | |
482 | if isinstance(datapoint[1], float): | |
483 | value = ("%.10f" % datapoint[1]).rstrip('0').rstrip('.') | |
484 | else: | |
485 | value = "%d" % datapoint[1] | |
486 | self.sendLine("%s %s %d" % (metric, value, datapoint[0])) | |
361 | 487 | |
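Quick checks of the value formatting introduced above; "%.10f" plus stripping keeps values in plain decimal form on the wire (str(1e-07) would give '1e-07'):

    assert ("%.10f" % 0.1).rstrip('0').rstrip('.') == '0.1'
    assert ("%.10f" % 42.0).rstrip('0').rstrip('.') == '42'  # no trailing dot
    assert ("%.10f" % 1e-07).rstrip('0').rstrip('.') == '0.0000001'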
362 | 488 | |
363 | 489 | class CarbonLineClientFactory(CarbonClientFactory): |
367 | 493 | return CarbonLineClientProtocol() |
368 | 494 | |
369 | 495 | |
496 | class FakeClientFactory(object): | |
497 | """Fake client factory that buffers points | |
498 | ||
499 | This is used when all the destinations are down and before we | |
500 | pause the reception of metrics to avoid losing points. | |
501 | """ | |
502 | ||
503 | def __init__(self): | |
504 | # This queue isn't explicitly bounded but will implicitly be. It receives | |
505 | # only metrics when no destinations are available, and as soon as we detect | |
506 | # that we don't have any destination we pause the producer: this means that | |
507 | # it will contain only a few seconds of metrics. | |
508 | self.queue = deque() | |
509 | self.started = False | |
510 | ||
511 | def startConnecting(self): | |
512 | pass | |
513 | ||
514 | def sendDatapoint(self, metric, datapoint): | |
515 | self.queue.append((metric, datapoint)) | |
516 | ||
517 | def sendHighPriorityDatapoint(self, metric, datapoint): | |
518 | self.queue.append((metric, datapoint)) | |
519 | ||
520 | def reinjectDatapoints(self): | |
521 | metrics = list(self.queue) | |
522 | log.clients("Re-injecting %d metrics from %s" % (len(metrics), self)) | |
523 | for metric, datapoint in metrics: | |
524 | state.events.metricGenerated(metric, datapoint) | |
525 | self.queue.clear() | |
526 | ||
527 | ||
370 | 528 | class CarbonClientManager(Service): |
371 | 529 | def __init__(self, router): |
530 | if settings.DESTINATION_POOL_REPLICAS: | |
531 | # If we decide to open multiple TCP connections to a replica, we probably | |
532 | # want to try to also load-balance across hosts. In this case we need | |
533 | # to make sure rfc3484 doesn't get in the way. | |
534 | if setUpRandomResolver: | |
535 | setUpRandomResolver(reactor) | |
536 | else: | |
537 | print("Import error, Twisted >= 17.1.0 needed for using DESTINATION_POOL_REPLICAS.") | |
538 | raise SystemExit(1) | |
539 | ||
372 | 540 | self.router = router |
373 | 541 | self.client_factories = {} # { destination : CarbonClientFactory() } |
542 | # { destination[0:2]: set(CarbonClientFactory()) } | |
543 | self.pooled_factories = defaultdict(set) | |
544 | ||
545 | # This fake factory will be used as a buffer when we cannot |
546 | # connect to any destination. |
547 | fake_factory = FakeClientFactory() | |
548 | self.client_factories[None] = fake_factory | |
549 | state.events.resumeReceivingMetrics.addHandler(fake_factory.reinjectDatapoints) | |
374 | 550 | |
375 | 551 | def createFactory(self, destination): |
376 | from carbon.conf import settings | |
377 | ||
378 | 552 | factory_name = settings["DESTINATION_PROTOCOL"] |
379 | 553 | factory_class = CarbonClientFactory.plugins.get(factory_name) |
380 | 554 | |
381 | 555 | if not factory_class: |
382 | print ("In carbon.conf, DESTINATION_PROTOCOL must be one of %s. " | |
383 | "Invalid value: '%s'" % (', '.join(CarbonClientFactory.plugins), factory_name)) | |
556 | print("In carbon.conf, DESTINATION_PROTOCOL must be one of %s. " | |
557 | "Invalid value: '%s'" % (', '.join(CarbonClientFactory.plugins), factory_name)) | |
384 | 558 | raise SystemExit(1) |
385 | 559 | |
386 | return factory_class(destination) | |
560 | return factory_class(destination, self.router) | |
387 | 561 | |
388 | 562 | def startService(self): |
389 | 563 | if 'signal' in globals().keys(): |
403 | 577 | return |
404 | 578 | |
405 | 579 | log.clients("connecting to carbon daemon at %s:%d:%s" % destination) |
406 | self.router.addDestination(destination) | |
580 | if not settings.DYNAMIC_ROUTER: | |
581 | # If not using a dynamic router we add the destination before | |
582 | # it's known to be working. | |
583 | self.router.addDestination(destination) | |
407 | 584 | |
408 | 585 | factory = self.createFactory(destination) |
409 | 586 | self.client_factories[destination] = factory |
587 | self.pooled_factories[destination[0:2]].add(factory) | |
588 | ||
410 | 589 | connectAttempted = DeferredList( |
411 | 590 | [factory.connectionMade, factory.connectFailed], |
412 | 591 | fireOnOneCallback=True, |
418 | 597 | |
419 | 598 | def stopClient(self, destination): |
420 | 599 | factory = self.client_factories.get(destination) |
421 | if factory is None: | |
422 | return | |
600 | if factory is None or destination is None: | |
601 | return None | |
423 | 602 | |
424 | 603 | self.router.removeDestination(destination) |
425 | 604 | stopCompleted = factory.disconnect() |
426 | stopCompleted.addCallback(lambda result: self.disconnectClient(destination)) | |
605 | stopCompleted.addCallbacks( | |
606 | lambda result: self.disconnectClient(destination), log.err | |
607 | ) | |
427 | 608 | return stopCompleted |
428 | 609 | |
429 | 610 | def disconnectClient(self, destination): |
430 | 611 | factory = self.client_factories.pop(destination) |
612 | self.pooled_factories[destination[0:2]].remove(factory) | |
431 | 613 | c = factory.connector |
432 | 614 | if c and c.state == 'connecting' and not factory.hasQueuedDatapoints(): |
433 | 615 | c.stopConnecting() |
435 | 617 | def stopAllClients(self): |
436 | 618 | deferreds = [] |
437 | 619 | for destination in list(self.client_factories): |
438 | deferreds.append(self.stopClient(destination)) | |
620 | deferred = self.stopClient(destination) | |
621 | if deferred: | |
622 | deferreds.append(deferred) | |
439 | 623 | return DeferredList(deferreds) |
440 | 624 | |
625 | def getDestinations(self, metric): | |
626 | destinations = list(self.router.getDestinations(metric)) | |
627 | # If we can't find any destination we just buffer the | |
628 | # points. We will also pause the socket on the receiving side. | |
629 | if not destinations: | |
630 | return [None] | |
631 | return destinations | |
632 | ||
633 | def getFactories(self, metric): | |
634 | destinations = self.getDestinations(metric) | |
635 | factories = set() | |
636 | ||
637 | if not settings.DESTINATION_POOL_REPLICAS: | |
638 | # Simple case, with only one replica per destination. | |
639 | for d in destinations: | |
640 | # If we can't find it, we add it to the 'fake' factory / buffer. |
641 | factories.add(self.client_factories.get(d)) | |
642 | else: | |
643 | # Here we might have multiple replicas per destination. | |
644 | for d in destinations: | |
645 | if d is None: | |
646 | # d == None means there are no destinations currently available, so | |
647 | # we just put the data into our fake factory / buffer. | |
648 | factories.add(self.client_factories[None]) | |
649 | else: | |
650 | # Else we take the replica with the smallest queue size. | |
651 | key = d[0:2] # Take only host:port, not instance. | |
652 | factories.add(min(self.pooled_factories[key], key=lambda f: f.queueSize)) | |
653 | return factories | |
654 | ||
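The replica selection in getFactories boils down to a min() over queue sizes; a toy illustration with made-up factories and a made-up destination triple:

class FakeFactory(object):
    def __init__(self, name, queueSize):
        self.name = name
        self.queueSize = queueSize

# Two replicas pooled under the same (host, port) key, as in pooled_factories.
pool = {('10.0.0.1', 2004): {FakeFactory('a', 120), FakeFactory('b', 3)}}
destination = ('10.0.0.1', 2004, 'a')
chosen = min(pool[destination[0:2]], key=lambda f: f.queueSize)
assert chosen.name == 'b'   # the least-loaded replica gets the datapoint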
441 | 655 | def sendDatapoint(self, metric, datapoint): |
442 | for destination in self.router.getDestinations(metric): | |
443 | self.client_factories[destination].sendDatapoint(metric, datapoint) | |
656 | for factory in self.getFactories(metric): | |
657 | factory.sendDatapoint(metric, datapoint) | |
444 | 658 | |
445 | 659 | def sendHighPriorityDatapoint(self, metric, datapoint): |
446 | for destination in self.router.getDestinations(metric): | |
447 | self.client_factories[destination].sendHighPriorityDatapoint(metric, datapoint) | |
660 | for factory in self.getFactories(metric): | |
661 | factory.sendHighPriorityDatapoint(metric, datapoint) | |
448 | 662 | |
449 | 663 | def __str__(self): |
450 | 664 | return "<%s[%x]>" % (self.__class__.__name__, id(self)) |
18 | 18 | |
19 | 19 | from os.path import join, dirname, normpath, exists, isdir |
20 | 20 | from optparse import OptionParser |
21 | from ConfigParser import ConfigParser | |
21 | ||
22 | try: | |
23 | from ConfigParser import ConfigParser | |
24 | # ConfigParser is renamed to configparser in py3 | |
25 | except ImportError: | |
26 | from configparser import ConfigParser | |
22 | 27 | |
23 | 28 | from carbon import log, state |
24 | 29 | from carbon.database import TimeSeriesDatabase |
34 | 39 | MAX_UPDATES_PER_SECOND=500, |
35 | 40 | MAX_CREATES_PER_MINUTE=float('inf'), |
36 | 41 | MIN_TIMESTAMP_RESOLUTION=0, |
42 | MIN_TIMESTAMP_LAG=0, | |
37 | 43 | LINE_RECEIVER_INTERFACE='0.0.0.0', |
38 | 44 | LINE_RECEIVER_PORT=2003, |
39 | 45 | ENABLE_UDP_LISTENER=False, |
69 | 75 | AMQP_VERBOSE=False, |
70 | 76 | AMQP_SPEC=None, |
71 | 77 | BIND_PATTERNS=['#'], |
78 | GRAPHITE_URL='http://127.0.0.1:80', | |
79 | ENABLE_TAGS=True, | |
80 | TAG_UPDATE_INTERVAL=100, | |
81 | TAG_BATCH_SIZE=100, | |
82 | TAG_QUEUE_SIZE=10000, | |
83 | TAG_HASH_FILENAMES=True, | |
72 | 84 | ENABLE_MANHOLE=False, |
73 | 85 | MANHOLE_INTERFACE='127.0.0.1', |
74 | 86 | MANHOLE_PORT=7222, |
75 | 87 | MANHOLE_USER="", |
76 | 88 | MANHOLE_PUBLIC_KEY="", |
77 | 89 | RELAY_METHOD='rules', |
90 | DYNAMIC_ROUTER=False, | |
91 | DYNAMIC_ROUTER_MAX_RETRIES=5, | |
92 | ROUTER_HASH_TYPE=None, | |
78 | 93 | REPLICATION_FACTOR=1, |
79 | 94 | DIVERSE_REPLICAS=True, |
80 | 95 | DESTINATIONS=[], |
81 | 96 | DESTINATION_PROTOCOL="pickle", |
97 | DESTINATION_TRANSPORT="none", | |
98 | DESTINATION_SSL_CA=None, | |
99 | DESTINATION_POOL_REPLICAS=False, | |
82 | 100 | USE_FLOW_CONTROL=True, |
83 | 101 | USE_INSECURE_UNPICKLER=False, |
84 | 102 | USE_WHITELIST=False, |
89 | 107 | MIN_RESET_STAT_FLOW=1000, |
90 | 108 | MIN_RESET_RATIO=0.9, |
91 | 109 | MIN_RESET_INTERVAL=121, |
110 | TCP_KEEPALIVE=True, | |
111 | TCP_KEEPIDLE=10, | |
112 | TCP_KEEPINTVL=30, | |
113 | TCP_KEEPCNT=2, | |
92 | 114 | USE_RATIO_RESET=False, |
93 | 115 | LOG_LISTENER_CONN_SUCCESS=True, |
94 | 116 | LOG_AGGREGATOR_MISSES=True, |
97 | 119 | RELAY_RULES='relay-rules.conf', |
98 | 120 | ENABLE_LOGROTATION=True, |
99 | 121 | METRIC_CLIENT_IDLE_TIMEOUT=None, |
122 | CACHE_METRIC_NAMES_MAX=0, | |
123 | CACHE_METRIC_NAMES_TTL=0, | |
124 | RAVEN_DSN=None, | |
100 | 125 | ) |
101 | 126 | |
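For reference, the new TCP keepalive knobs added to the defaults above are ordinary carbon.conf options; a hypothetical cache section spelling out just the defaults would look like:

[cache]
TCP_KEEPALIVE = True
TCP_KEEPIDLE = 10
TCP_KEEPINTVL = 30
TCP_KEEPCNT = 2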
102 | 127 | |
107 | 132 | try: |
108 | 133 | os.kill(int(pid), 0) |
109 | 134 | return True |
110 | except OSError, err: | |
135 | except OSError as err: | |
111 | 136 | return err.errno == errno.EPERM |
112 | 137 | |
113 | 138 | |
215 | 240 | self["pidfile"] = pidfile |
216 | 241 | |
217 | 242 | # Enforce a default umask of '022' if none was set. |
218 | if not self.parent.has_key("umask") or self.parent["umask"] is None: | |
219 | self.parent["umask"] = 022 | |
243 | if "umask" not in self.parent or self.parent["umask"] is None: | |
244 | self.parent["umask"] = 0o022 | |
220 | 245 | |
221 | 246 | # Read extra settings from the configuration file. |
222 | 247 | program_settings = read_config(program, self) |
224 | 249 | settings["program"] = program |
225 | 250 | |
226 | 251 | # Normalize and expand paths |
227 | settings["STORAGE_DIR"] = os.path.normpath(os.path.expanduser(settings["STORAGE_DIR"])) | |
228 | settings["LOCAL_DATA_DIR"] = os.path.normpath(os.path.expanduser(settings["LOCAL_DATA_DIR"])) | |
229 | settings["WHITELISTS_DIR"] = os.path.normpath(os.path.expanduser(settings["WHITELISTS_DIR"])) | |
230 | settings["PID_DIR"] = os.path.normpath(os.path.expanduser(settings["PID_DIR"])) | |
231 | settings["LOG_DIR"] = os.path.normpath(os.path.expanduser(settings["LOG_DIR"])) | |
232 | settings["pidfile"] = os.path.normpath(os.path.expanduser(settings["pidfile"])) | |
252 | def cleanpath(path): | |
253 | return os.path.normpath(os.path.expanduser(path)) | |
254 | settings["STORAGE_DIR"] = cleanpath(settings["STORAGE_DIR"]) | |
255 | settings["LOCAL_DATA_DIR"] = cleanpath(settings["LOCAL_DATA_DIR"]) | |
256 | settings["WHITELISTS_DIR"] = cleanpath(settings["WHITELISTS_DIR"]) | |
257 | settings["PID_DIR"] = cleanpath(settings["PID_DIR"]) | |
258 | settings["LOG_DIR"] = cleanpath(settings["LOG_DIR"]) | |
259 | settings["pidfile"] = cleanpath(settings["pidfile"]) | |
233 | 260 | |
234 | 261 | # Set process uid/gid by changing the parent config, if a user was |
235 | 262 | # provided in the configuration file. |
243 | 270 | |
244 | 271 | storage_schemas = join(settings["CONF_DIR"], "storage-schemas.conf") |
245 | 272 | if not exists(storage_schemas): |
246 | print "Error: missing required config %s" % storage_schemas | |
273 | print("Error: missing required config %s" % storage_schemas) | |
247 | 274 | sys.exit(1) |
248 | 275 | |
249 | 276 | if settings.CACHE_WRITE_STRATEGY not in ('timesorted', 'sorted', 'max', 'naive'): |
255 | 282 | # Database-specific settings |
256 | 283 | database = settings.DATABASE |
257 | 284 | if database not in TimeSeriesDatabase.plugins: |
258 | print "No database plugin implemented for '%s'" % database | |
285 | print("No database plugin implemented for '%s'" % database) | |
259 | 286 | raise SystemExit(1) |
260 | 287 | |
261 | 288 | database_class = TimeSeriesDatabase.plugins[database] |
263 | 290 | |
264 | 291 | settings.CACHE_SIZE_LOW_WATERMARK = settings.MAX_CACHE_SIZE * 0.95 |
265 | 292 | |
266 | if not "action" in self: | |
293 | if "action" not in self: | |
267 | 294 | self["action"] = "start" |
268 | 295 | self.handleAction() |
269 | 296 | |
314 | 341 | |
315 | 342 | if action == "stop": |
316 | 343 | if not exists(pidfile): |
317 | print "Pidfile %s does not exist" % pidfile | |
344 | print("Pidfile %s does not exist" % pidfile) | |
318 | 345 | raise SystemExit(0) |
319 | 346 | pf = open(pidfile, 'r') |
320 | 347 | try: |
321 | 348 | pid = int(pf.read().strip()) |
322 | 349 | pf.close() |
323 | 350 | except ValueError: |
324 | print "Failed to parse pid from pidfile %s" % pidfile | |
351 | print("Failed to parse pid from pidfile %s" % pidfile) | |
325 | 352 | raise SystemExit(1) |
326 | 353 | except IOError: |
327 | print "Could not read pidfile %s" % pidfile | |
354 | print("Could not read pidfile %s" % pidfile) | |
328 | 355 | raise SystemExit(1) |
329 | print "Sending kill signal to pid %d" % pid | |
356 | print("Sending kill signal to pid %d" % pid) | |
330 | 357 | try: |
331 | 358 | os.kill(pid, 15) |
332 | except OSError, e: | |
359 | except OSError as e: | |
333 | 360 | if e.errno == errno.ESRCH: |
334 | print "No process with pid %d running" % pid | |
361 | print("No process with pid %d running" % pid) | |
335 | 362 | else: |
336 | 363 | raise |
337 | 364 | |
339 | 366 | |
340 | 367 | elif action == "status": |
341 | 368 | if not exists(pidfile): |
342 | print "%s (instance %s) is not running" % (program, instance) | |
369 | print("%s (instance %s) is not running" % (program, instance)) | |
343 | 370 | raise SystemExit(1) |
344 | 371 | pf = open(pidfile, "r") |
345 | 372 | try: |
346 | 373 | pid = int(pf.read().strip()) |
347 | 374 | pf.close() |
348 | 375 | except ValueError: |
349 | print "Failed to parse pid from pidfile %s" % pidfile | |
376 | print("Failed to parse pid from pidfile %s" % pidfile) | |
350 | 377 | raise SystemExit(1) |
351 | 378 | except IOError: |
352 | print "Failed to read pid from %s" % pidfile | |
379 | print("Failed to read pid from %s" % pidfile) | |
353 | 380 | raise SystemExit(1) |
354 | 381 | |
355 | 382 | if _process_alive(pid): |
356 | print ("%s (instance %s) is running with pid %d" % | |
357 | (program, instance, pid)) | |
383 | print("%s (instance %s) is running with pid %d" % | |
384 | (program, instance, pid)) | |
358 | 385 | raise SystemExit(0) |
359 | 386 | else: |
360 | print "%s (instance %s) is not running" % (program, instance) | |
387 | print("%s (instance %s) is not running" % (program, instance)) | |
361 | 388 | raise SystemExit(1) |
362 | 389 | |
363 | 390 | elif action == "start": |
367 | 394 | pid = int(pf.read().strip()) |
368 | 395 | pf.close() |
369 | 396 | except ValueError: |
370 | print "Failed to parse pid from pidfile %s" % pidfile | |
397 | print("Failed to parse pid from pidfile %s" % pidfile) | |
371 | 398 | raise SystemExit(1) |
372 | 399 | except IOError: |
373 | print "Could not read pidfile %s" % pidfile | |
400 | print("Could not read pidfile %s" % pidfile) | |
374 | 401 | raise SystemExit(1) |
375 | 402 | if _process_alive(pid): |
376 | print ("%s (instance %s) is already running with pid %d" % | |
377 | (program, instance, pid)) | |
403 | print("%s (instance %s) is already running with pid %d" % | |
404 | (program, instance, pid)) | |
378 | 405 | raise SystemExit(1) |
379 | 406 | else: |
380 | print "Removing stale pidfile %s" % pidfile | |
407 | print("Removing stale pidfile %s" % pidfile) | |
381 | 408 | try: |
382 | 409 | os.unlink(pidfile) |
383 | 410 | except IOError: |
384 | print "Could not remove pidfile %s" % pidfile | |
411 | print("Could not remove pidfile %s" % pidfile) | |
385 | 412 | # Try to create the PID directory |
386 | 413 | else: |
387 | 414 | if not os.path.exists(settings["PID_DIR"]): |
388 | 415 | try: |
389 | 416 | os.makedirs(settings["PID_DIR"]) |
390 | except OSError as exc: # Python >2.5 | |
417 | except OSError as exc: # Python >2.5 | |
391 | 418 | if exc.errno == errno.EEXIST and os.path.isdir(settings["PID_DIR"]): |
392 | 419 | pass |
393 | 420 | else: |
394 | 421 | raise |
395 | 422 | |
396 | ||
397 | ||
398 | print "Starting %s (instance %s)" % (program, instance) | |
423 | print("Starting %s (instance %s)" % (program, instance)) | |
399 | 424 | |
400 | 425 | else: |
401 | print "Invalid action '%s'" % action | |
402 | print "Valid actions: start stop status" | |
426 | print("Invalid action '%s'" % action) | |
427 | print("Valid actions: start stop status") | |
403 | 428 | raise SystemExit(1) |
404 | 429 | |
405 | 430 | |
408 | 433 | optParameters = [ |
409 | 434 | ["rules", "", None, "Use the given aggregation rules file."], |
410 | 435 | ["rewrite-rules", "", None, "Use the given rewrite rules file."], |
411 | ] + CarbonCacheOptions.optParameters | |
436 | ] + CarbonCacheOptions.optParameters | |
412 | 437 | |
413 | 438 | def postOptions(self): |
414 | 439 | CarbonCacheOptions.postOptions(self) |
427 | 452 | optParameters = [ |
428 | 453 | ["rules", "", None, "Use the given relay rules file."], |
429 | 454 | ["aggregation-rules", "", None, "Use the given aggregation rules file."], |
430 | ] + CarbonCacheOptions.optParameters | |
455 | ] + CarbonCacheOptions.optParameters | |
431 | 456 | |
432 | 457 | def postOptions(self): |
433 | 458 | CarbonCacheOptions.postOptions(self) |
441 | 466 | |
442 | 467 | router = settings["RELAY_METHOD"] |
443 | 468 | if router not in DatapointRouter.plugins: |
444 | print ("In carbon.conf, RELAY_METHOD must be one of %s. " | |
445 | "Invalid value: '%s'" % (', '.join(DatapointRouter.plugins), router)) | |
469 | print("In carbon.conf, RELAY_METHOD must be one of %s. " | |
470 | "Invalid value: '%s'" % (', '.join(DatapointRouter.plugins), router)) | |
446 | 471 | raise SystemExit(1) |
447 | 472 | |
448 | 473 | |
490 | 515 | "--instance", |
491 | 516 | default='a', |
492 | 517 | help="Manage a specific carbon instance") |
493 | ||
518 | parser.add_option( | |
519 | "--logfile", | |
520 | default=None, | |
521 | help="Log to a specified file, - for stdout") | |
522 | parser.add_option( | |
523 | "--logger", | |
524 | default=None, | |
525 | help="A fully-qualified name to a log observer factory to use for the initial log " | |
526 | "observer. Takes precedence over --logfile and --syslog (when available).") | |
494 | 527 | return parser |
495 | 528 | |
496 | 529 | |
497 | 530 | def get_parser(name): |
498 | 531 | parser = get_default_parser() |
499 | if name == "carbon-aggregator": | |
532 | if "carbon-aggregator" in name: | |
500 | 533 | parser.add_option( |
501 | 534 | "--rules", |
502 | 535 | default=None, |
548 | 581 | graphite_root = os.environ.get('GRAPHITE_ROOT') |
549 | 582 | if graphite_root is None: |
550 | 583 | raise CarbonConfigException("Either ROOT_DIR or GRAPHITE_ROOT " |
551 | "needs to be provided.") | |
584 | "needs to be provided.") | |
552 | 585 | |
553 | 586 | # Default config directory to root-relative, unless overridden by the
554 | 587 | # 'GRAPHITE_CONF_DIR' environment variable. |
580 | 613 | settings.setdefault( |
581 | 614 | "WHITELISTS_DIR", join(settings["STORAGE_DIR"], "lists")) |
582 | 615 | |
583 | ||
584 | ||
585 | 616 | # Read configuration options from program-specific section. |
586 | 617 | section = program[len("carbon-"):] |
587 | 618 | config = options["config"] |
601 | 632 | "%s:%s" % (section, options["instance"])) |
602 | 633 | settings["pidfile"] = ( |
603 | 634 | options["pidfile"] or |
604 | join(settings["PID_DIR"], "%s-%s.pid" % | |
605 | (program, options["instance"]))) | |
606 | settings["LOG_DIR"] = (options["logdir"] or | |
607 | join(settings["LOG_DIR"], | |
608 | "%s-%s" % (program, options["instance"]))) | |
635 | join(settings["PID_DIR"], "%s-%s.pid" % (program, options["instance"]))) | |
636 | settings["LOG_DIR"] = ( | |
637 | options["logdir"] or | |
638 | join(settings["LOG_DIR"], "%s-%s" % (program, options["instance"]))) | |
609 | 639 | else: |
610 | 640 | settings["pidfile"] = ( |
611 | options["pidfile"] or | |
612 | join(settings["PID_DIR"], '%s.pid' % program)) | |
641 | options["pidfile"] or join(settings["PID_DIR"], '%s.pid' % program)) | |
613 | 642 | settings["LOG_DIR"] = (options["logdir"] or settings["LOG_DIR"]) |
614 | 643 | |
615 | 644 | update_STORAGE_DIR_deps() |
12 | 12 | limitations under the License.""" |
13 | 13 | |
14 | 14 | import os |
15 | import time | |
16 | ||
15 | 17 | from os.path import exists, dirname, join, sep |
16 | from carbon.util import PluginRegistrar | |
18 | from carbon.util import PluginRegistrar, TaggedSeries | |
17 | 19 | from carbon import log |
18 | ||
19 | ||
20 | class TimeSeriesDatabase(object): | |
20 | from six import with_metaclass | |
21 | ||
22 | ||
23 | class TimeSeriesDatabase(with_metaclass(PluginRegistrar, object)): | |
21 | 24 | "Abstract base class for Carbon database backends." |
22 | __metaclass__ = PluginRegistrar | |
23 | 25 | plugins = {} |
24 | 26 | |
25 | 27 | "List of supported aggregation methods for the database." |
26 | 28 | aggregationMethods = [] |
27 | 29 | |
30 | def __init__(self, settings): | |
31 | self.graphite_url = settings.GRAPHITE_URL | |
32 | ||
28 | 33 | def write(self, metric, datapoints): |
29 | 34 | "Persist datapoints in the database for metric." |
30 | raise NotImplemented() | |
35 | raise NotImplementedError() | |
31 | 36 | |
32 | 37 | def exists(self, metric): |
33 | 38 | "Return True if the given metric path exists, False otherwise." |
34 | raise NotImplemented() | |
39 | raise NotImplementedError() | |
35 | 40 | |
36 | 41 | def create(self, metric, retentions, xfilesfactor, aggregation_method): |
37 | 42 | "Create an entry in the database for metric using options." |
38 | raise NotImplemented() | |
43 | raise NotImplementedError() | |
39 | 44 | |
40 | 45 | def getMetadata(self, metric, key): |
41 | 46 | "Lookup metric metadata." |
42 | raise NotImplemented() | |
47 | raise NotImplementedError() | |
43 | 48 | |
44 | 49 | def setMetadata(self, metric, key, value): |
45 | 50 | "Modify metric metadata." |
46 | raise NotImplemented() | |
51 | raise NotImplementedError() | |
47 | 52 | |
48 | 53 | def getFilesystemPath(self, metric): |
49 | 54 | "Return filesystem path for metric, defaults to None." |
52 | 57 | def validateArchiveList(self, archiveList): |
53 | 58 | "Validate that the database can handle the given archiveList." |
54 | 59 | pass |
60 | ||
61 | def tag(self, *metrics): | |
62 | from carbon.http import httpRequest | |
63 | ||
64 | log.debug("Tagging %s" % ', '.join(metrics), type='tagdb') | |
65 | t = time.time() | |
66 | ||
67 | try: | |
68 | httpRequest( | |
69 | self.graphite_url + '/tags/tagMultiSeries', | |
70 | [('path', metric) for metric in metrics] | |
71 | ) | |
72 | log.debug("Tagged %s in %s" % (', '.join(metrics), time.time() - t), type='tagdb') | |
73 | except Exception as err: | |
74 | log.msg("Error tagging %s: %s" % (', '.join(metrics), err), type='tagdb') | |
55 | 75 | |
56 | 76 | |
57 | 77 | try: |
64 | 84 | aggregationMethods = whisper.aggregationMethods |
65 | 85 | |
66 | 86 | def __init__(self, settings): |
87 | super(WhisperDatabase, self).__init__(settings) | |
88 | ||
67 | 89 | self.data_dir = settings.LOCAL_DATA_DIR |
90 | self.tag_hash_filenames = settings.TAG_HASH_FILENAMES | |
68 | 91 | self.sparse_create = settings.WHISPER_SPARSE_CREATE |
69 | 92 | self.fallocate_create = settings.WHISPER_FALLOCATE_CREATE |
70 | 93 | if settings.WHISPER_AUTOFLUSH: |
92 | 115 | else: |
93 | 116 | log.err("WHISPER_FADVISE_RANDOM is enabled but import of ftools module failed.") |
94 | 117 | except AttributeError: |
95 | log.err("WHISPER_FADVISE_RANDOM is enabled but skipped because it is not compatible with the version of Whisper.") | |
118 | log.err("WHISPER_FADVISE_RANDOM is enabled but skipped because it is not compatible " + | |
119 | "with the version of Whisper.") | |
96 | 120 | |
97 | 121 | def write(self, metric, datapoints): |
98 | 122 | path = self.getFilesystemPath(metric) |
99 | 123 | whisper.update_many(path, datapoints) |
100 | 124 | |
101 | 125 | def exists(self, metric): |
102 | return exists(self.getFilesystemPath(metric)) | |
126 | if exists(self.getFilesystemPath(metric)): | |
127 | return True | |
128 | # if we're using hashed filenames and a non-hashed file exists then move it to the new name | |
129 | if self.tag_hash_filenames and exists(self._getFilesystemPath(metric, False)): | |
130 | os.rename(self._getFilesystemPath(metric, False), self.getFilesystemPath(metric)) | |
131 | return True | |
132 | return False | |
103 | 133 | |
104 | 134 | def create(self, metric, retentions, xfilesfactor, aggregation_method): |
105 | 135 | path = self.getFilesystemPath(metric) |
107 | 137 | try: |
108 | 138 | if not exists(directory): |
109 | 139 | os.makedirs(directory) |
110 | except OSError, e: | |
140 | except OSError as e: | |
111 | 141 | log.err("%s" % e) |
112 | 142 | |
113 | 143 | whisper.create(path, retentions, xfilesfactor, aggregation_method, |
128 | 158 | return whisper.setAggregationMethod(wsp_path, value) |
129 | 159 | |
130 | 160 | def getFilesystemPath(self, metric): |
131 | metric_path = metric.replace('.', sep).lstrip(sep) + '.wsp' | |
132 | return join(self.data_dir, metric_path) | |
161 | return self._getFilesystemPath(metric, self.tag_hash_filenames) | |
162 | ||
163 | def _getFilesystemPath(self, metric, tag_hash_filenames): | |
164 | return join( | |
165 | self.data_dir, | |
166 | TaggedSeries.encode(metric, sep, hash_only=tag_hash_filenames) + '.wsp' | |
167 | ) | |
133 | 168 | |
134 | 169 | def validateArchiveList(self, archiveList): |
135 | 170 | try: |
136 | 171 | whisper.validateArchiveList(archiveList) |
137 | except whisper.InvalidConfiguration, e: | |
172 | except whisper.InvalidConfiguration as e: | |
138 | 173 | raise ValueError("%s" % e) |
139 | 174 | |
140 | 175 | |
145 | 180 | else: |
146 | 181 | class CeresDatabase(TimeSeriesDatabase): |
147 | 182 | plugin_name = 'ceres' |
148 | aggregationMethods = ['average','sum','last','max','min'] | |
183 | aggregationMethods = ['average', 'sum', 'last', 'max', 'min'] | |
149 | 184 | |
150 | 185 | def __init__(self, settings): |
186 | super(CeresDatabase, self).__init__(settings) | |
187 | ||
151 | 188 | self.data_dir = settings.LOCAL_DATA_DIR |
189 | self.tag_hash_filenames = settings.TAG_HASH_FILENAMES | |
152 | 190 | ceres.setDefaultNodeCachingBehavior(settings.CERES_NODE_CACHING_BEHAVIOR) |
153 | 191 | ceres.setDefaultSliceCachingBehavior(settings.CERES_SLICE_CACHING_BEHAVIOR) |
154 | 192 | ceres.MAX_SLICE_GAP = int(settings.CERES_MAX_SLICE_GAP) |
162 | 200 | |
163 | 201 | self.tree = ceres.CeresTree(self.data_dir) |
164 | 202 | |
203 | def encode(self, metric, tag_hash_filenames=None): | |
204 | if tag_hash_filenames is None: | |
205 | tag_hash_filenames = self.tag_hash_filenames | |
206 | return TaggedSeries.encode(metric, hash_only=tag_hash_filenames) | |
207 | ||
165 | 208 | def write(self, metric, datapoints): |
166 | self.tree.store(metric, datapoints) | |
209 | self.tree.store(self.encode(metric), datapoints) | |
167 | 210 | |
168 | 211 | def exists(self, metric): |
169 | return self.tree.hasNode(metric) | |
212 | if self.tree.hasNode(self.encode(metric)): | |
213 | return True | |
214 | # if we're using hashed filenames and a non-hashed file exists then move it to the new name | |
215 | if self.tag_hash_filenames and self.tree.hasNode(self.encode(metric, False)): | |
216 | os.rename(self._getFilesystemPath(metric, False), self.getFilesystemPath(metric)) | |
217 | return True | |
218 | return False | |
170 | 219 | |
171 | 220 | def create(self, metric, retentions, xfilesfactor, aggregation_method): |
172 | self.tree.createNode(metric, retentions=retentions, | |
221 | self.tree.createNode(self.encode(metric), | |
222 | retentions=retentions, | |
173 | 223 | timeStep=retentions[0][0], |
174 | 224 | xFilesFactor=xfilesfactor, |
175 | 225 | aggregationMethod=aggregation_method) |
176 | 226 | |
177 | 227 | def getMetadata(self, metric, key): |
178 | return self.tree.getNode(metric).readMetadata()[key] | |
228 | return self.tree.getNode(self.encode(metric)).readMetadata()[key] | |
179 | 229 | |
180 | 230 | def setMetadata(self, metric, key, value): |
181 | node = self.tree.getNode(metric) | |
231 | node = self.tree.getNode(self.encode(metric)) | |
182 | 232 | metadata = node.readMetadata() |
183 | 233 | metadata[key] = value |
184 | 234 | node.writeMetadata(metadata) |
185 | 235 | |
186 | 236 | def getFilesystemPath(self, metric): |
187 | return self.tree.getFilesystemPath(metric) | |
237 | return self._getFilesystemPath(metric, self.tag_hash_filenames) | |
238 | ||
239 | def _getFilesystemPath(self, metric, tag_hash_filenames): | |
240 | return self.tree.getFilesystemPath(self.encode(metric, tag_hash_filenames)) |
0 | from twisted.python.failure import Failure | |
1 | ||
2 | ||
3 | 0 | class Event: |
4 | 1 | def __init__(self, name): |
5 | 2 | self.name = name |
18 | 15 | try: |
19 | 16 | handler(*args, **kwargs) |
20 | 17 | except Exception: |
21 | log.err(None, "Exception in %s event handler: args=%s kwargs=%s" % (self.name, args, kwargs)) | |
18 | log.err( | |
19 | None, "Exception in %s event handler: args=%s kwargs=%s" % (self.name, args, kwargs)) | |
22 | 20 | |
23 | 21 | |
24 | 22 | metricReceived = Event('metricReceived') |
29 | 27 | resumeReceivingMetrics = Event('resumeReceivingMetrics') |
30 | 28 | |
31 | 29 | # Default handlers |
32 | metricReceived.addHandler(lambda metric, datapoint: state.instrumentation.increment('metricsReceived')) | |
30 | metricReceived.addHandler( | |
31 | lambda metric, datapoint: state.instrumentation.increment('metricsReceived')) | |
33 | 32 | |
34 | 33 | |
35 | 34 | cacheFull.addHandler(lambda: state.instrumentation.increment('cache.overflow')) |
41 | 40 | |
42 | 41 | |
43 | 42 | # Avoid import circularities |
44 | from carbon import log, state | |
43 | from carbon import log, state # NOQA |
0 | from hashlib import md5 | |
1 | import bisect | |
2 | import sys | |
3 | ||
0 | 4 | try: |
1 | from hashlib import md5 | |
5 | import mmh3 | |
2 | 6 | except ImportError: |
3 | from md5 import md5 | |
4 | import bisect | |
7 | mmh3 = None | |
5 | 8 | |
6 | 9 | try: |
7 | 10 | import pyhash |
8 | 11 | hasher = pyhash.fnv1a_32() |
9 | def fnv32a(string, seed=0x811c9dc5): | |
10 | return hasher(string, seed=seed) | |
12 | ||
13 | def fnv32a(data, seed=0x811c9dc5): | |
14 | return hasher(data, seed=seed) | |
11 | 15 | except ImportError: |
12 | def fnv32a(string, seed=0x811c9dc5): | |
16 | def fnv32a(data, seed=0x811c9dc5): | |
13 | 17 | """ |
14 | 18 | FNV-1a Hash (http://isthe.com/chongo/tech/comp/fnv/) in Python. |
15 | 19 | Taken from https://gist.github.com/vaiorabbit/5670985 |
17 | 21 | hval = seed |
18 | 22 | fnv_32_prime = 0x01000193 |
19 | 23 | uint32_max = 2 ** 32 |
20 | for s in string: | |
21 | hval = hval ^ ord(s) | |
22 | hval = (hval * fnv_32_prime) % uint32_max | |
24 | if sys.version_info >= (3, 0): | |
25 | # data is a bytes object, s is an integer | |
26 | for s in data: | |
27 | hval = hval ^ s | |
28 | hval = (hval * fnv_32_prime) % uint32_max | |
29 | else: | |
30 | # data is an str object, s is a single character | |
31 | for s in data: | |
32 | hval = hval ^ ord(s) | |
33 | hval = (hval * fnv_32_prime) % uint32_max | |
23 | 34 | return hval |
35 | ||
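Both branches of the fallback above implement plain FNV-1a; two well-known 32-bit test vectors make a handy sanity check (works on py2 and py3, since b'' is str on py2):

assert fnv32a(b'') == 0x811c9dc5    # empty input returns the offset basis (the seed)
assert fnv32a(b'a') == 0xe40c292c   # published FNV-1a 32-bit value for "a"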
36 | ||
37 | def compactHash(string): | |
38 | return md5(string.encode('utf-8')).hexdigest() | |
39 | ||
40 | ||
41 | def carbonHash(key, hash_type): | |
42 | if hash_type == 'fnv1a_ch': | |
43 | big_hash = int(fnv32a(key.encode('utf-8'))) | |
44 | small_hash = (big_hash >> 16) ^ (big_hash & 0xffff) | |
45 | elif hash_type == 'mmh3_ch': | |
46 | if mmh3 is None: | |
47 | raise Exception('Install "mmh3" to use this hashing function.') | |
48 | small_hash = mmh3.hash(key) | |
49 | else: | |
50 | big_hash = compactHash(key) | |
51 | small_hash = int(big_hash[:4], 16) | |
52 | return small_hash | |
53 | ||
24 | 54 | |
25 | 55 | class ConsistentHashRing: |
26 | 56 | def __init__(self, nodes, replica_count=100, hash_type='carbon_ch'): |
27 | 57 | self.ring = [] |
58 | self.ring_len = len(self.ring) | |
28 | 59 | self.nodes = set() |
60 | self.nodes_len = len(self.nodes) | |
29 | 61 | self.replica_count = replica_count |
30 | 62 | self.hash_type = hash_type |
31 | 63 | for node in nodes: |
32 | 64 | self.add_node(node) |
33 | 65 | |
34 | 66 | def compute_ring_position(self, key): |
35 | if self.hash_type == 'fnv1a_ch': | |
36 | big_hash = '{:x}'.format(int(fnv32a( str(key) ))) | |
37 | small_hash = int(big_hash[:4], 16) ^ int(big_hash[4:], 16) | |
38 | else: | |
39 | big_hash = md5(str(key)).hexdigest() | |
40 | small_hash = int(big_hash[:4], 16) | |
41 | return small_hash | |
67 | return carbonHash(key, self.hash_type) | |
42 | 68 | |
43 | def add_node(self, node): | |
44 | self.nodes.add(node) | |
69 | def add_node(self, key): | |
70 | self.nodes.add(key) | |
71 | self.nodes_len = len(self.nodes) | |
45 | 72 | for i in range(self.replica_count): |
46 | 73 | if self.hash_type == 'fnv1a_ch': |
47 | replica_key = "%d-%s" % (i, node[1]) | |
74 | replica_key = "%d-%s" % (i, key[1]) | |
48 | 75 | else: |
49 | replica_key = "%s:%d" % (node, i) | |
76 | replica_key = "%s:%d" % (key, i) | |
50 | 77 | position = self.compute_ring_position(replica_key) |
51 | 78 | while position in [r[0] for r in self.ring]: |
52 | 79 | position = position + 1 |
53 | entry = (position, node) | |
80 | entry = (position, key) | |
54 | 81 | bisect.insort(self.ring, entry) |
82 | self.ring_len = len(self.ring) | |
55 | 83 | |
56 | def remove_node(self, node): | |
57 | self.nodes.discard(node) | |
58 | self.ring = [entry for entry in self.ring if entry[1] != node] | |
84 | def remove_node(self, key): | |
85 | self.nodes.discard(key) | |
86 | self.nodes_len = len(self.nodes) | |
87 | self.ring = [entry for entry in self.ring if entry[1] != key] | |
88 | self.ring_len = len(self.ring) | |
59 | 89 | |
60 | 90 | def get_node(self, key): |
61 | 91 | assert self.ring |
62 | node = None | |
63 | node_iter = self.get_nodes(key) | |
64 | node = node_iter.next() | |
65 | node_iter.close() | |
66 | return node | |
92 | position = self.compute_ring_position(key) | |
93 | search_entry = (position, ()) | |
94 | index = bisect.bisect_left(self.ring, search_entry) % self.ring_len | |
95 | entry = self.ring[index] | |
96 | return entry[1] | |
67 | 97 | |
68 | 98 | def get_nodes(self, key): |
69 | assert self.ring | |
70 | if len(self.nodes) == 1: | |
71 | # short circuit in simple 1-node case | |
99 | nodes = set() | |
100 | if not self.ring: | |
101 | return | |
102 | if self.nodes_len == 1: | |
72 | 103 | for node in self.nodes: |
73 | 104 | yield node |
74 | 105 | return
75 | nodes = set() | |
76 | 105 | position = self.compute_ring_position(key) |
77 | search_entry = (position, None) | |
78 | index = bisect.bisect_left(self.ring, search_entry) % len(self.ring) | |
79 | last_index = (index - 1) % len(self.ring) | |
80 | while len(nodes) < len(self.nodes) and index != last_index: | |
106 | search_entry = (position, ()) | |
107 | index = bisect.bisect_left(self.ring, search_entry) % self.ring_len | |
108 | last_index = (index - 1) % self.ring_len | |
109 | nodes_len = len(nodes) | |
110 | while nodes_len < self.nodes_len and index != last_index: | |
81 | 111 | next_entry = self.ring[index] |
82 | 112 | (position, next_node) = next_entry |
83 | 113 | if next_node not in nodes: |
84 | 114 | nodes.add(next_node) |
115 | nodes_len += 1 | |
85 | 116 | yield next_node |
86 | 117 | |
87 | index = (index + 1) % len(self.ring) | |
118 | index = (index + 1) % self.ring_len |
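A minimal use of the refactored ring, tying get_node and get_nodes together (the hosts are placeholders):

ring = ConsistentHashRing([], hash_type='carbon_ch')
for node in [('10.0.0.1', 'a'), ('10.0.0.2', 'a'), ('10.0.0.3', 'a')]:
    ring.add_node(node)

owner = ring.get_node('some.metric.path')        # single best node for the key
fallbacks = ring.get_nodes('some.metric.path')   # generator: owner first, then the rest
assert owner == next(iter(fallbacks))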
0 | import urllib3 | |
1 | ||
2 | # shared urllib3 http client connection pool |
3 | http = urllib3.PoolManager() | |
4 | ||
5 | ||
6 | def httpRequest(url, values=None, headers=None, method='POST', timeout=5): | |
7 | try: | |
8 | result = http.request( | |
9 | method, | |
10 | url, | |
11 | fields=values, | |
12 | headers=headers, | |
13 | timeout=timeout) | |
14 | except BaseException as err: | |
15 | raise Exception("Error requesting %s: %s" % (url, err)) | |
16 | ||
17 | if result.status != 200: | |
18 | raise Exception("Error response %d from %s" % (result.status, url)) | |
19 | ||
20 | return result.data |
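A hypothetical call, mirroring how database.py's tag() drives httpRequest (the URL and metric are placeholders):

try:
    body = httpRequest(
        'http://127.0.0.1:80/tags/tagMultiSeries',
        [('path', 'some.metric;dc=us-east')],
    )
except Exception as err:
    print("tagging failed: %s" % err)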
20 | 20 | |
21 | 21 | # TODO(chrismd) refactor the graphite metrics hierarchy to be cleaner, |
22 | 22 | # more consistent, and make room for frontend metrics. |
23 | #metric_prefix = "Graphite.backend.%(program)s.%(instance)s." % settings | |
23 | # metric_prefix = "Graphite.backend.%(program)s.%(instance)s." % settings | |
24 | 24 | |
25 | 25 | |
26 | 26 | def increment(stat, increase=1): |
28 | 28 | stats[stat] += increase |
29 | 29 | except KeyError: |
30 | 30 | stats[stat] = increase |
31 | ||
31 | 32 | |
32 | 33 | def max(stat, newval): |
33 | 34 | try: |
36 | 37 | except KeyError: |
37 | 38 | stats[stat] = newval |
38 | 39 | |
40 | ||
39 | 41 | def append(stat, value): |
40 | 42 | try: |
41 | 43 | stats[stat].append(value) |
65 | 67 | |
66 | 68 | |
67 | 69 | def getMemUsage(): |
68 | rss_pages = int(open('/proc/self/statm').read().split()[1]) | |
70 | with open('/proc/self/statm') as statm: | |
71 | rss_pages = int(statm.read().split()[1]) | |
69 | 72 | return rss_pages * PAGESIZE |
70 | 73 | |
71 | 74 | |
77 | 80 | stats.clear() |
78 | 81 | |
79 | 82 | # cache metrics |
80 | if settings.program == 'carbon-cache': | |
83 | if 'cache' in settings.program: | |
81 | 84 | record = cache_record |
82 | 85 | updateTimes = myStats.get('updateTimes', []) |
83 | 86 | committedPoints = myStats.get('committedPoints', 0) |
118 | 121 | record('cache.overflow', cacheOverflow) |
119 | 122 | |
120 | 123 | # aggregator metrics |
121 | elif settings.program == 'carbon-aggregator': | |
124 | elif 'aggregator' in settings.program: | |
122 | 125 | record = aggregator_record |
123 | 126 | record('allocatedBuffers', len(BufferManager)) |
124 | 127 | record('bufferedDatapoints', |
132 | 135 | # shared relay stats for relays & aggregators |
133 | 136 | if settings.program in ['carbon-aggregator', 'carbon-relay']: |
134 | 137 | prefix = 'destinations.' |
135 | relay_stats = [(k,v) for (k,v) in myStats.items() if k.startswith(prefix)] | |
138 | relay_stats = [(k, v) for (k, v) in myStats.items() if k.startswith(prefix)] | |
136 | 139 | for stat_name, stat_value in relay_stats: |
137 | 140 | record(stat_name, stat_value) |
138 | 141 | # Preserve the count of sent metrics so that the ratio of |
139 | 142 | # received : sent can be checked per-relay to determine the |
140 | 143 | # health of the destination. |
141 | if stat_name.endswith('.sent'): | |
144 | if stat_name.endswith('.sent') or stat_name.endswith('.attemptedRelays'): | |
142 | 145 | myPriorStats[stat_name] = stat_value |
143 | 146 | |
144 | 147 | # common metrics |
205 | 208 | |
206 | 209 | |
207 | 210 | # Avoid import circularities |
208 | from carbon import state, events, cache | |
209 | from carbon.aggregator.buffers import BufferManager | |
211 | from carbon import state, events, cache # NOQA | |
212 | from carbon.aggregator.buffers import BufferManager # NOQA |
0 | 0 | import os |
1 | 1 | import time |
2 | from sys import stdout, stderr | |
3 | from zope.interface import implements | |
4 | from twisted.python.log import startLoggingWithObserver, textFromEventDict, msg, err, ILogObserver | |
2 | from sys import stdout | |
3 | from zope.interface import implementer | |
4 | from twisted.python.log import startLoggingWithObserver, textFromEventDict, msg, err, ILogObserver # NOQA | |
5 | 5 | from twisted.python.syslog import SyslogObserver |
6 | 6 | from twisted.python.logfile import DailyLogFile |
7 | 7 | |
18 | 18 | """ |
19 | 19 | Fix Umask Issue https://twistedmatrix.com/trac/ticket/7026 |
20 | 20 | """ |
21 | openMode = self.defaultMode or 0777 | |
21 | openMode = self.defaultMode or 0o777 | |
22 | 22 | self._file = os.fdopen(os.open( |
23 | self.path, os.O_CREAT|os.O_RDWR, openMode), 'r+', 1) | |
23 | self.path, os.O_CREAT | os.O_RDWR, openMode), 'rb+', 1) | |
24 | 24 | self.closed = False |
25 | 25 | # Try our best to update permissions for files which already exist. |
26 | 26 | if self.defaultMode: |
47 | 47 | else: |
48 | 48 | path_stat = os.stat(self.path) |
49 | 49 | fd_stat = os.fstat(self._file.fileno()) |
50 | if not (path_stat.st_ino == fd_stat.st_ino | |
51 | and path_stat.st_dev == fd_stat.st_dev): | |
50 | if not (path_stat.st_ino == fd_stat.st_ino and path_stat.st_dev == fd_stat.st_dev): | |
52 | 51 | self.reopen() |
53 | 52 | DailyLogFile.write(self, data) |
54 | 53 | |
58 | 57 | self._openFile() |
59 | 58 | |
60 | 59 | |
60 | @implementer(ILogObserver) | |
61 | 61 | class CarbonLogObserver(object): |
62 | implements(ILogObserver) | |
62 | ||
63 | def __init__(self): | |
64 | self._raven_client = None | |
65 | ||
66 | def raven_client(self): | |
67 | if self._raven_client is not None: | |
68 | return self._raven_client | |
69 | ||
70 | # Import here to avoid dependency hell. | |
71 | try: | |
72 | import raven | |
73 | except ImportError: | |
74 | return None | |
75 | from carbon.conf import settings | |
76 | ||
77 | if settings.RAVEN_DSN is None: | |
78 | return None | |
79 | self._raven_client = raven.Client(dsn=settings.RAVEN_DSN) | |
80 | return self._raven_client | |
81 | ||
82 | def log_to_raven(self, event): | |
83 | if not event.get('isError') or 'failure' not in event: | |
84 | return | |
85 | client = self.raven_client() | |
86 | if client is None: | |
87 | return | |
88 | f = event['failure'] | |
89 | client.captureException( | |
90 | (f.type, f.value, f.getTracebackObject()) | |
91 | ) | |
63 | 92 | |
64 | 93 | def log_to_dir(self, logdir): |
65 | 94 | self.logdir = logdir |
76 | 105 | self.observer = syslog_observer |
77 | 106 | |
78 | 107 | def __call__(self, event): |
108 | self.log_to_raven(event) | |
79 | 109 | return self.observer(event) |
80 | 110 | |
81 | def stdout_observer(self, event): | |
111 | @staticmethod | |
112 | def stdout_observer(event): | |
82 | 113 | stdout.write(formatEvent(event, includeType=True) + '\n') |
83 | 114 | stdout.flush() |
84 | 115 | |
95 | 126 | |
96 | 127 | # Default to stdout |
97 | 128 | observer = stdout_observer |
98 | ||
129 | ||
99 | 130 | |
100 | 131 | carbonLogObserver = CarbonLogObserver() |
101 | 132 | |
166 | 197 | if debugEnabled: |
167 | 198 | msg(message, **context) |
168 | 199 | |
200 | ||
169 | 201 | debugEnabled = False |
170 | 202 | |
171 | 203 |
0 | 0 | from carbon.util import PluginRegistrar |
1 | 1 | from carbon import state, log |
2 | from six import with_metaclass | |
2 | 3 | |
3 | 4 | |
4 | class Processor(object): | |
5 | __metaclass__ = PluginRegistrar | |
5 | class Processor(with_metaclass(PluginRegistrar, object)): | |
6 | 6 | plugins = {} |
7 | 7 | NO_OUTPUT = () |
8 | 8 |
0 | 0 | import time |
1 | import socket | |
2 | import sys | |
1 | 3 | |
2 | 4 | from twisted.internet.protocol import ServerFactory, DatagramProtocol |
3 | from twisted.application.internet import TCPServer, UDPServer | |
5 | # from twisted.application.internet import TCPServer, UDPServer | |
6 | from twisted.application import service | |
4 | 7 | from twisted.internet.error import ConnectionDone |
8 | from twisted.internet import reactor, tcp, udp | |
5 | 9 | from twisted.protocols.basic import LineOnlyReceiver, Int32StringReceiver |
6 | 10 | from twisted.protocols.policies import TimeoutMixin |
7 | 11 | from carbon import log, events, state, management |
9 | 13 | from carbon.regexlist import WhiteList, BlackList |
10 | 14 | from carbon.util import pickle, get_unpickler |
11 | 15 | from carbon.util import PluginRegistrar |
16 | from six import with_metaclass | |
17 | from carbon.util import enableTcpKeepAlive | |
18 | ||
19 | ||
20 | def checkIfAcceptingConnections(): | |
21 | clients = len(state.connectedMetricReceiverProtocols) | |
22 | max_clients = settings.MAX_RECEIVER_CONNECTIONS | |
23 | ||
24 | if clients < max_clients: | |
25 | for port in state.listeningPorts: | |
26 | if port.paused: | |
27 | log.listener( | |
28 | "Resuming %s (%d/%d connections)" % (port, clients, max_clients)) | |
29 | port.resumeProducing() | |
30 | port.paused = False | |
31 | else: | |
32 | for port in state.listeningPorts: | |
33 | if not port.paused: | |
34 | log.listener( | |
35 | "Pausing %s (%d/%d connections)" % (port, clients, max_clients)) | |
36 | port.pauseProducing() | |
37 | port.paused = True | |
12 | 38 | |
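A reduced model of checkIfAcceptingConnections above, with a stand-in for twisted's port objects, to show the pause/resume toggling (all names hypothetical):

class FakePort(object):
    paused = False
    def pauseProducing(self):
        self.paused = True
    def resumeProducing(self):
        self.paused = False

def check(ports, clients, max_clients):
    for port in ports:
        if clients < max_clients and port.paused:
            port.resumeProducing()   # capacity again: accept new connections
        elif clients >= max_clients and not port.paused:
            port.pauseProducing()    # at the limit: stop accepting

ports = [FakePort()]
check(ports, clients=500, max_clients=100)
assert ports[0].paused
check(ports, clients=10, max_clients=100)
assert not ports[0].paused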
13 | 39 | |
14 | 40 | class CarbonReceiverFactory(ServerFactory): |
41 | ||
15 | 42 | def buildProtocol(self, addr): |
16 | from carbon.conf import settings | |
17 | ||
18 | # Don't establish the connection if we have reached the limit. | |
19 | if len(state.connectedMetricReceiverProtocols) < settings.MAX_RECEIVER_CONNECTIONS: | |
43 | clients = len(state.connectedMetricReceiverProtocols) | |
44 | max_clients = settings.MAX_RECEIVER_CONNECTIONS | |
45 | ||
46 | if clients < max_clients: | |
20 | 47 | return ServerFactory.buildProtocol(self, addr) |
21 | 48 | else: |
22 | 49 | return None |
23 | 50 | |
24 | 51 | |
25 | class CarbonServerProtocol(object): | |
26 | __metaclass__ = PluginRegistrar | |
52 | class CarbonService(service.Service): | |
53 | """Create our own socket to support SO_REUSEPORT. | |
54 | To be removed when Twisted supports it natively. |
55 | See: https://github.com/twisted/twisted/pull/759. | |
56 | """ | |
57 | factory = None | |
58 | protocol = None | |
59 | ||
60 | def __init__(self, interface, port, protocol, factory): | |
61 | self.protocol = protocol | |
62 | self.factory = factory | |
63 | self.interface = interface | |
64 | self.port = port | |
65 | ||
66 | def startService(self): | |
67 | # use socket creation from twisted to use the same options as before | |
68 | if hasattr(self.protocol, 'datagramReceived'): | |
69 | tmp_port = udp.Port(None, None, interface=self.interface) | |
70 | else: | |
71 | tmp_port = tcp.Port(None, None, interface=self.interface) | |
72 | carbon_sock = tmp_port.createInternetSocket() | |
73 | if hasattr(socket, 'SO_REUSEPORT'): | |
74 | carbon_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) | |
75 | carbon_sock.bind((self.interface, self.port)) | |
76 | ||
77 | if hasattr(self.protocol, 'datagramReceived'): | |
78 | self._port = reactor.adoptDatagramPort( | |
79 | carbon_sock.fileno(), socket.AF_INET, self.protocol()) | |
80 | else: | |
81 | carbon_sock.listen(tmp_port.backlog) | |
82 | self._port = reactor.adoptStreamPort( | |
83 | carbon_sock.fileno(), socket.AF_INET, self.factory) | |
84 | state.listeningPorts.append(self._port) | |
85 | self._port.paused = False | |
86 | carbon_sock.close() | |
87 | ||
88 | def stopService(self): | |
89 | self._port.stopListening() | |
90 | ||
91 | ||
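What CarbonService.startService does at the socket level, reduced to the essentials (the real code hands the fd to the reactor; this sketch only binds and closes, using an ephemeral port instead of carbon's configured one, e.g. 2003):

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if hasattr(socket, 'SO_REUSEPORT'):
    # lets several carbon instances listen on the same (interface, port)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.bind(('127.0.0.1', 0))
sock.listen(5)
sock.close()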
92 | class CarbonServerProtocol(with_metaclass(PluginRegistrar, object)): | |
27 | 93 | plugins = {} |
28 | 94 | |
29 | 95 | @classmethod |
37 | 103 | return |
38 | 104 | |
39 | 105 | if hasattr(protocol, 'datagramReceived'): |
40 | service = UDPServer(port, protocol(), interface=interface) | |
41 | service.setServiceParent(root_service) | |
106 | service = CarbonService(interface, port, protocol, None) | |
42 | 107 | else: |
43 | 108 | factory = CarbonReceiverFactory() |
44 | 109 | factory.protocol = protocol |
45 | service = TCPServer(port, factory, interface=interface) | |
46 | service.setServiceParent(root_service) | |
110 | service = CarbonService(interface, port, protocol, factory) | |
111 | service.setServiceParent(root_service) | |
47 | 112 | |
48 | 113 | |
49 | 114 | class MetricReceiver(CarbonServerProtocol, TimeoutMixin): |
53 | 118 | |
54 | 119 | def connectionMade(self): |
55 | 120 | self.setTimeout(settings.METRIC_CLIENT_IDLE_TIMEOUT) |
121 | enableTcpKeepAlive(self.transport, settings.TCP_KEEPALIVE, settings) | |
56 | 122 | self.peerName = self.getPeerName() |
123 | ||
57 | 124 | if settings.LOG_LISTENER_CONN_SUCCESS: |
58 | 125 | log.listener("%s connection with %s established" % ( |
59 | 126 | self.__class__.__name__, self.peerName)) |
62 | 129 | self.pauseReceiving() |
63 | 130 | |
64 | 131 | state.connectedMetricReceiverProtocols.add(self) |
132 | checkIfAcceptingConnections() | |
65 | 133 | if settings.USE_FLOW_CONTROL: |
66 | 134 | events.pauseReceivingMetrics.addHandler(self.pauseReceiving) |
67 | 135 | events.resumeReceivingMetrics.addHandler(self.resumeReceiving) |
82 | 150 | def connectionLost(self, reason): |
83 | 151 | if reason.check(ConnectionDone): |
84 | 152 | if settings.LOG_LISTENER_CONN_SUCCESS: |
85 | log.listener("%s connection with %s closed cleanly" % (self.__class__.__name__, self.peerName)) | |
86 | ||
87 | else: | |
88 | log.listener("%s connection with %s lost: %s" % (self.__class__.__name__, self.peerName, reason.value)) | |
153 | log.listener( | |
154 | "%s connection with %s closed cleanly" % (self.__class__.__name__, self.peerName)) | |
155 | ||
156 | else: | |
157 | log.listener( | |
158 | "%s connection with %s lost: %s" % (self.__class__.__name__, self.peerName, reason.value)) | |
89 | 159 | |
90 | 160 | state.connectedMetricReceiverProtocols.remove(self) |
161 | checkIfAcceptingConnections() | |
91 | 162 | if settings.USE_FLOW_CONTROL: |
92 | 163 | events.pauseReceivingMetrics.removeHandler(self.pauseReceiving) |
93 | 164 | events.resumeReceivingMetrics.removeHandler(self.resumeReceiving) |
101 | 172 | return |
102 | 173 | if datapoint[1] != datapoint[1]: # filter out NaN values |
103 | 174 | return |
104 | if int(datapoint[0]) == -1: # use current time if none given: https://github.com/graphite-project/carbon/issues/54 | |
175 | # use current time if none given: https://github.com/graphite-project/carbon/issues/54 | |
176 | if int(datapoint[0]) == -1: | |
105 | 177 | datapoint = (time.time(), datapoint[1]) |
106 | 178 | res = settings.MIN_TIMESTAMP_RESOLUTION |
107 | 179 | if res: |
112 | 184 | |
113 | 185 | class MetricLineReceiver(MetricReceiver, LineOnlyReceiver): |
114 | 186 | plugin_name = "line" |
115 | delimiter = '\n' | |
187 | delimiter = b'\n' | |
116 | 188 | |
117 | 189 | def lineReceived(self, line): |
190 | if sys.version_info >= (3, 0): | |
191 | line = line.decode('utf-8') | |
192 | ||
118 | 193 | try: |
119 | 194 | metric, value, timestamp = line.strip().split() |
120 | 195 | datapoint = (float(timestamp), float(value)) |
121 | 196 | except ValueError: |
122 | 197 | if len(line) > 400: |
123 | 198 | line = line[:400] + '...' |
124 | log.listener('invalid line received from client %s, ignoring [%s]' % (self.peerName, line.strip().encode('string_escape'))) | |
199 | log.listener('invalid line received from client %s, ignoring [%s]' % | |
200 | (self.peerName, repr(line.strip())[1:-1])) | |
125 | 201 | return |
126 | 202 | |
127 | 203 | self.metricReceived(metric, datapoint) |
137 | 213 | |
138 | 214 | super(MetricDatagramReceiver, cls).build(root_service) |
139 | 215 | |
140 | def datagramReceived(self, data, (host, port)): | |
216 | def datagramReceived(self, data, addr): | |
217 | (host, _) = addr | |
218 | if sys.version_info >= (3, 0): | |
219 | data = data.decode('utf-8') | |
220 | ||
141 | 221 | for line in data.splitlines(): |
142 | 222 | try: |
143 | 223 | metric, value, timestamp = line.strip().split() |
147 | 227 | except ValueError: |
148 | 228 | if len(line) > 400: |
149 | 229 | line = line[:400] + '...' |
150 | log.listener('invalid line received from %s, ignoring [%s]' % (host, line.strip().encode('string_escape'))) | |
230 | log.listener('invalid line received from %s, ignoring [%s]' % | |
231 | (host, repr(line.strip())[1:-1])) | |
151 | 232 | |
152 | 233 | |
153 | 234 | class MetricPickleReceiver(MetricReceiver, Int32StringReceiver): |
161 | 242 | def stringReceived(self, data): |
162 | 243 | try: |
163 | 244 | datapoints = self.unpickler.loads(data) |
164 | except pickle.UnpicklingError: | |
245 | # Pickle can throw a wide range of exceptions | |
246 | except (pickle.UnpicklingError, ValueError, IndexError, ImportError, KeyError): | |
165 | 247 | log.listener('invalid pickle received from %s, ignoring' % self.peerName) |
166 | 248 | return |
167 | 249 | |
168 | 250 | for raw in datapoints: |
169 | 251 | try: |
170 | 252 | (metric, (value, timestamp)) = raw |
171 | except Exception, e: | |
253 | except Exception as e: | |
172 | 254 | log.listener('Error decoding pickle: %s' % e) |
255 | continue | |
173 | 256 | |
174 | 257 | try: |
175 | 258 | datapoint = (float(value), float(timestamp)) # force proper types |
176 | except ValueError: | |
259 | except (ValueError, TypeError): | |
177 | 260 | continue |
178 | 261 | |
262 | # convert python2 unicode objects to str/bytes | |
263 | if not isinstance(metric, str): | |
264 | metric = metric.encode('utf-8') | |
265 | ||
179 | 266 | self.metricReceived(metric, datapoint) |
180 | 267 | |
181 | 268 | |
182 | 269 | class CacheManagementHandler(Int32StringReceiver): |
183 | MAX_LENGTH = 1024 ** 3 # 1mb | |
270 | MAX_LENGTH = 1024 ** 3  # 1GB |
184 | 271 | |
185 | 272 | def connectionMade(self): |
186 | 273 | peer = self.transport.getPeer() |
199 | 286 | cache = MetricCache() |
200 | 287 | if request['type'] == 'cache-query': |
201 | 288 | metric = request['metric'] |
202 | datapoints = cache.get(metric, {}).items() | |
289 | datapoints = list(cache.get(metric, {}).items()) | |
203 | 290 | result = dict(datapoints=datapoints) |
204 | 291 | if settings.LOG_CACHE_HITS: |
205 | log.query('[%s] cache query for \"%s\" returned %d values' % (self.peerAddr, metric, len(datapoints))) | |
292 | log.query('[%s] cache query for \"%s\" returned %d values' % ( | |
293 | self.peerAddr, metric, len(datapoints) | |
294 | )) | |
206 | 295 | instrumentation.increment('cacheQueries') |
207 | 296 | |
208 | 297 | elif request['type'] == 'cache-query-bulk': |
209 | 298 | datapointsByMetric = {} |
210 | 299 | metrics = request['metrics'] |
211 | 300 | for metric in metrics: |
212 | datapointsByMetric[metric] = cache.get(metric, {}).items() | |
301 | datapointsByMetric[metric] = list(cache.get(metric, {}).items()) | |
213 | 302 | |
214 | 303 | result = dict(datapointsByMetric=datapointsByMetric) |
215 | 304 | |
216 | 305 | if settings.LOG_CACHE_HITS: |
217 | log.query('[%s] cache query bulk for \"%d\" metrics returned %d values' % | |
218 | (self.peerAddr, len(metrics), sum([len(datapoints) for datapoints in datapointsByMetric.values()]))) | |
306 | log.query('[%s] cache query bulk for \"%d\" metrics returned %d values' % ( | |
307 | self.peerAddr, | |
308 | len(metrics), | |
309 | sum([len(datapoints) for datapoints in datapointsByMetric.values()]) | |
310 | )) | |
219 | 311 | instrumentation.increment('cacheBulkQueries') |
220 | 312 | instrumentation.append('cacheBulkQuerySize', len(metrics)) |
221 | 313 | |
228 | 320 | else: |
229 | 321 | result = dict(error="Invalid request type \"%s\"" % request['type']) |
230 | 322 | |
231 | response = pickle.dumps(result, protocol=-1) | |
323 | response = pickle.dumps(result, protocol=2) | |
232 | 324 | self.sendString(response) |
233 | 325 | |
234 | 326 | |
235 | 327 | # Avoid import circularities |
236 | from carbon.cache import MetricCache | |
237 | from carbon import instrumentation | |
328 | from carbon.cache import MetricCache # NOQA | |
329 | from carbon import instrumentation # NOQA |
0 | import time | |
1 | 0 | import re |
2 | 1 | import os.path |
3 | 2 | from carbon import log |
56 | 55 | def __nonzero__(self): |
57 | 56 | return bool(self.regex_list) |
58 | 57 | |
58 | __bool__ = __nonzero__ # py2/3 compatibility | |
59 | ||
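The alias above is what keeps truth-testing working on both interpreters: Python 2 calls __nonzero__, Python 3 calls __bool__. A tiny illustration with a hypothetical class:

class Listish(object):
    def __init__(self, items):
        self.items = items
    def __nonzero__(self):
        return bool(self.items)
    __bool__ = __nonzero__  # py2/3 compatibility

assert not Listish([])           # empty: falsy
assert Listish([r'^carbon\.'])   # has patterns: truthy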
59 | 60 | |
60 | 61 | WhiteList = RegexList() |
61 | 62 | BlackList = RegexList() |
24 | 24 | for section in parser.sections(): |
25 | 25 | if not parser.has_option(section, 'destinations'): |
26 | 26 | raise CarbonConfigException("Rules file %s section %s does not define a " |
27 | "'destinations' list" % (path, section)) | |
27 | "'destinations' list" % (path, section)) | |
28 | 28 | |
29 | 29 | destination_strings = parser.get(section, 'destinations').split(',') |
30 | 30 | destinations = parseDestinations(destination_strings) |
32 | 32 | if parser.has_option(section, 'pattern'): |
33 | 33 | if parser.has_option(section, 'default'): |
34 | 34 | raise CarbonConfigException("Section %s contains both 'pattern' and " |
35 | "'default'. You must use one or the other." % section) | |
35 | "'default'. You must use one or the other." % section) | |
36 | 36 | pattern = parser.get(section, 'pattern') |
37 | 37 | regex = re.compile(pattern, re.I) |
38 | 38 | |
39 | 39 | continue_matching = False |
40 | 40 | if parser.has_option(section, 'continue'): |
41 | 41 | continue_matching = parser.getboolean(section, 'continue') |
42 | rule = RelayRule(condition=regex.search, destinations=destinations, continue_matching=continue_matching) | |
42 | rule = RelayRule( | |
43 | condition=regex.search, destinations=destinations, continue_matching=continue_matching) | |
43 | 44 | rules.append(rule) |
44 | 45 | continue |
45 | 46 | |
53 | 54 | |
54 | 55 | if not defaultRule: |
55 | 56 | raise CarbonConfigException("No default rule defined. You must specify exactly one " |
56 | "rule with 'default = true' instead of a pattern.") | |
57 | "rule with 'default = true' instead of a pattern.") | |
57 | 58 | |
58 | 59 | rules.append(defaultRule) |
59 | 60 | return rules |
0 | import random | |
1 | ||
2 | from zope.interface import implementer | |
3 | ||
4 | from twisted.internet._resolver import GAIResolver | |
5 | from twisted.internet.defer import Deferred | |
6 | from twisted.internet.address import IPv4Address | |
7 | from twisted.internet.interfaces import IResolverSimple, IResolutionReceiver | |
8 | from twisted.internet.error import DNSLookupError | |
9 | ||
10 | ||
11 | # Inspired by /twisted/internet/_resolver.py |
12 | @implementer(IResolutionReceiver) | |
13 | class RandomWins(object): | |
14 | """ | |
15 | An L{IResolutionReceiver} which fires a L{Deferred} with a random result. | |
16 | """ | |
17 | ||
18 | def __init__(self, deferred): | |
19 | """ | |
20 | @param deferred: The L{Deferred} to fire when the first |
21 | resolution result arrives. |
22 | """ | |
23 | self._deferred = deferred | |
24 | self._results = [] | |
25 | ||
26 | def resolutionBegan(self, resolution): | |
27 | """ | |
28 | See L{IResolutionReceiver.resolutionBegan} | |
29 | @param resolution: See L{IResolutionReceiver.resolutionBegan} | |
30 | """ | |
31 | self._resolution = resolution | |
32 | ||
33 | def addressResolved(self, address): | |
34 | """ | |
35 | See L{IResolutionReceiver.addressResolved} | |
36 | @param address: See L{IResolutionReceiver.addressResolved} | |
37 | """ | |
38 | self._results.append(address.host) | |
39 | ||
40 | def resolutionComplete(self): | |
41 | """ | |
42 | See L{IResolutionReceiver.resolutionComplete} | |
43 | """ | |
44 | if self._results: | |
45 | random.shuffle(self._results) | |
46 | self._deferred.callback(self._results[0]) | |
47 | else: | |
48 | self._deferred.errback(DNSLookupError(self._resolution.name)) | |
49 | ||
50 | ||
51 | @implementer(IResolverSimple) | |
52 | class ComplexResolverSimplifier(object): | |
53 | """ | |
54 | A converter from L{IHostnameResolver} to L{IResolverSimple} | |
55 | """ | |
56 | def __init__(self, nameResolver): | |
57 | """ | |
58 | Create a L{ComplexResolverSimplifier} with an L{IHostnameResolver}. | |
59 | @param nameResolver: The L{IHostnameResolver} to use. | |
60 | """ | |
61 | self._nameResolver = nameResolver | |
62 | ||
63 | def getHostByName(self, name, timeouts=()): | |
64 | """ | |
65 | See L{IResolverSimple.getHostByName} | |
66 | @param name: see L{IResolverSimple.getHostByName} | |
67 | @param timeouts: see L{IResolverSimple.getHostByName} | |
68 | @return: see L{IResolverSimple.getHostByName} | |
69 | """ | |
70 | result = Deferred() | |
71 | self._nameResolver.resolveHostName(RandomWins(result), name, 0, | |
72 | [IPv4Address]) | |
73 | return result | |
74 | ||
75 | ||
76 | def setUpRandomResolver(reactor): | |
77 | resolver = GAIResolver(reactor, reactor.getThreadPool) | |
78 | reactor.installResolver(ComplexResolverSimplifier(resolver)) |
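A minimal usage sketch, assuming a standard Twisted reactor plus the helpers defined above: once installed, every name lookup made through the reactor resolves via GAIResolver and returns one address picked at random, spreading connections across all A records of a destination pool. The hostname is invented:

    from twisted.internet import reactor

    # Swap the reactor's default resolver for the shuffling one above.
    setUpRandomResolver(reactor)

    # Subsequent lookups, e.g. via reactor.connectTCP('relay.pool.example',
    # 2003, factory), now pick a random address from the resolved set.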
21 | 21 | yield (metric, datapoint) |
22 | 22 | |
23 | 23 | |
24 | class RewriteRuleManager: | |
24 | class _RewriteRuleManager: | |
25 | 25 | def __init__(self): |
26 | 26 | self.rulesets = defaultdict(list) |
27 | 27 | self.rules_file = None |
93 | 93 | |
94 | 94 | |
95 | 95 | # Ghetto singleton |
96 | RewriteRuleManager = RewriteRuleManager() | |
96 | RewriteRuleManager = _RewriteRuleManager() |
0 | 0 | import imp |
1 | from carbon.hashing import ConsistentHashRing | |
1 | from carbon.hashing import ConsistentHashRing, carbonHash | |
2 | 2 | from carbon.util import PluginRegistrar |
3 | ||
4 | ||
5 | class DatapointRouter(object): | |
3 | from six import with_metaclass | |
4 | from six.moves import xrange | |
5 | ||
6 | ||
7 | class DatapointRouter(with_metaclass(PluginRegistrar, object)): | |
6 | 8 | "Abstract base class for datapoint routing logic implementations" |
7 | __metaclass__ = PluginRegistrar | |
8 | 9 | plugins = {} |
9 | 10 | |
10 | 11 | def addDestination(self, destination): |
14 | 15 | def removeDestination(self, destination): |
15 | 16 | "destination is a (host, port, instance) triple" |
16 | 17 | raise NotImplementedError()
18 | ||
19 | def hasDestination(self, destination): | |
20 | "destination is a (host, port, instance) triple" | |
21 | raise NotImplementedError | 
22 | ||
23 | def countDestinations(self): | |
24 | "return number of configured destinations" | |
25 | raise NotImplementedError | 
17 | 26 | |
18 | 27 | def getDestinations(self, key): |
19 | 28 | """Generate the destinations where the given routing key should map to. Only |
40 | 49 | |
41 | 50 | def removeDestination(self, destination): |
42 | 51 | self.destinations.discard(destination) |
52 | ||
53 | def hasDestination(self, destination): | |
54 | return destination in self.destinations | |
55 | ||
56 | def countDestinations(self): | |
57 | return len(self.destinations) | |
43 | 58 | |
44 | 59 | def getDestinations(self, key): |
45 | 60 | for rule in self.rules: |
61 | 76 | self.replication_factor = int(replication_factor) |
62 | 77 | self.diverse_replicas = diverse_replicas |
63 | 78 | self.instance_ports = {} # { (server, instance) : port } |
64 | self.ring = ConsistentHashRing([]) | |
79 | hash_type = settings.ROUTER_HASH_TYPE or 'carbon_ch' | |
80 | self.ring = ConsistentHashRing([], hash_type=hash_type) | |
65 | 81 | |
66 | 82 | def addDestination(self, destination): |
67 | 83 | (server, port, instance) = destination |
68 | if (server, instance) in self.instance_ports: | |
84 | if self.hasDestination(destination): | |
69 | 85 | raise Exception("destination instance (%s, %s) already configured" % (server, instance)) |
70 | 86 | self.instance_ports[(server, instance)] = port |
71 | 87 | self.ring.add_node((server, instance)) |
72 | 88 | |
73 | 89 | def removeDestination(self, destination): |
74 | 90 | (server, port, instance) = destination |
75 | if (server, instance) not in self.instance_ports: | |
91 | if not self.hasDestination(destination): | |
76 | 92 | raise Exception("destination instance (%s, %s) not configured" % (server, instance)) |
77 | 93 | del self.instance_ports[(server, instance)] |
78 | 94 | self.ring.remove_node((server, instance)) |
95 | ||
96 | def hasDestination(self, destination): | |
97 | (server, _, instance) = destination | |
98 | return (server, instance) in self.instance_ports | |
99 | ||
100 | def countDestinations(self): | |
101 | return len(self.instance_ports) | |
79 | 102 | |
80 | 103 | def getDestinations(self, metric): |
81 | 104 | key = self.getKey(metric) |
112 | 135 | keyfunc = getattr(module, func_name) |
113 | 136 | self.setKeyFunction(keyfunc) |
114 | 137 | |
138 | ||
115 | 139 | class AggregatedConsistentHashingRouter(DatapointRouter): |
116 | 140 | plugin_name = 'aggregated-consistent-hashing' |
117 | 141 | |
129 | 153 | |
130 | 154 | def removeDestination(self, destination): |
131 | 155 | self.hash_router.removeDestination(destination) |
156 | ||
157 | def hasDestination(self, destination): | |
158 | return self.hash_router.hasDestination(destination) | |
159 | ||
160 | def countDestinations(self): | |
161 | return self.hash_router.countDestinations() | |
132 | 162 | |
133 | 163 | def getDestinations(self, key): |
134 | 164 | # resolve metric to aggregate forms |
154 | 184 | for destination in destinations: |
155 | 185 | yield destination |
156 | 186 | |
157 | try: | |
158 | import mmh3 | |
159 | except ImportError: | |
160 | pass | |
161 | else: | |
162 | class FastHashRing(object): | |
163 | """A very fast hash 'ring'. | |
164 | ||
165 | Instead of trying to avoid rebalancing data when changing | |
166 | the list of nodes, we try to make routing as fast as we | 
167 | can. This is good enough because the performance of the | 
168 | current rebalancing tools depends on the total number of | 
169 | metrics and not on the number of metrics to rebalance. | 
170 | """ | |
171 | ||
172 | def __init__(self): | |
173 | self.nodes = set() | |
174 | self.sorted_nodes = [] | |
175 | ||
176 | def _hash(self, key): | |
177 | return mmh3.hash(key) | |
178 | ||
179 | def _update_nodes(self): | |
180 | self.sorted_nodes = sorted( | |
181 | [(self._hash(str(n)), n) for n in self.nodes], | |
182 | key=lambda v: v[0] | |
183 | ) | |
184 | ||
185 | def add_node(self, node): | |
186 | self.nodes.add(node) | |
187 | self._update_nodes() | |
188 | ||
189 | def remove_node(self, node): | |
190 | self.nodes.discard(node) | |
191 | self._update_nodes() | |
192 | ||
193 | def get_nodes(self, key): | |
194 | seed = self._hash(key) % len(self.nodes) | |
195 | ||
196 | for n in xrange(seed, seed + len(self.nodes)): | |
197 | yield self.sorted_nodes[n % len(self.sorted_nodes)][1] | |
198 | ||
199 | class FastHashingRouter(ConsistentHashingRouter): | |
200 | """Same as ConsistentHashingRouter but using FastHashRing.""" | |
201 | plugin_name = 'fast-hashing' | |
202 | ||
203 | def __init__(self, settings): | |
204 | super(FastHashingRouter, self).__init__(settings) | |
205 | self.ring = FastHashRing() | |
206 | ||
207 | class FastAggregatedHashingRouter(AggregatedConsistentHashingRouter): | |
208 | """Same as AggregatedConsistentHashingRouter but using FastHashRing.""" | |
209 | plugin_name = 'fast-aggregated-hashing' | |
210 | ||
211 | def __init__(self, settings): | |
212 | super(FastAggregatedHashingRouter, self).__init__(settings) | |
213 | self.hash_router.ring = FastHashRing() | |
187 | ||
188 | class FastHashRing(object): | |
189 | """A very fast hash 'ring'. | |
190 | ||
191 | Instead of trying to avoid rebalancing data when changing | |
192 | the list of nodes, we try to make routing as fast as we | 
193 | can. This is good enough because the performance of the | 
194 | current rebalancing tools depends on the total number of | 
195 | metrics and not on the number of metrics to rebalance. | 
196 | """ | |
197 | ||
198 | def __init__(self, settings): | |
199 | self.nodes = set() | |
200 | self.sorted_nodes = [] | |
201 | self.hash_type = settings.ROUTER_HASH_TYPE or 'mmh3_ch' | |
202 | ||
203 | def _hash(self, key): | |
204 | return carbonHash(key, self.hash_type) | |
205 | ||
206 | def _update_nodes(self): | |
207 | self.sorted_nodes = sorted( | |
208 | [(self._hash(str(n)), n) for n in self.nodes], | |
209 | key=lambda v: v[0] | |
210 | ) | |
211 | ||
212 | def add_node(self, node): | |
213 | self.nodes.add(node) | |
214 | self._update_nodes() | |
215 | ||
216 | def remove_node(self, node): | |
217 | self.nodes.discard(node) | |
218 | self._update_nodes() | |
219 | ||
220 | def get_nodes(self, key): | |
221 | if not self.nodes: | |
222 | return | |
223 | ||
224 | seed = self._hash(key) % len(self.nodes) | |
225 | ||
226 | for n in xrange(seed, seed + len(self.nodes)): | |
227 | yield self.sorted_nodes[n % len(self.sorted_nodes)][1] | |
228 | ||
229 | ||
230 | class FastHashingRouter(ConsistentHashingRouter): | |
231 | """Same as ConsistentHashingRouter but using FastHashRing.""" | |
232 | plugin_name = 'fast-hashing' | |
233 | ||
234 | def __init__(self, settings): | |
235 | super(FastHashingRouter, self).__init__(settings) | |
236 | self.ring = FastHashRing(settings) | |
237 | ||
238 | ||
239 | class FastAggregatedHashingRouter(AggregatedConsistentHashingRouter): | |
240 | """Same as AggregatedConsistentHashingRouter but using FastHashRing.""" | |
241 | plugin_name = 'fast-aggregated-hashing' | |
242 | ||
243 | def __init__(self, settings): | |
244 | super(FastAggregatedHashingRouter, self).__init__(settings) | |
245 | self.hash_router.ring = FastHashRing(settings) |
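To make the routing concrete, a small illustrative walk over FastHashRing under stated assumptions (node names are invented; 'settings' must provide ROUTER_HASH_TYPE, and the default 'mmh3_ch' hash requires the mmh3 package): nodes are kept sorted by their hash, the key's hash picks a starting offset, and get_nodes() then yields every node exactly once from that offset onward.

    ring = FastHashRing(settings)
    for node in [('a', None), ('b', None), ('c', None)]:
      ring.add_node(node)

    # Mirrors how a router consumes get_nodes(): take the first
    # replication_factor nodes yielded for the metric's key.
    candidates = ring.get_nodes('some.metric.path')
    replicas = [next(candidates) for _ in range(2)]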
14 | 14 | from os.path import exists |
15 | 15 | |
16 | 16 | from twisted.application.service import MultiService |
17 | from twisted.application.internet import TCPServer, TCPClient | |
17 | from twisted.application.internet import TCPServer | |
18 | 18 | from twisted.internet.protocol import ServerFactory |
19 | 19 | from twisted.python.components import Componentized |
20 | 20 | from twisted.python.log import ILogObserver |
36 | 36 | except ImportError: |
37 | 37 | pass |
38 | 38 | try: |
39 | import carbon.protobuf | |
40 | except ImportError, e: | |
39 | import carbon.protobuf # NOQA | |
40 | except ImportError: | |
41 | 41 | pass |
42 | 42 | |
43 | 43 | |
71 | 71 | |
72 | 72 | def setupPipeline(pipeline, root_service, settings): |
73 | 73 | state.pipeline_processors = [] |
74 | ||
74 | 75 | for processor in pipeline: |
75 | 76 | args = [] |
76 | 77 | if ':' in processor: |
91 | 92 | plugin_class = Processor.plugins[processor] |
92 | 93 | state.pipeline_processors.append(plugin_class(*args)) |
93 | 94 | |
94 | if processor == 'relay': | |
95 | if processor in ['relay', 'write']: | |
95 | 96 | state.pipeline_processors_generated.append(plugin_class(*args)) |
96 | ||
97 | 97 | |
98 | 98 | events.metricReceived.addHandler(run_pipeline) |
99 | 99 | events.metricGenerated.addHandler(run_pipeline_generated) |
121 | 121 | |
122 | 122 | settings.RELAY_METHOD = 'consistent-hashing' |
123 | 123 | root_service = createBaseService(config, settings) |
124 | setupPipeline(['rewrite:pre', 'aggregate', 'rewrite:post', 'relay'], root_service, settings) | |
124 | setupPipeline( | |
125 | ['rewrite:pre', 'aggregate', 'rewrite:post', 'relay'], | |
126 | root_service, settings) | |
127 | setupReceivers(root_service, settings) | |
128 | ||
129 | return root_service | |
130 | ||
131 | ||
132 | def createAggregatorCacheService(config): | |
133 | from carbon.conf import settings | |
134 | ||
135 | settings.RELAY_METHOD = 'consistent-hashing' | |
136 | root_service = createBaseService(config, settings) | |
137 | setupPipeline( | |
138 | ['rewrite:pre', 'aggregate', 'rewrite:post', 'write'], | |
139 | root_service, settings) | |
125 | 140 | setupReceivers(root_service, settings) |
126 | 141 | |
127 | 142 | return root_service |
140 | 155 | def setupReceivers(root_service, settings): |
141 | 156 | from carbon.protocols import MetricReceiver |
142 | 157 | |
143 | for plugin_name, plugin_class in MetricReceiver.plugins.items(): | |
158 | for _, plugin_class in MetricReceiver.plugins.items(): | |
144 | 159 | plugin_class.build(root_service) |
145 | 160 | |
146 | 161 | |
147 | 162 | def setupAggregatorProcessor(root_service, settings): |
148 | from carbon.aggregator.processor import AggregationProcessor # Register the plugin class | |
163 | from carbon.aggregator.processor import AggregationProcessor # NOQA Register the plugin class | |
149 | 164 | from carbon.aggregator.rules import RuleManager |
150 | 165 | |
151 | 166 | aggregation_rules_path = settings["aggregation-rules"] |
152 | 167 | if not exists(aggregation_rules_path): |
153 | raise CarbonConfigException("aggregation processor: file does not exist {0}".format(aggregation_rules_path)) | |
168 | raise CarbonConfigException( | |
169 | "aggregation processor: file does not exist {0}".format(aggregation_rules_path)) | |
154 | 170 | RuleManager.read_from(aggregation_rules_path) |
155 | 171 | |
156 | 172 | |
176 | 192 | |
177 | 193 | |
178 | 194 | def setupWriterProcessor(root_service, settings): |
179 | from carbon import cache # Register CacheFeedingProcessor | |
195 | from carbon import cache # NOQA Register CacheFeedingProcessor | |
180 | 196 | from carbon.protocols import CacheManagementHandler |
181 | 197 | from carbon.writer import WriterService |
182 | from carbon import events | |
183 | 198 | |
184 | 199 | factory = ServerFactory() |
185 | 200 | factory.protocol = CacheManagementHandler |
9 | 9 | pipeline_processors = [] |
10 | 10 | pipeline_processors_generated = [] |
11 | 11 | database = None |
12 | listeningPorts = [] |
11 | 11 | See the License for the specific language governing permissions and |
12 | 12 | limitations under the License.""" |
13 | 13 | |
14 | import os | |
15 | 14 | import re |
16 | 15 | |
17 | from os.path import join, exists | |
16 | from os.path import join | |
18 | 17 | from carbon.conf import OrderedConfigParser, settings |
19 | 18 | from carbon.exceptions import CarbonConfigException |
20 | from carbon.util import pickle, parseRetentionDef | |
19 | from carbon.util import parseRetentionDef | |
21 | 20 | from carbon import log, state |
22 | 21 | |
23 | 22 | |
63 | 62 | self.points = int(points) |
64 | 63 | |
65 | 64 | def __str__(self): |
66 | return "Archive = (Seconds per point: %d, Datapoints to save: %d)" % (self.secondsPerPoint, self.points) | |
65 | return "Archive = (Seconds per point: %d, Datapoints to save: %d)" % ( | |
66 | self.secondsPerPoint, self.points) | |
67 | 67 | |
68 | 68 | def getTuple(self): |
69 | 69 | return (self.secondsPerPoint, self.points) |
85 | 85 | |
86 | 86 | try: |
87 | 87 | retentions = options['retentions'].split(',') |
88 | archives = [Archive.fromString(s) for s in retentions] | |
89 | 88 | except KeyError: |
90 | 89 | log.err("Schema %s missing 'retentions', skipping" % section) |
91 | 90 | continue |
91 | ||
92 | try: | |
93 | archives = [Archive.fromString(s) for s in retentions] | |
94 | except ValueError as exc: | |
95 | log.err("{msg} in section [{section}] in {fn}".format( | |
96 | msg=exc, section=section.title(), fn=STORAGE_SCHEMAS_CONFIG)) | |
97 | raise SystemExit(1) | |
92 | 98 | |
93 | 99 | if pattern: |
94 | 100 | mySchema = PatternSchema(section, pattern, archives) |
102 | 108 | if state.database is not None: |
103 | 109 | state.database.validateArchiveList(archiveList) |
104 | 110 | schemaList.append(mySchema) |
105 | except ValueError, e: | |
111 | except ValueError as e: | |
106 | 112 | log.msg("Invalid schemas found in %s: %s" % (section, e)) |
107 | 113 | |
108 | 114 | schemaList.append(defaultSchema) |
150 | 156 | schemaList.append(defaultAggregation) |
151 | 157 | return schemaList |
152 | 158 | |
153 | defaultArchive = Archive(60, 60 * 24 * 7) # default retention for unclassified data (7 days of minutely data) | |
159 | ||
160 | # default retention for unclassified data (7 days of minutely data) | |
161 | defaultArchive = Archive(60, 60 * 24 * 7) | |
154 | 162 | defaultSchema = DefaultSchema('default', [defaultArchive]) |
155 | 163 | defaultAggregation = DefaultSchema('default', (None, None)) |
0 | 0 | import sys |
1 | 1 | import os |
2 | 2 | import pwd |
3 | import __builtin__ | |
4 | ||
3 | import re | |
4 | ||
5 | try: | |
6 | import builtins as __builtin__ | |
7 | except ImportError: | |
8 | import __builtin__ | |
9 | ||
10 | from hashlib import sha256 | |
5 | 11 | from os.path import abspath, basename, dirname |
12 | import socket | |
13 | from time import sleep, time | |
14 | from twisted.python.util import initgroups | |
15 | from twisted.scripts.twistd import runApp | |
16 | from carbon.log import setDebugEnabled | |
6 | 17 | try: |
7 | from cStringIO import StringIO | |
18 | from OpenSSL import SSL | |
8 | 19 | except ImportError: |
9 | from StringIO import StringIO | |
20 | SSL = None | |
21 | ||
22 | ||
23 | # BytesIO is needed on py3 as StringIO does not operate on byte input anymore | |
24 | # We could use BytesIO on py2 as well but it is slower than StringIO | |
25 | if sys.version_info >= (3, 0): | |
26 | from io import BytesIO as StringIO | |
27 | else: | |
28 | try: | |
29 | from cStringIO import StringIO | |
30 | except ImportError: | |
31 | from StringIO import StringIO | |
32 | ||
10 | 33 | try: |
11 | 34 | import cPickle as pickle |
12 | 35 | USING_CPICKLE = True |
14 | 37 | import pickle |
15 | 38 | USING_CPICKLE = False |
16 | 39 | |
17 | from time import sleep, time | |
18 | from twisted.python.util import initgroups | |
19 | from twisted.scripts.twistd import runApp | |
20 | ||
21 | 40 | |
22 | 41 | def dropprivs(user): |
23 | 42 | uid, gid = pwd.getpwnam(user)[2:4] |
27 | 46 | return (uid, gid) |
28 | 47 | |
29 | 48 | |
49 | def enableTcpKeepAlive(transport, enable, settings): | |
50 | if not enable or not hasattr(transport, 'getHandle'): | |
51 | return | |
52 | ||
53 | fd = transport.getHandle() | |
54 | if SSL: | |
55 | if isinstance(fd, SSL.Connection): | |
56 | return | |
57 | if fd.type != socket.SOCK_STREAM: | |
58 | return | |
59 | ||
60 | transport.setTcpKeepAlive(1) | |
61 | for attr in ['TCP_KEEPIDLE', 'TCP_KEEPINTVL', 'TCP_KEEPCNT']: | |
62 | flag = getattr(socket, attr, None) | |
63 | value = getattr(settings, attr, None) | |
64 | if not flag or value is None: | |
65 | continue | |
66 | fd.setsockopt(socket.SOL_TCP, flag, value) | |
67 | ||
68 | ||
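The keepalive knobs this helper reads via getattr are plain settings attributes; a plausible carbon.conf-style configuration (values invented, names matching the socket options in the loop above) might look like:

    TCP_KEEPALIVE = True
    TCP_KEEPIDLE = 10    # idle seconds before the first keepalive probe
    TCP_KEEPINTVL = 30   # seconds between probes
    TCP_KEEPCNT = 2      # failed probes before the peer is declared dead

Options missing from either the socket module or the settings are skipped, so the helper degrades gracefully on platforms without, say, TCP_KEEPIDLE.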
30 | 69 | def run_twistd_plugin(filename): |
31 | 70 | from carbon.conf import get_parser |
32 | 71 | from twisted.scripts.twistd import ServerOptions |
56 | 95 | # If no reactor was selected yet, try to use the epoll reactor if |
57 | 96 | # available. |
58 | 97 | try: |
59 | from twisted.internet import epollreactor | |
98 | from twisted.internet import epollreactor # noqa: F401 | |
60 | 99 | twistd_options.append("--reactor=epoll") |
61 | 100 | except ImportError: |
62 | 101 | pass |
71 | 110 | twistd_options.extend(["--pidfile", options.pidfile]) |
72 | 111 | if options.umask: |
73 | 112 | twistd_options.extend(["--umask", options.umask]) |
113 | if options.logger: | |
114 | twistd_options.extend(["--logger", options.logger]) | |
115 | if options.logfile: | 
116 | twistd_options.extend(["--logfile", options.logfile]) | |
74 | 117 | if options.syslog: |
75 | 118 | twistd_options.append("--syslog") |
76 | 119 | |
79 | 122 | |
80 | 123 | if options.debug: |
81 | 124 | twistd_options.append("--debug") |
125 | setDebugEnabled(True) | |
82 | 126 | |
83 | 127 | for option_name, option_value in vars(options).items(): |
84 | if (option_value is not None and | |
85 | option_name not in ("debug", "profile", "profiler", "pidfile", "umask", "nodaemon", "syslog")): | |
86 | twistd_options.extend(["--%s" % option_name.replace("_", "-"), | |
87 | option_value]) | |
128 | if (option_value is not None and option_name not in ( | |
129 | "debug", "profile", "profiler", "pidfile", "umask", "nodaemon", "syslog", | |
130 | "logger", "logfile")): | |
131 | twistd_options.extend(["--%s" % option_name.replace("_", "-"), option_value]) | |
88 | 132 | |
89 | 133 | # Finally, append extra args so that twistd has a chance to process them. |
90 | 134 | twistd_options.extend(args) |
125 | 169 | # a dependency on whisper especially as carbon moves toward being a more
126 | 170 | # generic storage service that can use various backends. |
127 | 171 | UnitMultipliers = { |
128 | 's' : 1, | |
129 | 'm' : 60, | |
130 | 'h' : 60 * 60, | |
131 | 'd' : 60 * 60 * 24, | |
132 | 'w' : 60 * 60 * 24 * 7, | |
133 | 'y' : 60 * 60 * 24 * 365, | |
172 | 's': 1, | |
173 | 'm': 60, | |
174 | 'h': 60 * 60, | |
175 | 'd': 60 * 60 * 24, | |
176 | 'w': 60 * 60 * 24 * 7, | |
177 | 'y': 60 * 60 * 24 * 365, | |
134 | 178 | } |
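As a quick worked example of the table (the retention string is invented): a parseRetentionDef-style input such as '60s:7d' means one datapoint per 60 seconds kept for 7 * UnitMultipliers['d'] = 604800 seconds, i.e. 604800 / 60 = 10080 datapoints.

    # Sketch of the arithmetic the multipliers enable:
    seconds_per_point = 60 * UnitMultipliers['s']      # 60
    retention_seconds = 7 * UnitMultipliers['d']       # 604800
    points = retention_seconds // seconds_per_point    # 10080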
135 | 179 | |
136 | 180 | |
181 | 225 | |
182 | 226 | @classmethod |
183 | 227 | def find_class(cls, module, name): |
184 | if not module in cls.PICKLE_SAFE: | |
228 | if module not in cls.PICKLE_SAFE: | |
185 | 229 | raise pickle.UnpicklingError('Attempting to unpickle unsafe module %s' % module) |
186 | 230 | __import__(module) |
187 | 231 | mod = sys.modules[module] |
188 | if not name in cls.PICKLE_SAFE[module]: | |
232 | if name not in cls.PICKLE_SAFE[module]: | |
189 | 233 | raise pickle.UnpicklingError('Attempting to unpickle unsafe class %s' % name) |
190 | 234 | return getattr(mod, name) |
191 | 235 | |
203 | 247 | } |
204 | 248 | |
205 | 249 | def find_class(self, module, name): |
206 | if not module in self.PICKLE_SAFE: | |
250 | if module not in self.PICKLE_SAFE: | |
207 | 251 | raise pickle.UnpicklingError('Attempting to unpickle unsafe module %s' % module) |
208 | 252 | __import__(module) |
209 | 253 | mod = sys.modules[module] |
210 | if not name in self.PICKLE_SAFE[module]: | |
254 | if name not in self.PICKLE_SAFE[module]: | |
211 | 255 | raise pickle.UnpicklingError('Attempting to unpickle unsafe class %s' % name) |
212 | 256 | return getattr(mod, name) |
213 | 257 | |
214 | 258 | @classmethod |
215 | 259 | def loads(cls, pickle_string): |
216 | return cls(StringIO(pickle_string)).load() | |
260 | if sys.version_info >= (3, 0): | |
261 | return cls(StringIO(pickle_string), encoding='utf-8').load() | |
262 | else: | |
263 | return cls(StringIO(pickle_string)).load() | |
217 | 264 | |
218 | 265 | |
219 | 266 | def get_unpickler(insecure=False): |
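A hedged usage sketch: assuming get_unpickler() returns the restricted unpickler above when insecure is False (its body falls outside this hunk), receiving code can reject payloads that reference anything beyond PICKLE_SAFE. The payload variable is hypothetical bytes read off the wire:

    unpickler = get_unpickler(insecure=False)
    try:
      datapoints = unpickler.loads(payload)
    except pickle.UnpicklingError:
      # Payload referenced a module/class outside PICKLE_SAFE; drop it.
      datapoints = []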
242 | 289 | if cost <= self.tokens: |
243 | 290 | self._tokens -= cost |
244 | 291 | return True |
245 | else: | |
246 | if blocking: | |
247 | tokens_needed = cost - self._tokens | |
248 | seconds_per_token = 1 / self.fill_rate | |
249 | seconds_left = seconds_per_token * tokens_needed | |
250 | time_to_sleep = self.timestamp + seconds_left - time() | |
251 | if time_to_sleep > 0: | |
252 | sleep(time_to_sleep) | |
253 | self._tokens -= cost | |
254 | return True | |
292 | ||
293 | if not blocking: | |
255 | 294 | return False |
295 | ||
296 | tokens_needed = cost - self._tokens | |
297 | seconds_per_token = 1 / self.fill_rate | |
298 | seconds_left = seconds_per_token * tokens_needed | |
299 | time_to_sleep = self.timestamp + seconds_left - time() | |
300 | if time_to_sleep > 0: | |
301 | sleep(time_to_sleep) | |
302 | ||
303 | self._tokens -= cost | |
304 | return True | |
256 | 305 | |
257 | 306 | def setCapacityAndFillRate(self, new_capacity, new_fill_rate): |
258 | 307 | delta = float(new_capacity) - self.capacity |
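For orientation, a short sketch of how these buckets gate the writer (capacity and fill rate values are invented): a non-blocking drain is used where dropping work is acceptable, a blocking drain where the caller should be throttled instead.

    bucket = TokenBucket(50, 50)       # 50-token burst, refilled at 50/s

    if bucket.drain(1):                # non-blocking: False when no token is left
      pass                             # e.g. permit creating a metric file
    bucket.drain(1, blocking=True)     # blocking: sleeps until a token frees up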
283 | 332 | super(PluginRegistrar, classObj).__init__(name, bases, members) |
284 | 333 | if hasattr(classObj, 'plugin_name'): |
285 | 334 | classObj.plugins[classObj.plugin_name] = classObj |
335 | ||
336 | ||
337 | class TaggedSeries(object): | |
338 | @classmethod | |
339 | def parse(cls, path): | |
340 | # if path is in openmetrics format: metric{tag="value",...} | |
341 | if path[-2:] == '"}' and '{' in path: | |
342 | return cls.parse_openmetrics(path) | |
343 | ||
344 | # path is a carbon path with optional tags: metric;tag=value;... | |
345 | return cls.parse_carbon(path) | |
346 | ||
347 | @classmethod | |
348 | def parse_openmetrics(cls, path): | |
349 | """parse a path in openmetrics format: metric{tag="value",...} | |
350 | ||
351 | https://github.com/RichiH/OpenMetrics | |
352 | """ | |
353 | (metric, rawtags) = path[0:-1].split('{', 2) | |
354 | if not metric: | |
355 | raise Exception('Cannot parse path %s, no metric found' % path) | |
356 | ||
357 | tags = {} | |
358 | ||
359 | while len(rawtags) > 0: | |
360 | m = re.match(r'([^=]+)="((?:[\\]["\\]|[^"\\])+)"(:?,|$)', rawtags) | |
361 | if not m: | |
362 | raise Exception('Cannot parse path %s, invalid segment %s' % (path, rawtags)) | |
363 | ||
364 | tags[m.group(1)] = m.group(2).replace(r'\"', '"').replace(r'\\', '\\') | |
365 | rawtags = rawtags[len(m.group(0)):] | |
366 | ||
367 | tags['name'] = metric | |
368 | return cls(metric, tags) | |
369 | ||
370 | @classmethod | |
371 | def parse_carbon(cls, path): | |
372 | """parse a carbon path with optional tags: metric;tag=value;...""" | |
373 | segments = path.split(';') | |
374 | ||
375 | metric = segments[0] | |
376 | if not metric: | |
377 | raise Exception('Cannot parse path %s, no metric found' % path) | |
378 | ||
379 | tags = {} | |
380 | ||
381 | for segment in segments[1:]: | |
382 | tag = segment.split('=', 1) | |
383 | if len(tag) != 2 or not tag[0]: | |
384 | raise Exception('Cannot parse path %s, invalid segment %s' % (path, segment)) | |
385 | ||
386 | tags[tag[0]] = tag[1] | |
387 | ||
388 | tags['name'] = metric | |
389 | return cls(metric, tags) | |
390 | ||
391 | @staticmethod | |
392 | def format(tags): | |
393 | return tags.get('name', '') + ''.join(sorted([ | |
394 | ';%s=%s' % (tag, value) | |
395 | for tag, value in tags.items() | |
396 | if tag != 'name' | |
397 | ])) | |
398 | ||
399 | @staticmethod | |
400 | def encode(metric, sep='.', hash_only=False): | |
401 | """ | |
402 | Helper function to encode tagged series for storage in whisper etc | |
403 | ||
404 | When tagged series are detected, they are stored in a separate hierarchy of folders under a | |
405 | top-level _tagged folder, where subfolders are created by using the first 3 hex digits of the | |
406 | sha256 hash of the tagged metric path (4096 possible folders), and second-level subfolders are | |
407 | based on the following 3 hex digits (another 4096 possible folders) for a total of 4096^2 | |
408 | possible subfolders. The metric files themselves are created with any . in the metric path | |
409 | replaced with _DOT_, to avoid any issues where metrics, tags or values containing a '.' would end | 
410 | up creating further subfolders. This helper is used by both whisper and ceres, but by design | |
411 | each carbon database and graphite-web finder is responsible for handling its own encoding so | |
412 | that different backends can create their own schemes if desired. | |
413 | ||
414 | The hash_only parameter can be set to True to use the hash as the filename instead of a | |
415 | human-readable name. This avoids issues with filename length restrictions, at the expense of | |
416 | being unable to decode the filename and determine the original metric name. | |
417 | ||
418 | A concrete example: | |
419 | ||
420 | .. code-block:: none | |
421 | ||
422 | some.metric;tag1=value2;tag2=value.2 | |
423 | ||
424 | whose sha256 hash starts with effaae would be stored in: | 
425 | | 
426 | _tagged/eff/aae/some_DOT_metric;tag1=value2;tag2=value_DOT_2.wsp (whisper) | 
427 | _tagged/eff/aae/some_DOT_metric;tag1=value2;tag2=value_DOT_2 (ceres) | 
428 | ||
429 | """ | |
430 | if ';' in metric: | |
431 | metric_hash = sha256(metric.encode('utf8')).hexdigest() | |
432 | return sep.join([ | |
433 | '_tagged', | |
434 | metric_hash[0:3], | |
435 | metric_hash[3:6], | |
436 | metric_hash if hash_only else metric.replace('.', '_DOT_') | |
437 | ]) | |
438 | ||
439 | # metric isn't tagged, just replace dots with the separator and trim any leading separator | |
440 | return metric.replace('.', sep).lstrip(sep) | |
441 | ||
442 | @staticmethod | |
443 | def decode(path, sep='.'): | |
444 | """ | |
445 | Helper function to decode tagged series from storage in whisper etc | |
446 | """ | |
447 | if path.startswith('_tagged'): | |
448 | return path.split(sep, 3)[-1].replace('_DOT_', '.') | |
449 | ||
450 | # metric isn't tagged, just replace the separator with dots | |
451 | return path.replace(sep, '.') | |
452 | ||
453 | def __init__(self, metric, tags, series_id=None): | |
454 | self.metric = metric | |
455 | self.tags = tags | |
456 | self.id = series_id | |
457 | ||
458 | @property | |
459 | def path(self): | |
460 | return self.__class__.format(self.tags) |
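A short illustrative session with this class (metric and tags invented): both input formats parse to the same tag dictionary, format() sorts the tags into a canonical path, and encode() maps that path into the hashed _tagged hierarchy described in its docstring.

    s = TaggedSeries.parse('disk.used;server=web01;datacenter=dc1')
    s2 = TaggedSeries.parse('disk.used{server="web01",datacenter="dc1"}')
    assert s.tags['name'] == 'disk.used'
    assert s.path == s2.path    # 'disk.used;datacenter=dc1;server=web01'

    # With sep='/' this gives the on-disk layout from the docstring, e.g.
    # '_tagged/<3 hex>/<3 hex>/disk_DOT_used;datacenter=dc1;server=web01'
    storage_path = TaggedSeries.encode(s.path, sep='/')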
12 | 12 | limitations under the License.""" |
13 | 13 | |
14 | 14 | import time |
15 | from six.moves import queue | |
15 | 16 | |
16 | 17 | from carbon import state |
17 | 18 | from carbon.cache import MetricCache |
18 | 19 | from carbon.storage import loadStorageSchemas, loadAggregationSchemas |
19 | 20 | from carbon.conf import settings |
20 | from carbon import log, events, instrumentation | |
21 | from carbon import log, instrumentation | |
21 | 22 | from carbon.util import TokenBucket |
22 | 23 | |
23 | 24 | from twisted.internet import reactor |
49 | 50 | UPDATE_BUCKET = TokenBucket(capacity, fill_rate) |
50 | 51 | |
51 | 52 | |
52 | def optimalWriteOrder(): | |
53 | """Generates metrics with the most cached values first and applies a soft | |
54 | rate limit on new metrics""" | |
53 | class TagQueue(object): | |
54 | def __init__(self, maxsize=0, update_interval=1): | |
55 | self.add_queue = queue.Queue(maxsize) | |
56 | self.update_queue = queue.Queue(maxsize) | |
57 | self.update_interval = update_interval | |
58 | self.update_counter = 0 | |
59 | ||
60 | def add(self, metric): | |
61 | try: | |
62 | self.add_queue.put_nowait(metric) | |
63 | except queue.Full: | |
64 | pass | |
65 | ||
66 | def update(self, metric): | |
67 | self.update_counter = self.update_counter % self.update_interval + 1 | |
68 | if self.update_counter == 1: | |
69 | try: | |
70 | self.update_queue.put_nowait(metric) | |
71 | except queue.Full: | |
72 | pass | |
73 | ||
74 | def getbatch(self, maxsize=1): | |
75 | batch = [] | |
76 | while len(batch) < maxsize: | |
77 | try: | |
78 | batch.append(self.add_queue.get_nowait()) | |
79 | except queue.Empty: | |
80 | break | |
81 | while len(batch) < maxsize: | |
82 | try: | |
83 | batch.append(self.update_queue.get_nowait()) | |
84 | except queue.Empty: | |
85 | break | |
86 | return batch | |
87 | ||
88 | ||
89 | tagQueue = TagQueue(maxsize=settings.TAG_QUEUE_SIZE, update_interval=settings.TAG_UPDATE_INTERVAL) | |
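A brief sketch of the intended flow (sizes invented): the writer thread enqueues series names as it creates and updates them, and a separate thread drains them in batches toward the database's tag index (see writeTags below).

    q = TagQueue(maxsize=10000, update_interval=100)
    q.add('new.metric.series')       # queued on every create (drops when full)
    q.update('existing.series')      # queued roughly once per 100 update() calls
    batch = q.getbatch(maxsize=50)   # newly created series first, then updates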
90 | ||
91 | ||
92 | def writeCachedDataPoints(): | |
93 | "Write datapoints until the MetricCache is completely empty" | |
94 | ||
55 | 95 | cache = MetricCache() |
56 | 96 | while cache: |
57 | 97 | (metric, datapoints) = cache.drain_metric() |
98 | if metric is None: | |
99 | # end the loop | |
100 | break | |
101 | ||
58 | 102 | dbFileExists = state.database.exists(metric) |
59 | 103 | |
60 | if not dbFileExists and CREATE_BUCKET: | |
61 | # If our tokenbucket has enough tokens available to create a new metric | |
62 | # file then yield the metric data to complete that operation. Otherwise | |
63 | # we'll just drop the metric on the ground and move on to the next | |
64 | # metric. | |
65 | # XXX This behavior should probably be configurable to not drop metrics | 
66 | # when rate limiting unless our cache is too big or some other legit | 
67 | # reason. | |
68 | if CREATE_BUCKET.drain(1): | |
69 | yield (metric, datapoints, dbFileExists) | |
70 | continue | |
71 | ||
72 | yield (metric, datapoints, dbFileExists) | |
73 | ||
74 | ||
75 | def writeCachedDataPoints(): | |
76 | "Write datapoints until the MetricCache is completely empty" | |
77 | ||
78 | cache = MetricCache() | |
79 | while cache: | |
80 | dataWritten = False | |
81 | ||
82 | for (metric, datapoints, dbFileExists) in optimalWriteOrder(): | |
83 | dataWritten = True | |
84 | ||
85 | if not dbFileExists: | |
86 | archiveConfig = None | |
87 | xFilesFactor, aggregationMethod = None, None | |
88 | ||
89 | for schema in SCHEMAS: | |
90 | if schema.matches(metric): | |
91 | if settings.LOG_CREATES: | |
92 | log.creates('new metric %s matched schema %s' % (metric, schema.name)) | |
93 | archiveConfig = [archive.getTuple() for archive in schema.archives] | |
94 | break | |
95 | ||
96 | for schema in AGGREGATION_SCHEMAS: | |
97 | if schema.matches(metric): | |
98 | if settings.LOG_CREATES: | |
99 | log.creates('new metric %s matched aggregation schema %s' | |
100 | % (metric, schema.name)) | |
101 | xFilesFactor, aggregationMethod = schema.archives | |
102 | break | |
103 | ||
104 | if not archiveConfig: | |
105 | raise Exception("No storage schema matched the metric '%s', check your storage-schemas.conf file." % metric) | |
106 | ||
107 | if settings.LOG_CREATES: | |
108 | log.creates("creating database metric %s (archive=%s xff=%s agg=%s)" % | |
109 | (metric, archiveConfig, xFilesFactor, aggregationMethod)) | |
110 | try: | |
111 | state.database.create(metric, archiveConfig, xFilesFactor, aggregationMethod) | |
112 | instrumentation.increment('creates') | |
113 | except Exception, e: | |
114 | log.err() | |
115 | log.msg("Error creating %s: %s" % (metric, e)) | |
116 | instrumentation.increment('errors') | |
117 | continue | |
118 | # If we've got a rate limit configured let's make sure we enforce it | 
119 | if UPDATE_BUCKET: | |
120 | UPDATE_BUCKET.drain(1, blocking=True) | |
121 | try: | |
122 | t1 = time.time() | |
123 | # If we have duplicated points, always pick the last. update_many() | |
124 | # has no guaranteed behavior for that, and in fact the current implementation | 
125 | # will keep the first point in the list. | |
126 | datapoints = dict(datapoints).items() | |
127 | state.database.write(metric, datapoints) | |
128 | updateTime = time.time() - t1 | |
129 | except Exception, e: | |
104 | if not dbFileExists: | |
105 | if CREATE_BUCKET and not CREATE_BUCKET.drain(1): | |
106 | # If our tokenbucket doesn't have enough tokens available to create a new metric | |
107 | # file then we'll just drop the metric on the ground and move on to the next | |
108 | # metric. | |
109 | # XXX This behavior should probably be configurable to not drop metrics | 
110 | # when rate limiting unless our cache is too big or some other legit | 
111 | # reason. | |
112 | instrumentation.increment('droppedCreates') | |
113 | continue | |
114 | ||
115 | archiveConfig = None | |
116 | xFilesFactor, aggregationMethod = None, None | |
117 | ||
118 | for schema in SCHEMAS: | |
119 | if schema.matches(metric): | |
120 | if settings.LOG_CREATES: | |
121 | log.creates('new metric %s matched schema %s' % (metric, schema.name)) | |
122 | archiveConfig = [archive.getTuple() for archive in schema.archives] | |
123 | break | |
124 | ||
125 | for schema in AGGREGATION_SCHEMAS: | |
126 | if schema.matches(metric): | |
127 | if settings.LOG_CREATES: | |
128 | log.creates('new metric %s matched aggregation schema %s' | |
129 | % (metric, schema.name)) | |
130 | xFilesFactor, aggregationMethod = schema.archives | |
131 | break | |
132 | ||
133 | if not archiveConfig: | |
134 | raise Exception(("No storage schema matched the metric '%s'," | |
135 | " check your storage-schemas.conf file.") % metric) | |
136 | ||
137 | if settings.LOG_CREATES: | |
138 | log.creates("creating database metric %s (archive=%s xff=%s agg=%s)" % | |
139 | (metric, archiveConfig, xFilesFactor, aggregationMethod)) | |
140 | try: | |
141 | state.database.create(metric, archiveConfig, xFilesFactor, aggregationMethod) | |
142 | if settings.ENABLE_TAGS: | |
143 | tagQueue.add(metric) | |
144 | instrumentation.increment('creates') | |
145 | except Exception as e: | |
130 | 146 | log.err() |
131 | log.msg("Error writing to %s: %s" % (metric, e)) | |
147 | log.msg("Error creating %s: %s" % (metric, e)) | |
132 | 148 | instrumentation.increment('errors') |
133 | else: | |
134 | pointCount = len(datapoints) | |
135 | instrumentation.increment('committedPoints', pointCount) | |
136 | instrumentation.append('updateTimes', updateTime) | |
137 | if settings.LOG_UPDATES: | |
138 | log.updates("wrote %d datapoints for %s in %.5f seconds" % (pointCount, metric, updateTime)) | |
139 | ||
140 | # Avoid churning CPU when only new metrics are in the cache | |
141 | if not dataWritten: | |
142 | time.sleep(0.1) | |
149 | continue | |
150 | ||
151 | # If we've got a rate limit configured let's make sure we enforce it | 
152 | waitTime = 0 | |
153 | if UPDATE_BUCKET: | |
154 | t1 = time.time() | |
155 | UPDATE_BUCKET.drain(1, blocking=True) | |
156 | waitTime = time.time() - t1 | |
157 | ||
158 | try: | |
159 | t1 = time.time() | |
160 | # If we have duplicated points, always pick the last. update_many() | |
161 | # has no guaranteed behavior for that, and in fact the current implementation | 
162 | # will keep the first point in the list. | |
163 | datapoints = dict(datapoints).items() | |
164 | state.database.write(metric, datapoints) | |
165 | if settings.ENABLE_TAGS: | |
166 | tagQueue.update(metric) | |
167 | updateTime = time.time() - t1 | |
168 | except Exception as e: | |
169 | log.err() | |
170 | log.msg("Error writing to %s: %s" % (metric, e)) | |
171 | instrumentation.increment('errors') | |
172 | else: | |
173 | pointCount = len(datapoints) | |
174 | instrumentation.increment('committedPoints', pointCount) | |
175 | instrumentation.append('updateTimes', updateTime) | |
176 | if settings.LOG_UPDATES: | |
177 | if waitTime > 0.001: | |
178 | log.updates("wrote %d datapoints for %s in %.5f seconds after waiting %.5f seconds" % ( | |
179 | pointCount, metric, updateTime, waitTime)) | |
180 | else: | |
181 | log.updates("wrote %d datapoints for %s in %.5f seconds" % ( | |
182 | pointCount, metric, updateTime)) | |
143 | 183 | |
144 | 184 | |
145 | 185 | def writeForever(): |
148 | 188 | writeCachedDataPoints() |
149 | 189 | except Exception: |
150 | 190 | log.err() |
151 | time.sleep(0.1) # The writer thread only sleeps when the cache is empty or an error occurs | |
191 | # Back-off on error to give the backend time to recover. | |
192 | time.sleep(0.1) | |
193 | else: | |
194 | # Avoid churning CPU when there are no metrics in the cache | 
195 | time.sleep(1) | |
196 | ||
197 | ||
198 | def writeTags(): | |
199 | while True: | |
200 | tags = tagQueue.getbatch(settings.TAG_BATCH_SIZE) | |
201 | if not tags: | |
202 | break | |
203 | state.database.tag(*tags) | |
204 | ||
205 | ||
206 | def writeTagsForever(): | |
207 | while reactor.running: | |
208 | try: | |
209 | writeTags() | |
210 | except Exception: | |
211 | log.err() | |
212 | # Back-off on error to give the backend time to recover. | |
213 | time.sleep(0.1) | |
214 | else: | |
215 | # Avoid churning CPU when there are no series in the queue | |
216 | time.sleep(0.2) | |
152 | 217 | |
153 | 218 | |
154 | 219 | def reloadStorageSchemas(): |
155 | 220 | global SCHEMAS |
156 | 221 | try: |
157 | 222 | SCHEMAS = loadStorageSchemas() |
158 | except Exception, e: | |
223 | except Exception as e: | |
159 | 224 | log.msg("Failed to reload storage SCHEMAS: %s" % (e)) |
160 | 225 | |
161 | 226 | |
163 | 228 | global AGGREGATION_SCHEMAS |
164 | 229 | try: |
165 | 230 | AGGREGATION_SCHEMAS = loadAggregationSchemas() |
166 | except Exception, e: | |
231 | except Exception as e: | |
167 | 232 | log.msg("Failed to reload aggregation SCHEMAS: %s" % (e)) |
168 | 233 | |
169 | 234 | |
171 | 236 | try: |
172 | 237 | shut = settings.MAX_UPDATES_PER_SECOND_ON_SHUTDOWN |
173 | 238 | if UPDATE_BUCKET: |
174 | UPDATE_BUCKET.setCapacityAndFillRate(shut,shut) | |
239 | UPDATE_BUCKET.setCapacityAndFillRate(shut, shut) | |
175 | 240 | if CREATE_BUCKET: |
176 | CREATE_BUCKET.setCapacityAndFillRate(shut,shut) | |
177 | log.msg("Carbon shutting down. Changed the update rate to: " + str(settings.MAX_UPDATES_PER_SECOND_ON_SHUTDOWN)) | |
241 | CREATE_BUCKET.setCapacityAndFillRate(shut, shut) | |
242 | log.msg("Carbon shutting down. Changed the update rate to: " + | |
243 | str(settings.MAX_UPDATES_PER_SECOND_ON_SHUTDOWN)) | |
178 | 244 | except KeyError: |
179 | 245 | log.msg("Carbon shutting down. Update rate not changed") |
246 | ||
247 | # Also set MIN_TIMESTAMP_LAG to 0 to avoid pointless waiting during shutdown. | 
248 | settings.MIN_TIMESTAMP_LAG = 0 | |
180 | 249 | |
181 | 250 | |
182 | 251 | class WriterService(Service): |
193 | 262 | self.aggregation_reload_task.start(60, False) |
194 | 263 | reactor.addSystemEventTrigger('before', 'shutdown', shutdownModifyUpdateSpeed) |
195 | 264 | reactor.callInThread(writeForever) |
265 | if settings.ENABLE_TAGS: | |
266 | reactor.callInThread(writeTagsForever) | |
196 | 267 | Service.startService(self) |
197 | 268 | |
198 | 269 | def stopService(self): |
0 | from zope.interface import implementer | |
1 | ||
2 | from twisted.plugin import IPlugin | |
3 | from twisted.application.service import IServiceMaker | |
4 | ||
5 | from carbon import conf | |
6 | ||
7 | ||
8 | @implementer(IServiceMaker, IPlugin) | |
9 | class CarbonAggregatorCacheServiceMaker(object): | |
10 | ||
11 | tapname = "carbon-aggregator-cache" | |
12 | description = "Aggregate and write stats for graphite." | |
13 | options = conf.CarbonAggregatorOptions | |
14 | ||
15 | def makeService(self, options): | |
16 | """ | |
17 | Construct a C{carbon-aggregator-cache} service. | |
18 | """ | |
19 | from carbon import service | |
20 | return service.createAggregatorCacheService(options) | |
21 | ||
22 | ||
23 | # Now construct an object which *provides* the relevant interfaces | |
24 | serviceMaker = CarbonAggregatorCacheServiceMaker() |
0 | from zope.interface import implements | |
0 | from zope.interface import implementer | |
1 | 1 | |
2 | 2 | from twisted.plugin import IPlugin |
3 | 3 | from twisted.application.service import IServiceMaker |
5 | 5 | from carbon import conf |
6 | 6 | |
7 | 7 | |
8 | @implementer(IServiceMaker, IPlugin) | |
8 | 9 | class CarbonAggregatorServiceMaker(object): |
9 | 10 | |
10 | implements(IServiceMaker, IPlugin) | |
11 | 11 | tapname = "carbon-aggregator" |
12 | 12 | description = "Aggregate stats for graphite." |
13 | 13 | options = conf.CarbonAggregatorOptions |
0 | from zope.interface import implements | |
0 | from zope.interface import implementer | |
1 | 1 | |
2 | 2 | from twisted.plugin import IPlugin |
3 | 3 | from twisted.application.service import IServiceMaker |
5 | 5 | from carbon import conf |
6 | 6 | |
7 | 7 | |
8 | @implementer(IServiceMaker, IPlugin) | |
8 | 9 | class CarbonCacheServiceMaker(object): |
9 | 10 | |
10 | implements(IServiceMaker, IPlugin) | |
11 | 11 | tapname = "carbon-cache" |
12 | 12 | description = "Collect stats for graphite." |
13 | 13 | options = conf.CarbonCacheOptions |
0 | from zope.interface import implements | |
0 | from zope.interface import implementer | |
1 | 1 | |
2 | 2 | from twisted.plugin import IPlugin |
3 | 3 | from twisted.application.service import IServiceMaker |
5 | 5 | from carbon import conf |
6 | 6 | |
7 | 7 | |
8 | @implementer(IServiceMaker, IPlugin) | |
8 | 9 | class CarbonRelayServiceMaker(object): |
9 | 10 | |
10 | implements(IServiceMaker, IPlugin) | |
11 | 11 | tapname = "carbon-relay" |
12 | 12 | description = "Relay stats for graphite." |
13 | 13 | options = conf.CarbonRelayOptions |
2 | 2 | from __future__ import with_statement |
3 | 3 | |
4 | 4 | import os |
5 | import ConfigParser | |
5 | from glob import glob | |
6 | try: | |
7 | from ConfigParser import ConfigParser, DuplicateSectionError # Python 2 | |
8 | except ImportError: | |
9 | from configparser import ConfigParser, DuplicateSectionError # Python 3 | |
6 | 10 | |
7 | import platform | |
8 | from glob import glob | |
9 | ||
10 | try: | |
11 | from io import BytesIO | |
12 | except ImportError: | |
13 | from StringIO import StringIO as BytesIO | |
14 | 11 | |
15 | 12 | # Graphite historically has an install prefix set in setup.cfg. Being in a |
16 | 13 | # configuration file, it's not easy to override it or unset it (for installing |
21 | 18 | # required for installations from a source tarball because running |
22 | 19 | # ``python setup.py sdist`` will re-add the prefix to the tarball's |
23 | 20 | # ``setup.cfg``. |
21 | cf = ConfigParser() | |
22 | ||
24 | 23 | with open('setup.cfg', 'r') as f: |
25 | 24 | orig_setup_cfg = f.read() |
26 | cf = ConfigParser.ConfigParser() | |
27 | cf.readfp(BytesIO(orig_setup_cfg), 'setup.cfg') | |
25 | f.seek(0) | |
26 | cf.readfp(f, 'setup.cfg') | |
28 | 27 | |
29 | 28 | if os.environ.get('GRAPHITE_NO_PREFIX'): |
30 | 29 | cf.remove_section('install') |
31 | 30 | else: |
31 | print('#' * 80) | |
32 | print('') | |
33 | print('Carbon\'s default installation prefix is "/opt/graphite".') | |
34 | print('') | |
35 | print('To install Carbon in the Python\'s default location run:') | |
36 | print('$ GRAPHITE_NO_PREFIX=True python setup.py install') | |
37 | print('') | |
38 | print('#' * 80) | |
32 | 39 | try: |
33 | 40 | cf.add_section('install') |
34 | except ConfigParser.DuplicateSectionError: | |
41 | except DuplicateSectionError: | |
35 | 42 | pass |
36 | 43 | if not cf.has_option('install', 'prefix'): |
37 | 44 | cf.set('install', 'prefix', '/opt/graphite') |
38 | 45 | if not cf.has_option('install', 'install-lib'): |
39 | 46 | cf.set('install', 'install-lib', '%(prefix)s/lib') |
40 | 47 | |
41 | with open('setup.cfg', 'wb') as f: | |
48 | with open('setup.cfg', 'w') as f: | |
42 | 49 | cf.write(f) |
50 | ||
43 | 51 | |
44 | 52 | if os.environ.get('USE_SETUPTOOLS'): |
45 | 53 | from setuptools import setup |
46 | 54 | setup_kwargs = dict(zip_safe=0) |
47 | ||
48 | 55 | else: |
49 | 56 | from distutils.core import setup |
50 | 57 | setup_kwargs = dict() |
51 | 58 | |
52 | 59 | |
53 | storage_dirs = [ ('storage/ceres', []), ('storage/whisper',[]), | |
54 | ('storage/lists',[]), ('storage/log',[]), | |
55 | ('storage/rrd',[]) ] | |
60 | storage_dirs = [ ('storage/ceres/dummy.txt', []), ('storage/whisper/dummy.txt', []), | 
61 | ('storage/lists', []), ('storage/log/dummy.txt', []), | 
62 | ('storage/rrd/dummy.txt', []) ] | 
56 | 63 | conf_files = [ ('conf', glob('conf/*.example')) ] |
57 | 64 | |
58 | 65 | install_files = storage_dirs + conf_files |
64 | 71 | 'distro/redhat/init.d/carbon-aggregator']) ] |
65 | 72 | install_files += init_scripts |
66 | 73 | |
74 | ||
67 | 75 | try: |
68 | 76 | setup( |
69 | name='carbon', | |
70 | version='1.0.2', | |
71 | url='http://graphiteapp.org/', | |
72 | author='Chris Davis', | |
73 | author_email='chrismd@gmail.com', | |
74 | license='Apache Software License 2.0', | |
75 | description='Backend data caching and persistence daemon for Graphite', | |
76 | long_description='Backend data caching and persistence daemon for Graphite', | |
77 | packages=['carbon', 'carbon.aggregator', 'twisted.plugins'], | |
78 | package_dir={'' : 'lib'}, | |
79 | scripts=glob('bin/*'), | |
80 | package_data={ 'carbon' : ['*.xml'] }, | |
81 | data_files=install_files, | |
82 | install_requires=['Twisted', 'txAMQP'], | |
83 | classifiers=( | |
84 | 'Intended Audience :: Developers', | |
85 | 'Natural Language :: English', | |
86 | 'License :: OSI Approved :: Apache Software License', | |
87 | 'Programming Language :: Python', | |
88 | 'Programming Language :: Python :: 2', | |
89 | 'Programming Language :: Python :: 2.7', | |
90 | 'Programming Language :: Python :: 2 :: Only', | |
91 | ), | |
92 | ||
93 | **setup_kwargs | |
77 | name='carbon', | |
78 | version='1.1.4', | |
79 | url='http://graphiteapp.org/', | |
80 | author='Chris Davis', | |
81 | author_email='chrismd@gmail.com', | |
82 | license='Apache Software License 2.0', | |
83 | description='Backend data caching and persistence daemon for Graphite', | |
84 | long_description='Backend data caching and persistence daemon for Graphite', | |
85 | packages=['carbon', 'carbon.aggregator', 'twisted.plugins'], | |
86 | package_dir={'' : 'lib'}, | |
87 | scripts=glob('bin/*'), | |
88 | package_data={ 'carbon' : ['*.xml'] }, | |
89 | data_files=install_files, | |
90 | install_requires=['Twisted', 'txAMQP', 'cachetools', 'urllib3'], | |
91 | classifiers=( | |
92 | 'Intended Audience :: Developers', | |
93 | 'Natural Language :: English', | |
94 | 'License :: OSI Approved :: Apache Software License', | |
95 | 'Programming Language :: Python', | |
96 | 'Programming Language :: Python :: 2', | |
97 | 'Programming Language :: Python :: 2.7', | |
98 | 'Programming Language :: Python :: 3', | |
99 | 'Programming Language :: Python :: 3.4', | |
100 | 'Programming Language :: Python :: 3.5', | |
101 | 'Programming Language :: Python :: 3.6', | |
102 | 'Programming Language :: Python :: 3.7', | |
103 | 'Programming Language :: Python :: Implementation :: CPython', | |
104 | 'Programming Language :: Python :: Implementation :: PyPy', | |
105 | ), | |
106 | **setup_kwargs | |
94 | 107 | ) |
95 | 108 | finally: |
96 | 109 | with open('setup.cfg', 'w') as f: |