graphite-carbon / 3182976
Imported Upstream version 0.9.13 (Jonas Genannt, 9 years ago)
24 changed file(s) with 175 addition(s) and 378 deletion(s).
00 Metadata-Version: 1.0
11 Name: carbon
2 Version: 0.9.12
2 Version: 0.9.13
33 Summary: Backend data caching and persistence daemon for Graphite
44 Home-page: http://graphite-project.github.com
55 Author: Chris Davis
108108 datapoint = (float(timestamp), float(value))
109109 assert datapoint[1] == datapoint[1] # filter out NaNs
110110 client_manager.sendDatapoint(metric, datapoint)
111 except:
111 except ValueError:
112112 log.err(None, 'Dropping invalid line: %s' % line)
113113
114114 def connectionLost(self, reason):
3131 #
3232 # <env>.applications.<app>.all.<app_metric> (60) = sum <env>.applications.<app>.*.<<app_metric>>
3333 #
34 # It is also possible to use regular expressions. Following the example above,
35 # when using:
36 #
37 # <env>.applications.<app>.<domain>.requests (60) = sum <env>.applications.<app>.<domain>\d{2}.requests
38 #
39 # You will end up with 'prod.applications.apache.www.requests' instead of
40 # 'prod.applications.apache.all.requests'.
41 #
3442 # Note that any time this file is modified, it will be re-read automatically.
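
For illustration, here is a standalone sketch of the matching that the regular-expression rule above implies. This is plain re usage with a hypothetical metric name, not carbon's own rule parser (which builds its regexes from the <field> syntax internally):

    import re

    # Hypothetical illustration of the rule above: each <field> becomes a named
    # group, and the literal \d{2} must match the trailing digits.
    input_pattern = r'^(?P<env>[^.]+)\.applications\.(?P<app>[^.]+)\.(?P<domain>[^.]+)\d{2}\.requests$'
    metric = 'prod.applications.apache.www01.requests'  # hypothetical metric name

    match = re.match(input_pattern, metric)
    if match:
        fields = match.groupdict()  # {'env': 'prod', 'app': 'apache', 'domain': 'www'}
        output = '%(env)s.applications.%(app)s.%(domain)s.requests' % fields
        print(output)  # prod.applications.apache.www.requests
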
2929 #
3030 #LOCAL_DATA_DIR = /opt/graphite/storage/whisper/
3131
32 # Enable daily log rotation. If disabled, a kill -HUP can be used after a manual rotate
32 # Enable daily log rotation. If disabled, carbon will automatically re-open
33 # the file if it's rotated out of place (e.g. by logrotate daemon)
3334 ENABLE_LOGROTATION = True
3435
3536 # Specify the user to drop privileges to
5556 # If defined, this changes the MAX_UPDATES_PER_SECOND in Carbon when a
5657 # stop/shutdown is initiated. This helps when MAX_UPDATES_PER_SECOND is
5758 # relatively low and carbon has cached a lot of updates; it enables the carbon
58 # daemon to shutdown more quickly.
59 # daemon to shutdown more quickly.
5960 # MAX_UPDATES_PER_SECOND_ON_SHUTDOWN = 1000
6061
6162 # Softly limits the number of whisper files that get created each minute.
265266 MAX_DATAPOINTS_PER_MESSAGE = 500
266267 MAX_QUEUE_SIZE = 10000
267268
269 # When a send queue fills up, it must drain below this fraction of
270 # MAX_QUEUE_SIZE before it accepts new messages. For a large site with a
271 # very large queue, it makes sense to raise this so incoming stats can
272 # resume sooner. For example, with an average flow of 100k stats/minute
273 # and a MAX_QUEUE_SIZE of 3,000,000, draining the queue to 95% (0.95)
274 # leaves 150k of headroom, enough for the next minute's worth of stats
275 # even before the relay incrementally clears more of the queue
276 QUEUE_LOW_WATERMARK_PCT = 0.8
277
268278 # Set this to False to drop datapoints when any send queue (sending datapoints
269279 # to a downstream carbon daemon) hits MAX_QUEUE_SIZE. If this is True (the
270280 # default) then sockets over which metrics are received will temporarily stop accepting
271 # data until the send queues fall below 80% MAX_QUEUE_SIZE.
281 # data until the send queues fall below QUEUE_LOW_WATERMARK_PCT * MAX_QUEUE_SIZE.
272282 USE_FLOW_CONTROL = True
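
As a quick sanity check of the arithmetic in the watermark example above, a standalone sketch (0.95 and the traffic numbers come from the comment's example, not from the shipped defaults):

    MAX_QUEUE_SIZE = 3000000        # example value from the comment above
    QUEUE_LOW_WATERMARK_PCT = 0.95  # example value; the shipped default is 0.8
    STATS_PER_MINUTE = 100000       # example average inbound flow

    resume_at = MAX_QUEUE_SIZE * QUEUE_LOW_WATERMARK_PCT  # ~2,850,000 datapoints
    headroom = MAX_QUEUE_SIZE - resume_at                 # ~150,000 datapoints
    # Accepting data again at the watermark leaves more than a minute's worth
    # of headroom before the queue can fill up again.
    assert headroom >= STATS_PER_MINUTE
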
273283
274284 # Set this to True to enable whitelisting and blacklisting of metrics in
295305
296306 # If set to true, metrics received will be forwarded to DESTINATIONS in addition to
297307 # the output of the aggregation rules. If set false the carbon-aggregator will
298 # only ever send the output of aggregation.
299 FORWARD_ALL = True
308 # only ever send the output of aggregation. The default is false, so nothing is forwarded.
309 FORWARD_ALL = False
310
311 # Filenames of the configuration files to use for this instance of aggregator.
312 # Filenames are relative to CONF_DIR.
313 #
314 # AGGREGATION_RULES = aggregation-rules.conf
315 # REWRITE_RULES = rewrite-rules.conf
300316
301317 # This is a list of carbon daemons we will send any relayed or
302318 # generated metrics to. The default provided would send to a single
304320 # use multiple carbon-cache instances then it would look like this:
305321 #
306322 # DESTINATIONS = 127.0.0.1:2004:a, 127.0.0.1:2104:b
307 #
323 #
308324 # The format is comma-delimited IP:PORT:INSTANCE where the :INSTANCE part is
309325 # optional and refers to the "None" instance if omitted.
310326 #
+0 -102 distro/redhat/init.d/carbon-aggregator (file deleted)
0 #!/bin/bash
1 # chkconfig: - 25 75
2 # description: carbon-aggregator
3 # processname: carbon-aggregator
4
5 export PYTHONPATH="$GRAPHITE_DIR/lib:$PYTHONPATH"
6
7 # Source function library.
8 if [ -e /etc/rc.d/init.d/functions ]; then
9 . /etc/rc.d/init.d/functions;
10 fi;
11
12 CARBON_DAEMON="aggregator"
13 GRAPHITE_DIR="/opt/graphite"
14 INSTANCES=`grep "^\[${CARBON_DAEMON}" ${GRAPHITE_DIR}/conf/carbon.conf | cut -d \[ -f 2 | cut -d \] -f 1 | cut -d : -f 2`
15
16 function die {
17 echo $1
18 exit 1
19 }
20
21 start(){
22 cd $GRAPHITE_DIR;
23
24 for INSTANCE in ${INSTANCES}; do
25 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
26 INSTANCE="a";
27 fi;
28 echo "Starting carbon-${CARBON_DAEMON}:${INSTANCE}..."
29 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} start;
30
31 if [ $? -eq 0 ]; then
32 echo_success
33 else
34 echo_failure
35 fi;
36 echo ""
37 done;
38 }
39
40 stop(){
41 cd $GRAPHITE_DIR
42
43 for INSTANCE in ${INSTANCES}; do
44 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
45 INSTANCE="a";
46 fi;
47 echo "Stopping carbon-${CARBON_DAEMON}:${INSTANCE}..."
48 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} stop
49
50 if [ `sleep 3; /usr/bin/pgrep -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}" | /usr/bin/wc -l` -gt 0 ]; then
51 echo "Carbon did not stop yet. Sleeping longer, then force killing it...";
52 sleep 20;
53 /usr/bin/pkill -9 -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}";
54 fi;
55
56 if [ $? -eq 0 ]; then
57 echo_success
58 else
59 echo_failure
60 fi;
61 echo ""
62 done;
63 }
64
65 status(){
66 cd $GRAPHITE_DIR;
67
68 for INSTANCE in ${INSTANCES}; do
69 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
70 INSTANCE="a";
71 fi;
72 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} status;
73
74 if [ $? -eq 0 ]; then
75 echo_success
76 else
77 echo_failure
78 fi;
79 echo ""
80 done;
81 }
82
83 case "$1" in
84 start)
85 start
86 ;;
87 stop)
88 stop
89 ;;
90 status)
91 status
92 ;;
93 restart|reload)
94 stop
95 start
96 ;;
97 *)
98 echo $"Usage: $0 {start|stop|restart|status}"
99 exit 1
100 esac
101
+0 -102 distro/redhat/init.d/carbon-cache (file deleted)
0 #!/bin/bash
1 # chkconfig: - 25 75
2 # description: carbon-cache
3 # processname: carbon-cache
4
5 export PYTHONPATH="$GRAPHITE_DIR/lib:$PYTHONPATH"
6
7 # Source function library.
8 if [ -e /etc/rc.d/init.d/functions ]; then
9 . /etc/rc.d/init.d/functions;
10 fi;
11
12 CARBON_DAEMON="cache"
13 GRAPHITE_DIR="/opt/graphite"
14 INSTANCES=`grep "^\[${CARBON_DAEMON}" ${GRAPHITE_DIR}/conf/carbon.conf | cut -d \[ -f 2 | cut -d \] -f 1 | cut -d : -f 2`
15
16 function die {
17 echo $1
18 exit 1
19 }
20
21 start(){
22 cd $GRAPHITE_DIR;
23
24 for INSTANCE in ${INSTANCES}; do
25 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
26 INSTANCE="a";
27 fi;
28 echo "Starting carbon-${CARBON_DAEMON}:${INSTANCE}..."
29 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} start;
30
31 if [ $? -eq 0 ]; then
32 echo_success
33 else
34 echo_failure
35 fi;
36 echo ""
37 done;
38 }
39
40 stop(){
41 cd $GRAPHITE_DIR
42
43 for INSTANCE in ${INSTANCES}; do
44 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
45 INSTANCE="a";
46 fi;
47 echo "Stopping carbon-${CARBON_DAEMON}:${INSTANCE}..."
48 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} stop
49
50 if [ `sleep 3; /usr/bin/pgrep -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}" | /usr/bin/wc -l` -gt 0 ]; then
51 echo "Carbon did not stop yet. Sleeping longer, then force killing it...";
52 sleep 20;
53 /usr/bin/pkill -9 -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}";
54 fi;
55
56 if [ $? -eq 0 ]; then
57 echo_success
58 else
59 echo_failure
60 fi;
61 echo ""
62 done;
63 }
64
65 status(){
66 cd $GRAPHITE_DIR;
67
68 for INSTANCE in ${INSTANCES}; do
69 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
70 INSTANCE="a";
71 fi;
72 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} status;
73
74 if [ $? -eq 0 ]; then
75 echo_success
76 else
77 echo_failure
78 fi;
79 echo ""
80 done;
81 }
82
83 case "$1" in
84 start)
85 start
86 ;;
87 stop)
88 stop
89 ;;
90 status)
91 status
92 ;;
93 restart|reload)
94 stop
95 start
96 ;;
97 *)
98 echo $"Usage: $0 {start|stop|restart|status}"
99 exit 1
100 esac
101
+0 -102 distro/redhat/init.d/carbon-relay (file deleted)
0 #!/bin/bash
1 # chkconfig: - 25 75
2 # description: carbon-relay
3 # processname: carbon-relay
4
5 export PYTHONPATH="$GRAPHITE_DIR/lib:$PYTHONPATH"
6
7 # Source function library.
8 if [ -e /etc/rc.d/init.d/functions ]; then
9 . /etc/rc.d/init.d/functions;
10 fi;
11
12 CARBON_DAEMON="relay"
13 GRAPHITE_DIR="/opt/graphite"
14 INSTANCES=`grep "^\[${CARBON_DAEMON}" ${GRAPHITE_DIR}/conf/carbon.conf | cut -d \[ -f 2 | cut -d \] -f 1 | cut -d : -f 2`
15
16 function die {
17 echo $1
18 exit 1
19 }
20
21 start(){
22 cd $GRAPHITE_DIR;
23
24 for INSTANCE in ${INSTANCES}; do
25 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
26 INSTANCE="a";
27 fi;
28 echo "Starting carbon-${CARBON_DAEMON}:${INSTANCE}..."
29 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} start;
30
31 if [ $? -eq 0 ]; then
32 echo_success
33 else
34 echo_failure
35 fi;
36 echo ""
37 done;
38 }
39
40 stop(){
41 cd $GRAPHITE_DIR
42
43 for INSTANCE in ${INSTANCES}; do
44 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
45 INSTANCE="a";
46 fi;
47 echo "Stopping carbon-${CARBON_DAEMON}:${INSTANCE}..."
48 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} stop
49
50 if [ `sleep 3; /usr/bin/pgrep -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}" | /usr/bin/wc -l` -gt 0 ]; then
51 echo "Carbon did not stop yet. Sleeping longer, then force killing it...";
52 sleep 20;
53 /usr/bin/pkill -9 -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}";
54 fi;
55
56 if [ $? -eq 0 ]; then
57 echo_success
58 else
59 echo_failure
60 fi;
61 echo ""
62 done;
63 }
64
65 status(){
66 cd $GRAPHITE_DIR;
67
68 for INSTANCE in ${INSTANCES}; do
69 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
70 INSTANCE="a";
71 fi;
72 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} status;
73
74 if [ $? -eq 0 ]; then
75 echo_success
76 else
77 echo_failure
78 fi;
79 echo ""
80 done;
81 }
82
83 case "$1" in
84 start)
85 start
86 ;;
87 stop)
88 stop
89 ;;
90 status)
91 status
92 ;;
93 restart|reload)
94 stop
95 start
96 ;;
97 *)
98 echo $"Usage: $0 {start|stop|restart|status}"
99 exit 1
100 esac
101
2828 # Only read if the rules file has been modified
2929 try:
3030 mtime = getmtime(self.rules_file)
31 except:
31 except OSError:
3232 log.err("Failed to get mtime of %s" % self.rules_file)
3333 return
3434 if mtime <= self.rules_last_read:
5858 frequency = int( frequency.lstrip('(').rstrip(')') )
5959 return AggregationRule(input_pattern, output_pattern, method, frequency)
6060
61 except:
61 except ValueError:
6262 log.err("Failed to parse line: %s" % line)
6363 raise
6464
8989 extracted_fields = match.groupdict()
9090 try:
9191 result = self.output_template % extracted_fields
92 except:
92 except TypeError:
9393 log.err("Failed to interpolate template %s with fields %s" % (self.output_template, extracted_fields))
9494
9595 self.cache[metric_path] = result
4444
4545 try:
4646 import carbon
47 except:
47 except ImportError:
4848 # this is being run directly, carbon is not installed
4949 LIB_DIR = os.path.dirname(os.path.dirname(__file__))
5050 sys.path.insert(0, LIB_DIR)
115115 else:
116116 timestamp = time.time()
117117
118 except:
118 except ValueError:
119119 parser.print_usage()
120120 raise SystemExit(1)
121121
1616 from carbon.conf import settings
1717 try:
1818 from collections import defaultdict
19 except:
19 except ImportError:
2020 from util import defaultdict
2121
2222
2323 class _MetricCache(defaultdict):
2424 def __init__(self, defaultfactory=deque, method="sorted"):
25 self.size = 0
2526 self.method = method
2627 if self.method == "sorted":
2728 self.queue = self.gen_queue()
3839 while queue:
3940 yield queue.pop()[0]
4041
41 @property
42 def size(self):
43 return reduce(lambda x, y: x + len(y), self.values(), 0)
44
4542 def store(self, metric, datapoint):
43 self.size += 1
4644 self[metric].append(datapoint)
4745 if self.isFull():
4846 log.msg("MetricCache is full: self.size=%d" % self.size)
5856 raise KeyError(metric)
5957 elif not metric and self.method == "max":
6058 metric = max(self.items(), key=lambda x: len(x[1]))[0]
59 datapoints = (metric, super(_MetricCache, self).pop(metric))
6160 elif not metric and self.method == "naive":
62 return self.popitem()
61 datapoints = self.popitem()
6362 elif not metric and self.method == "sorted":
6463 metric = self.queue.next()
65 datapoints = (metric, super(_MetricCache, self).pop(metric))
64 # Save only last value for each timestamp
65 popped = super(_MetricCache, self).pop(metric)
66 ordered = sorted(dict(popped).items(), key=lambda x: x[0])
67 datapoints = (metric, deque(ordered))
68 self.size -= len(datapoints[1])
6669 return datapoints
6770
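
The "last value per timestamp" step above relies on dict() keeping the final value seen for a repeated key; a minimal standalone sketch with hypothetical datapoints:

    from collections import deque

    # Hypothetical cached (timestamp, value) pairs; timestamp 101 is duplicated.
    popped = deque([(100, 1.0), (101, 2.0), (101, 3.0), (102, 4.0)])

    # dict() keeps the last value seen for each timestamp; sorting restores
    # chronological order, mirroring the pop() logic above.
    ordered = sorted(dict(popped).items(), key=lambda x: x[0])
    print(deque(ordered))  # deque([(100, 1.0), (101, 3.0), (102, 4.0)])
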
6871 @property
66 from carbon.util import pickle
77 from carbon import log, state, instrumentation
88
9
10 SEND_QUEUE_LOW_WATERMARK = settings.MAX_QUEUE_SIZE * 0.8
9 try:
10 import signal
11 except ImportError:
12 log.debug("Couldn't import signal module")
13
14
15 SEND_QUEUE_LOW_WATERMARK = settings.MAX_QUEUE_SIZE * settings.QUEUE_LOW_WATERMARK_PCT
1116
1217
1318 class CarbonClientProtocol(Int32StringReceiver):
101106 self.attemptedRelays = 'destinations.%s.attemptedRelays' % self.destinationName
102107 self.fullQueueDrops = 'destinations.%s.fullQueueDrops' % self.destinationName
103108 self.queuedUntilConnected = 'destinations.%s.queuedUntilConnected' % self.destinationName
109 self.relayMaxQueueLength = 'destinations.%s.relayMaxQueueLength' % self.destinationName
104110
105111 def queueFullCallback(self, result):
106112 state.events.cacheFull()
152158
153159 def sendDatapoint(self, metric, datapoint):
154160 instrumentation.increment(self.attemptedRelays)
161 instrumentation.max(self.relayMaxQueueLength, self.queueSize)
155162 queueSize = self.queueSize
156163 if queueSize >= settings.MAX_QUEUE_SIZE:
157164 if not self.queueFull.called:
204211 self.client_factories = {} # { destination : CarbonClientFactory() }
205212
206213 def startService(self):
214 if 'signal' in globals():
215 log.debug("Installing SIG_IGN for SIGHUP")
216 signal.signal(signal.SIGHUP, signal.SIG_IGN)
207217 Service.startService(self)
208218 for factory in self.client_factories.values():
209219 if not factory.started:
5252 MAX_AGGREGATION_INTERVALS=5,
5353 FORWARD_ALL=False,
5454 MAX_QUEUE_SIZE=1000,
55 QUEUE_LOW_WATERMARK_PCT = 0.8,
5556 ENABLE_AMQP=False,
5657 AMQP_VERBOSE=False,
5758 BIND_PATTERNS=['#'],
7273 WRITE_BACK_FREQUENCY=None,
7374 ENABLE_LOGROTATION=True,
7475 LOG_LISTENER_CONNECTIONS=True,
76 AGGREGATION_RULES='aggregation-rules.conf',
77 REWRITE_RULES='rewrite-rules.conf',
78 RELAY_RULES='relay-rules.conf',
7579 )
7680
7781
148152 # Attempt to figure out numeric types automatically
149153 try:
150154 value = int(value)
151 except:
155 except ValueError:
152156 try:
153157 value = float(value)
154 except:
158 except ValueError:
155159 pass
156160
157161 self[key] = value
249253 logdir = settings.LOG_DIR
250254 if not isdir(logdir):
251255 os.makedirs(logdir)
256 if settings.USER:
257 # We have not yet switched to the specified user,
258 # but that user must be able to create files in this
259 # directory.
260 os.chown(logdir, self.parent["uid"], self.parent["gid"])
252261 log.logToDir(logdir)
253262
254263 if self["whitelist"] is None:
286295 try:
287296 pid = int(pf.read().strip())
288297 pf.close()
289 except:
298 except IOError:
290299 print "Could not read pidfile %s" % pidfile
291300 raise SystemExit(1)
292301 print "Sending kill signal to pid %d" % pid
308317 try:
309318 pid = int(pf.read().strip())
310319 pf.close()
311 except:
320 except IOError:
312321 print "Failed to read pid from %s" % pidfile
313322 raise SystemExit(1)
314323
326335 try:
327336 pid = int(pf.read().strip())
328337 pf.close()
329 except:
338 except IOError:
330339 print "Could not read pidfile %s" % pidfile
331340 raise SystemExit(1)
332341 if _process_alive(pid):
337346 print "Removing stale pidfile %s" % pidfile
338347 try:
339348 os.unlink(pidfile)
340 except:
349 except OSError:
341350 print "Could not remove pidfile %s" % pidfile
342351
343352 print "Starting %s (instance %s)" % (program, instance)
358367 def postOptions(self):
359368 CarbonCacheOptions.postOptions(self)
360369 if self["rules"] is None:
361 self["rules"] = join(settings["CONF_DIR"], "aggregation-rules.conf")
370 self["rules"] = join(settings["CONF_DIR"], settings['AGGREGATION_RULES'])
362371 settings["aggregation-rules"] = self["rules"]
363372
364373 if self["rewrite-rules"] is None:
365374 self["rewrite-rules"] = join(settings["CONF_DIR"],
366 "rewrite-rules.conf")
375 settings['REWRITE_RULES'])
367376 settings["rewrite-rules"] = self["rewrite-rules"]
368377
369378
377386 def postOptions(self):
378387 CarbonCacheOptions.postOptions(self)
379388 if self["rules"] is None:
380 self["rules"] = join(settings["CONF_DIR"], "relay-rules.conf")
389 self["rules"] = join(settings["CONF_DIR"], settings['RELAY_RULES'])
381390 settings["relay-rules"] = self["rules"]
382391
383392 if self["aggregation-rules"] is None:
384 self["aggregation-rules"] = join(settings["CONF_DIR"], "aggregation-rules.conf")
393 self["aggregation-rules"] = join(settings["CONF_DIR"], settings['AGGREGATION_RULES'])
385394 settings["aggregation-rules"] = self["aggregation-rules"]
386395
387396 if settings["RELAY_METHOD"] not in ("rules", "consistent-hashing", "aggregated-consistent-hashing"):
397406 parser.add_option(
398407 "--debug", action="store_true",
399408 help="Run in the foreground, log to stdout")
409 parser.add_option(
410 "--nodaemon", action="store_true",
411 help="Run in the foreground")
400412 parser.add_option(
401413 "--profile",
402414 help="Record performance profile data to the given file")
1717 for handler in self.handlers:
1818 try:
1919 handler(*args, **kwargs)
20 except:
20 except Exception:
2121 log.err(None, "Exception in %s event handler: args=%s kwargs=%s" % (self.name, args, kwargs))
2222
2323
2828 except KeyError:
2929 stats[stat] = increase
3030
31 def max(stat, newval):
32 try:
33 if stats[stat] < newval:
34 stats[stat] = newval
35 except KeyError:
36 stats[stat] = newval
3137
3238 def append(stat, value):
3339 try:
7581 creates = myStats.get('creates', 0)
7682 errors = myStats.get('errors', 0)
7783 cacheQueries = myStats.get('cacheQueries', 0)
84 cacheBulkQueries = myStats.get('cacheBulkQueries', 0)
7885 cacheOverflow = myStats.get('cache.overflow', 0)
86 cacheBulkQuerySizes = myStats.get('cacheBulkQuerySize', [])
7987
8088 # Calculate cache-data-structure-derived metrics prior to storing anything
8189 # in the cache itself -- which would otherwise affect said metrics.
92100 pointsPerUpdate = float(committedPoints) / len(updateTimes)
93101 record('pointsPerUpdate', pointsPerUpdate)
94102
103 if cacheBulkQuerySizes:
104 avgBulkSize = sum(cacheBulkQuerySizes) / len(cacheBulkQuerySizes)
105 record('cache.bulk_queries_average_size', avgBulkSize)
106
95107 record('updateOperations', len(updateTimes))
96108 record('committedPoints', committedPoints)
97109 record('creates', creates)
98110 record('errors', errors)
99111 record('cache.queries', cacheQueries)
112 record('cache.bulk_queries', cacheBulkQueries)
100113 record('cache.overflow', cacheOverflow)
101114
102115 # aggregator metrics
117130
118131 # common metrics
119132 record('metricsReceived', myStats.get('metricsReceived', 0))
133 record('blacklistMatches', myStats.get('blacklistMatches', 0))
134 record('whitelistRejects', myStats.get('whitelistRejects', 0))
120135 record('cpuUsage', getCpuUsage())
121136 try: # This only works on Linux
122137 record('memUsage', getMemUsage())
123 except:
138 except Exception:
124139 pass
125140
126141
1111 try:
1212 value = whisper.info(wsp_path)['aggregationMethod']
1313 return dict(value=value)
14 except:
14 except Exception:
1515 log.err()
1616 return dict(error=traceback.format_exc())
1717
2424 try:
2525 old_value = whisper.setAggregationMethod(wsp_path, value)
2626 return dict(old_value=old_value, new_value=value)
27 except:
27 except Exception:
2828 log.err()
2929 return dict(error=traceback.format_exc())
7070 try:
7171 metric, value, timestamp = line.strip().split()
7272 datapoint = (float(timestamp), float(value))
73 except:
73 except ValueError:
7474 log.listener('invalid line received from client %s, ignoring' % self.peerName)
7575 return
7676
8585 datapoint = (float(timestamp), float(value))
8686
8787 self.metricReceived(metric, datapoint)
88 except:
88 except ValueError:
8989 log.listener('invalid line received from %s, ignoring' % host)
9090
9191
9999 def stringReceived(self, data):
100100 try:
101101 datapoints = self.unpickler.loads(data)
102 except:
102 except pickle.UnpicklingError:
103103 log.listener('invalid pickle received from %s, ignoring' % self.peerName)
104104 return
105105
110110 log.listener('Error decoding pickle: %s' % e)
111111 try:
112112 datapoint = (float(value), float(timestamp)) # force proper types
113 except:
113 except ValueError:
114114 continue
115115
116116 self.metricReceived(metric, datapoint)
117117
118118
119119 class CacheManagementHandler(Int32StringReceiver):
120 MAX_LENGTH = 1024 ** 3 # 1GB
121
120122 def connectionMade(self):
121123 peer = self.transport.getPeer()
122124 self.peerAddr = "%s:%d" % (peer.host, peer.port)
135137 metric = request['metric']
136138 datapoints = MetricCache.get(metric, [])
137139 result = dict(datapoints=datapoints)
138 if settings.LOG_CACHE_HITS is True:
140 if settings.LOG_CACHE_HITS:
139141 log.query('[%s] cache query for \"%s\" returned %d values' % (self.peerAddr, metric, len(datapoints)))
140142 instrumentation.increment('cacheQueries')
143
144 elif request['type'] == 'cache-query-bulk':
145 datapointsByMetric = {}
146 metrics = request['metrics']
147 for metric in metrics:
148 datapointsByMetric[metric] = MetricCache.get(metric, [])
149
150 result = dict(datapointsByMetric=datapointsByMetric)
151
152 if settings.LOG_CACHE_HITS:
153 log.query('[%s] cache query bulk for \"%d\" metrics returned %d values' %
154 (self.peerAddr, len(metrics), sum([len(datapoints) for datapoints in datapointsByMetric.values()])))
155 instrumentation.increment('cacheBulkQueries')
156 instrumentation.append('cacheBulkQuerySize', len(metrics))
141157
142158 elif request['type'] == 'get-metadata':
143159 result = management.getMetadata(request['metric'], request['key'])
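
For clients, a minimal sketch of issuing a cache-query-bulk request, assuming the cache's pickle management protocol (Int32StringReceiver framing: a 4-byte big-endian length prefix) and the default CACHE_QUERY_PORT; the host, port, and metric name below are hypothetical:

    import pickle
    import socket
    import struct

    def _recv_exact(sock, n):
        # Read exactly n bytes; recv() may return short reads.
        data = b''
        while len(data) < n:
            chunk = sock.recv(n - len(data))
            if not chunk:
                raise IOError('connection closed mid-frame')
            data += chunk
        return data

    def cache_query_bulk(host, port, metrics):
        # Frame a pickled request dict with a 4-byte big-endian length prefix.
        request = pickle.dumps({'type': 'cache-query-bulk', 'metrics': metrics})
        sock = socket.create_connection((host, port))
        try:
            sock.sendall(struct.pack('!L', len(request)) + request)
            (length,) = struct.unpack('!L', _recv_exact(sock, 4))
            return pickle.loads(_recv_exact(sock, length))['datapointsByMetric']
        finally:
            sock.close()

    # datapoints = cache_query_bulk('127.0.0.1', 7002, ['some.hypothetical.metric'])
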
2626
2727 try:
2828 mtime = os.path.getmtime(self.list_file)
29 except:
29 except OSError:
3030 log.err("Failed to get mtime of %s" % self.list_file)
3131 return
3232
4141 continue
4242 try:
4343 new_regex_list.append(re.compile(pattern))
44 except:
44 except re.error:
4545 log.err("Failed to parse '%s' in '%s'. Ignoring line" % (pattern, self.list_file))
4646
4747 self.regex_list = new_regex_list
2828 # Only read if the rules file has been modified
2929 try:
3030 mtime = getmtime(self.rules_file)
31 except:
31 except OSError:
3232 log.err("Failed to get mtime of %s" % self.rules_file)
3333 return
3434 if mtime <= self.rules_last_read:
1919 from twisted.python.components import Componentized
2020 from twisted.python.log import ILogObserver
2121 # Attaching modules to the global state module simplifies import order hassles
22 from carbon import util, state, events, instrumentation
22 from carbon import state, util, events
2323 from carbon.log import carbonLogObserver
2424 from carbon.exceptions import CarbonConfigException
25 state.events = events
26 state.instrumentation = instrumentation
25
26 state.events = events
2727
2828
2929 class CarbonRootService(MultiService):
3737
3838 def createBaseService(config):
3939 from carbon.conf import settings
40 from carbon import instrumentation
41
42 global state
43 state.instrumentation = instrumentation
44
4045 from carbon.protocols import (MetricLineReceiver, MetricPickleReceiver,
4146 MetricDatagramReceiver)
4247
146151 root_service = createBaseService(config)
147152
148153 # Configure application components
149 router = ConsistentHashingRouter()
154 router = ConsistentHashingRouter(settings.REPLICATION_FACTOR)
150155 client_manager = CarbonClientManager(router)
151156 client_manager.setServiceParent(root_service)
152157
170170 assert 0 <= xFilesFactor <= 1
171171 if aggregationMethod is not None:
172172 assert aggregationMethod in whisper.aggregationMethods
173 except:
173 except (AssertionError, ValueError):
174174 log.msg("Invalid schemas found in %s." % section)
175175 continue
176176
1010 try:
1111 import cPickle as pickle
1212 USING_CPICKLE = True
13 except:
13 except ImportError:
1414 import pickle
1515 USING_CPICKLE = False
1616
1717 from time import sleep, time
1818 from twisted.python.util import initgroups
1919 from twisted.scripts.twistd import runApp
20 from twisted.scripts._twistd_unix import daemonize
21
22
23 daemonize = daemonize # Backwards compatibility
2420
2521
2622 def dropprivs(user):
6258 try:
6359 from twisted.internet import epollreactor
6460 twistd_options.append("--reactor=epoll")
65 except:
61 except ImportError:
6662 pass
6763
68 if options.debug:
64 if options.debug or options.nodaemon:
6965 twistd_options.extend(["--nodaemon"])
7066 if options.profile:
7167 twistd_options.append("--profile")
8278
8379 for option_name, option_value in vars(options).items():
8480 if (option_value is not None and
85 option_name not in ("debug", "profile", "pidfile", "umask")):
81 option_name not in ("debug", "profile", "pidfile", "umask", "nodaemon")):
8682 twistd_options.extend(["--%s" % option_name.replace("_", "-"),
8783 option_value])
8884
198194 return True
199195 return False
200196
197 def setCapacityAndFillRate(self, new_capacity, new_fill_rate):
198 delta = float(new_capacity) - self.capacity
199 self.capacity = float(new_capacity)
200 self.fill_rate = float(new_fill_rate)
201 self._tokens = delta + self._tokens
202
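
The delta arithmetic above adjusts the current token count in step with the capacity change instead of resetting it; a standalone sketch with hypothetical numbers:

    # Standalone sketch of the capacity change above (hypothetical numbers).
    capacity, fill_rate, tokens = 600.0, 600.0, 50.0  # e.g. MAX_UPDATES_PER_SECOND

    new_capacity, new_fill_rate = 1000.0, 1000.0      # e.g. the _ON_SHUTDOWN rate
    delta = new_capacity - capacity                   # +400.0
    tokens = delta + tokens                           # 450.0: headroom grows with capacity
    capacity, fill_rate = new_capacity, new_fill_rate
    # shutdownModifyUpdateSpeed() applies this to the update/create buckets so
    # cached writes drain faster during shutdown.
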
201203 @property
202204 def tokens(self):
203205 '''The tokens property will return the current number of tokens in the
2727 from twisted.internet import reactor
2828 from twisted.internet.task import LoopingCall
2929 from twisted.application.service import Service
30
31 try:
32 import signal
33 except ImportError:
34 log.debug("Couldn't import signal module")
3035
3136
3237 SCHEMAS = loadStorageSchemas()
106111 dbDir = dirname(dbFilePath)
107112 try:
108113 if not exists(dbDir):
109 os.makedirs(dbDir, 0755)
114 os.makedirs(dbDir)
110115 except OSError, e:
111116 log.err("%s" % e)
112117 log.creates("creating database file %s (archive=%s xff=%s agg=%s)" %
113118 (dbFilePath, archiveConfig, xFilesFactor, aggregationMethod))
114 whisper.create(
115 dbFilePath,
116 archiveConfig,
117 xFilesFactor,
118 aggregationMethod,
119 settings.WHISPER_SPARSE_CREATE,
120 settings.WHISPER_FALLOCATE_CREATE)
121 instrumentation.increment('creates')
119 try:
120 whisper.create(
121 dbFilePath,
122 archiveConfig,
123 xFilesFactor,
124 aggregationMethod,
125 settings.WHISPER_SPARSE_CREATE,
126 settings.WHISPER_FALLOCATE_CREATE)
127 instrumentation.increment('creates')
128 except Exception:
129 log.err("Error creating %s" % (dbFilePath))
130 continue
122131 # If we've got a rate limit configured, let's make sure we enforce it
123132 if UPDATE_BUCKET:
124133 UPDATE_BUCKET.drain(1, blocking=True)
126135 t1 = time.time()
127136 whisper.update_many(dbFilePath, datapoints)
128137 updateTime = time.time() - t1
129 except:
138 except Exception:
130139 log.msg("Error writing to %s" % (dbFilePath))
131140 log.err()
132141 instrumentation.increment('errors')
146155 while reactor.running:
147156 try:
148157 writeCachedDataPoints()
149 except:
158 except Exception:
150159 log.err()
151160 time.sleep(1) # The writer thread only sleeps when the cache is empty or an error occurs
152161
155164 global SCHEMAS
156165 try:
157166 SCHEMAS = loadStorageSchemas()
158 except:
167 except Exception:
159168 log.msg("Failed to reload storage SCHEMAS")
160169 log.err()
161170
164173 global AGGREGATION_SCHEMAS
165174 try:
166175 AGGREGATION_SCHEMAS = loadAggregationSchemas()
167 except:
176 except Exception:
168177 log.msg("Failed to reload aggregation SCHEMAS")
169178 log.err()
170179
171180
172181 def shutdownModifyUpdateSpeed():
173182 try:
174 settings.MAX_UPDATES_PER_SECOND = settings.MAX_UPDATES_PER_SECOND_ON_SHUTDOWN
183 shut = settings.MAX_UPDATES_PER_SECOND_ON_SHUTDOWN
184 if UPDATE_BUCKET:
185 UPDATE_BUCKET.setCapacityAndFillRate(shut, shut)
186 if CREATE_BUCKET:
187 CREATE_BUCKET.setCapacityAndFillRate(shut, shut)
175188 log.msg("Carbon shutting down. Changed the update rate to: " + str(settings.MAX_UPDATES_PER_SECOND_ON_SHUTDOWN))
176189 except KeyError:
177190 log.msg("Carbon shutting down. Update rate not changed")
184197 self.aggregation_reload_task = LoopingCall(reloadAggregationSchemas)
185198
186199 def startService(self):
200 if 'signal' in globals():
201 log.debug("Installing SIG_IGN for SIGHUP")
202 signal.signal(signal.SIGHUP, signal.SIG_IGN)
187203 self.storage_reload_task.start(60, False)
188204 self.aggregation_reload_task.start(60, False)
189205 reactor.addSystemEventTrigger('before', 'shutdown', shutdownModifyUpdateSpeed)
2828
2929 setup(
3030 name='carbon',
31 version='0.9.12',
31 version='0.9.13',
3232 url='http://graphite-project.github.com',
3333 author='Chris Davis',
3434 author_email='chrismd@gmail.com',