graphite-carbon / 37b007f
Imported Upstream version 0.9.12 Jonas Genannt 10 years ago
32 changed file(s) with 964 addition(s) and 287 deletion(s).
00 Metadata-Version: 1.0
11 Name: carbon
2 Version: 0.9.10
2 Version: 0.9.12
33 Summary: Backend data caching and persistence daemon for Graphite
4 Home-page: https://launchpad.net/graphite
4 Home-page: http://graphite-project.github.com
55 Author: Chris Davis
66 Author-email: chrismd@gmail.com
77 License: Apache Software License 2.0
1313 limitations under the License."""
1414
1515 import sys
16 from os.path import dirname, join, abspath
16 import os.path
1717
1818 # Figure out where we're installed
19 BIN_DIR = dirname(abspath(__file__))
20 ROOT_DIR = dirname(BIN_DIR)
19 BIN_DIR = os.path.dirname(os.path.abspath(__file__))
20 ROOT_DIR = os.path.dirname(BIN_DIR)
2121
2222 # Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
2323 # source.
24 LIB_DIR = join(ROOT_DIR, 'lib')
24 LIB_DIR = os.path.join(ROOT_DIR, "lib")
2525 sys.path.insert(0, LIB_DIR)
2626
2727 from carbon.util import run_twistd_plugin
28 from carbon.exceptions import CarbonConfigException
2829
29 run_twistd_plugin(__file__)
30 try:
31 run_twistd_plugin(__file__)
32 except CarbonConfigException, exc:
33 raise SystemExit(str(exc))
1313 limitations under the License."""
1414
1515 import sys
16 from os.path import dirname, join, abspath
16 import os.path
1717
1818 # Figure out where we're installed
19 BIN_DIR = dirname(abspath(__file__))
20 ROOT_DIR = dirname(BIN_DIR)
19 BIN_DIR = os.path.dirname(os.path.abspath(__file__))
20 ROOT_DIR = os.path.dirname(BIN_DIR)
2121
2222 # Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
2323 # source.
24 LIB_DIR = join(ROOT_DIR, 'lib')
24 LIB_DIR = os.path.join(ROOT_DIR, "lib")
2525 sys.path.insert(0, LIB_DIR)
2626
2727 from carbon.util import run_twistd_plugin
28 from carbon.exceptions import CarbonConfigException
2829
29 run_twistd_plugin(__file__)
30 try:
31 run_twistd_plugin(__file__)
32 except CarbonConfigException, exc:
33 raise SystemExit(str(exc))
1313 limitations under the License."""
1414
1515 import sys
16 from os.path import dirname, join, abspath
16 import os.path
1717
1818 # Figure out where we're installed
19 BIN_DIR = dirname(abspath(__file__))
20 ROOT_DIR = dirname(BIN_DIR)
19 BIN_DIR = os.path.dirname(os.path.abspath(__file__))
20 ROOT_DIR = os.path.dirname(BIN_DIR)
2121
2222 # Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
2323 # source.
24 LIB_DIR = join(ROOT_DIR, 'lib')
24 LIB_DIR = os.path.join(ROOT_DIR, "lib")
2525 sys.path.insert(0, LIB_DIR)
2626
2727 from carbon.util import run_twistd_plugin
28 from carbon.exceptions import CarbonConfigException
2829
29 run_twistd_plugin(__file__)
30 try:
31 run_twistd_plugin(__file__)
32 except CarbonConfigException, exc:
33 raise SystemExit(str(exc))
2626
2727 config_parser = ConfigParser()
2828 if not config_parser.read(SCHEMAS_FILE):
29 print "Error: Couldn't read config file: %s" % SCHEMAS_FILE
30 sys.exit(1)
29 raise SystemExit("Error: Couldn't read config file: %s" % SCHEMAS_FILE)
3130
3231 errors_found = 0
3332
6160 print " OK"
6261
6362 if errors_found:
64 print
65 print "Storage-schemas configuration '%s' failed validation" % SCHEMAS_FILE
66 sys.exit(1)
63 raise SystemExit( "Storage-schemas configuration '%s' failed validation" % SCHEMAS_FILE)
6764
68 print
6965 print "Storage-schemas configuration '%s' is valid" % SCHEMAS_FILE
2525 # aggregate metric 'prod.applications.apache.all.requests' would be calculated
2626 # by summing their values.
2727 #
28 # Template components such as <env> will match everything up to the next dot.
29 # To match multiple metric components, including the dots, use <<metric>> in the
30 # input template:
31 #
32 # <env>.applications.<app>.all.<app_metric> (60) = sum <env>.applications.<app>.*.<<app_metric>>
33 #
2834 # Note that any time this file is modified, it will be re-read automatically.
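The template syntax above can be pictured with a short, self-contained sketch. This is not carbon's actual rule parser, only an illustration of the documented semantics: <field> captures a single dot-delimited component, while <<field>> captures the remainder of the metric name, dots included.

import re

def template_to_regex(template):
    # Illustrative only. Escape dots, treat '*' as a single-component
    # wildcard, then expand the template fields into named groups.
    pattern = template.replace('.', r'\.').replace('*', '[^.]+')

    def expand(match):
        if match.group(1):                        # <<field>> spans dots
            return '(?P<%s>.+)' % match.group(1)
        return '(?P<%s>[^.]+)' % match.group(2)   # <field> is one component

    return re.compile('^' + re.sub(r'<<(\w+)>>|<(\w+)>', expand, pattern) + '$')

regex = template_to_regex('<env>.applications.<app>.*.<<app_metric>>')
m = regex.match('prod.applications.apache.www01.requests.count')
print(m.groupdict())  # {'env': 'prod', 'app': 'apache', 'app_metric': 'requests.count'}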
2929 #
3030 #LOCAL_DATA_DIR = /opt/graphite/storage/whisper/
3131
32 # Enable daily log rotation. If disabled, a kill -HUP can be used after a manual rotate
33 ENABLE_LOGROTATION = True
34
3235 # Specify the user to drop privileges to
3336 # If this is blank carbon runs as the user that invokes it
3437 # This user must have write access to the local data directory
3538 USER =
39 #
40 # NOTE: The above settings must be set under [relay] and [aggregator]
41 # to take effect for those daemons as well
3642
3743 # Limit the size of the cache to avoid swapping or becoming CPU bound.
3844 # Sorting and serving cache queries get more expensive as the cache grows.
4551 # When the rate of required updates exceeds this, then carbon's caching will
4652 # take effect and increase the overall throughput accordingly.
4753 MAX_UPDATES_PER_SECOND = 500
54
55 # If defined, this changes the MAX_UPDATES_PER_SECOND in Carbon when a
56 # stop/shutdown is initiated. This helps when MAX_UPDATES_PER_SECOND is
57 # relatively low and carbon has cached a lot of updates; it enables the carbon
58 # daemon to shut down more quickly.
59 # MAX_UPDATES_PER_SECOND_ON_SHUTDOWN = 1000
4860
4961 # Softly limits the number of whisper files that get created each minute.
5062 # Setting this value low (like at 50) is a good way to ensure your graphite
6880 PICKLE_RECEIVER_INTERFACE = 0.0.0.0
6981 PICKLE_RECEIVER_PORT = 2004
7082
83 # Set to false to disable logging of successful connections
84 LOG_LISTENER_CONNECTIONS = True
85
7186 # Per security concerns outlined in Bug #817247 the pickle receiver
7287 # will use a more secure and slightly less efficient unpickler.
7388 # Set this to True to revert to the old-fashioned insecure unpickler.
8297 # data until the cache size falls below 95% MAX_CACHE_SIZE.
8398 USE_FLOW_CONTROL = True
8499
85 # By default, carbon-cache will log every whisper update. This can be excessive and
100 # By default, carbon-cache will log every whisper update and cache hit. This can be excessive and
86101 # degrade performance if logging on the same volume as the whisper data is stored.
87102 LOG_UPDATES = False
103 LOG_CACHE_HITS = False
104 LOG_CACHE_QUEUE_SORTS = True
105
106 # The thread that writes metrics to disk can use one of the following strategies for
107 # determining the order in which metrics are removed from cache and flushed to
108 # disk. The default option preserves the same behavior as has been historically
109 # available in version 0.9.10.
110 #
111 # sorted - All metrics in the cache will be counted and an ordered list of
112 # them will be sorted according to the number of datapoints in the cache at the
113 # moment of the list's creation. Metrics will then be flushed from the cache to
114 # disk in that order.
115 #
116 # max - The writer thread will always pop and flush the metric from cache
117 # that has the most datapoints. This will give a strong flush preference to
118 # frequently updated metrics and will also reduce random file-io. Infrequently
119 # updated metrics may only ever be persisted to disk at daemon shutdown if
120 # there are a large number of metrics which receive very frequent updates OR if
121 # disk i/o is very slow.
122 #
123 # naive - Metrics will be flushed from the cache to disk in an unordered
124 # fashion. This strategy may be desirable in situations where the storage for
125 # whisper files is solid state, CPU resources are very limited or deference to
126 # the OS's i/o scheduler is expected to compensate for the random write
127 # pattern.
128 #
129 CACHE_WRITE_STRATEGY = sorted
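A toy sketch of how the three strategies order a cache of {metric: datapoints}. It is a simplification for illustration only, not the _MetricCache implementation that appears later in this diff.

cache = {
    'app.requests': [(1, 10.0), (2, 12.0), (3, 11.0)],
    'app.errors':   [(1, 0.0)],
    'app.latency':  [(1, 0.2), (2, 0.3)],
}

# sorted: build an ordering from a snapshot of queue sizes, drain largest-first.
sorted_order = sorted(cache, key=lambda m: len(cache[m]), reverse=True)

# max: always pick whichever metric currently holds the most datapoints.
max_next = max(cache, key=lambda m: len(cache[m]))

# naive: take metrics in whatever order the dict happens to yield them.
naive_order = list(cache)

print(sorted_order)  # ['app.requests', 'app.latency', 'app.errors']
print(max_next)      # app.requests
print(naive_order)   # arbitrary dict order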
88130
89131 # On some systems it is desirable for whisper to write synchronously.
90132 # Set this option to True if you'd like to try this. Basically it will
98140 # MAX_CREATES_PER_MINUTE but may have longer term performance implications
99141 # depending on the underlying storage configuration.
100142 # WHISPER_SPARSE_CREATE = False
143
144 # Only beneficial on linux filesystems that support the fallocate system call.
145 # It maintains the benefits of contiguous reads/writes, but with a potentially
146 # much faster creation speed, by allowing the kernel to handle the block
147 # allocation and zero-ing. Enabling this option may allow a large increase of
148 # MAX_CREATES_PER_MINUTE. If enabled on an OS or filesystem that is unsupported,
149 # this option will gracefully fall back to standard POSIX file access methods.
150 WHISPER_FALLOCATE_CREATE = True
101151
102152 # Enabling this option will cause Whisper to lock each Whisper file it writes
103153 # to with an exclusive lock (LOCK_EX, see: man 2 flock). This is useful when
169219 PICKLE_RECEIVER_INTERFACE = 0.0.0.0
170220 PICKLE_RECEIVER_PORT = 2014
171221
172 # To use consistent hashing instead of the user defined relay-rules.conf,
173 # change this to:
174 # RELAY_METHOD = consistent-hashing
222 # Set to false to disable logging of successful connections
223 LOG_LISTENER_CONNECTIONS = True
224
225 # Carbon-relay has several options for metric routing controlled by RELAY_METHOD
226 #
227 # Use relay-rules.conf to route metrics to destinations based on pattern rules
228 #RELAY_METHOD = rules
229 #
230 # Use consistent-hashing for even distribution of metrics between destinations
231 #RELAY_METHOD = consistent-hashing
232 #
233 # Use consistent-hashing but take into account an aggregation-rules.conf shared
234 # by downstream carbon-aggregator daemons. This will ensure that all metrics
235 # that map to a given aggregation rule are sent to the same carbon-aggregator
236 # instance.
237 # Enable this for carbon-relays that send to a group of carbon-aggregators
238 #RELAY_METHOD = aggregated-consistent-hashing
175239 RELAY_METHOD = rules
176240
177 # If you use consistent-hashing you may want to add redundancy
178 # of your data by replicating every datapoint to more than
179 # one machine.
241 # If you use consistent-hashing you can add redundancy by replicating every
242 # datapoint to more than one machine.
180243 REPLICATION_FACTOR = 1
181244
182245 # This is a list of carbon daemons we will send any relayed or
227290 PICKLE_RECEIVER_INTERFACE = 0.0.0.0
228291 PICKLE_RECEIVER_PORT = 2024
229292
293 # Set to false to disable logging of successful connections
294 LOG_LISTENER_CONNECTIONS = True
295
296 # If set true, metrics received will be forwarded to DESTINATIONS in addition to
297 # the output of the aggregation rules. If set false the carbon-aggregator will
298 # only ever send the output of aggregation.
299 FORWARD_ALL = True
300
230301 # This is a list of carbon daemons we will send any relayed or
231302 # generated metrics to. The default provided would send to a single
232303 # carbon-cache instance on the default port. However if you
267338 # the past MAX_AGGREGATION_INTERVALS * intervalSize seconds.
268339 MAX_AGGREGATION_INTERVALS = 5
269340
341 # By default (WRITE_BACK_FREQUENCY = 0), carbon-aggregator will write back
342 # aggregated data points once every rule.frequency seconds, on a per-rule basis.
343 # Set this (WRITE_BACK_FREQUENCY = N) to write back all aggregated data points
344 # every N seconds, independent of rule frequency. This is useful, for example,
345 # to be able to query partially aggregated metrics from carbon-cache without
346 # having to first wait rule.frequency seconds.
347 # WRITE_BACK_FREQUENCY = 0
348
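A minimal sketch of the interval selection, mirroring the settings['WRITE_BACK_FREQUENCY'] or frequency expression added to carbon/aggregator/buffers.py further down in this diff.

def effective_interval(rule_frequency, write_back_frequency=None):
    # A falsy override (unset or 0) falls back to the rule's own frequency.
    return write_back_frequency or rule_frequency

print(effective_interval(60))      # 60 -> one write-back per rule interval
print(effective_interval(60, 10))  # 10 -> partially aggregated points every 10s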
270349 # Set this to True to enable whitelisting and blacklisting of metrics in
271350 # CONF_DIR/whitelist and CONF_DIR/blacklist. If the whitelist is missing or
272351 # empty, all metrics will pass through
0 #!/bin/bash
1 # chkconfig: - 25 75
2 # description: carbon-aggregator
3 # processname: carbon-aggregator
4
5 export PYTHONPATH="/opt/graphite/lib:$PYTHONPATH" # GRAPHITE_DIR is only defined further below, so use the literal path here
6
7 # Source function library.
8 if [ -e /etc/rc.d/init.d/functions ]; then
9 . /etc/rc.d/init.d/functions;
10 fi;
11
12 CARBON_DAEMON="aggregator"
13 GRAPHITE_DIR="/opt/graphite"
14 INSTANCES=`grep "^\[${CARBON_DAEMON}" ${GRAPHITE_DIR}/conf/carbon.conf | cut -d \[ -f 2 | cut -d \] -f 1 | cut -d : -f 2`
15
16 function die {
17 echo $1
18 exit 1
19 }
20
21 start(){
22 cd $GRAPHITE_DIR;
23
24 for INSTANCE in ${INSTANCES}; do
25 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
26 INSTANCE="a";
27 fi;
28 echo "Starting carbon-${CARBON_DAEMON}:${INSTANCE}..."
29 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} start;
30
31 if [ $? -eq 0 ]; then
32 echo_success
33 else
34 echo_failure
35 fi;
36 echo ""
37 done;
38 }
39
40 stop(){
41 cd $GRAPHITE_DIR
42
43 for INSTANCE in ${INSTANCES}; do
44 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
45 INSTANCE="a";
46 fi;
47 echo "Stopping carbon-${CARBON_DAEMON}:${INSTANCE}..."
48 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} stop
49
50 if [ `sleep 3; /usr/bin/pgrep -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}" | /usr/bin/wc -l` -gt 0 ]; then
51 echo "Carbon did not stop yet. Sleeping longer, then force killing it...";
52 sleep 20;
53 /usr/bin/pkill -9 -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}";
54 fi;
55
56 if [ $? -eq 0 ]; then
57 echo_success
58 else
59 echo_failure
60 fi;
61 echo ""
62 done;
63 }
64
65 status(){
66 cd $GRAPHITE_DIR;
67
68 for INSTANCE in ${INSTANCES}; do
69 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
70 INSTANCE="a";
71 fi;
72 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} status;
73
74 if [ $? -eq 0 ]; then
75 echo_success
76 else
77 echo_failure
78 fi;
79 echo ""
80 done;
81 }
82
83 case "$1" in
84 start)
85 start
86 ;;
87 stop)
88 stop
89 ;;
90 status)
91 status
92 ;;
93 restart|reload)
94 stop
95 start
96 ;;
97 *)
98 echo $"Usage: $0 {start|stop|restart|status}"
99 exit 1
100 esac
101
0 #!/bin/bash
1 # chkconfig: - 25 75
2 # description: carbon-cache
3 # processname: carbon-cache
4
5 export PYTHONPATH="/opt/graphite/lib:$PYTHONPATH" # GRAPHITE_DIR is only defined further below, so use the literal path here
6
7 # Source function library.
8 if [ -e /etc/rc.d/init.d/functions ]; then
9 . /etc/rc.d/init.d/functions;
10 fi;
11
12 CARBON_DAEMON="cache"
13 GRAPHITE_DIR="/opt/graphite"
14 INSTANCES=`grep "^\[${CARBON_DAEMON}" ${GRAPHITE_DIR}/conf/carbon.conf | cut -d \[ -f 2 | cut -d \] -f 1 | cut -d : -f 2`
15
16 function die {
17 echo $1
18 exit 1
19 }
20
21 start(){
22 cd $GRAPHITE_DIR;
23
24 for INSTANCE in ${INSTANCES}; do
25 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
26 INSTANCE="a";
27 fi;
28 echo "Starting carbon-${CARBON_DAEMON}:${INSTANCE}..."
29 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} start;
30
31 if [ $? -eq 0 ]; then
32 echo_success
33 else
34 echo_failure
35 fi;
36 echo ""
37 done;
38 }
39
40 stop(){
41 cd $GRAPHITE_DIR
42
43 for INSTANCE in ${INSTANCES}; do
44 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
45 INSTANCE="a";
46 fi;
47 echo "Stopping carbon-${CARBON_DAEMON}:${INSTANCE}..."
48 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} stop
49
50 if [ `sleep 3; /usr/bin/pgrep -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}" | /usr/bin/wc -l` -gt 0 ]; then
51 echo "Carbon did not stop yet. Sleeping longer, then force killing it...";
52 sleep 20;
53 /usr/bin/pkill -9 -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}";
54 fi;
55
56 if [ $? -eq 0 ]; then
57 echo_success
58 else
59 echo_failure
60 fi;
61 echo ""
62 done;
63 }
64
65 status(){
66 cd $GRAPHITE_DIR;
67
68 for INSTANCE in ${INSTANCES}; do
69 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
70 INSTANCE="a";
71 fi;
72 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} status;
73
74 if [ $? -eq 0 ]; then
75 echo_success
76 else
77 echo_failure
78 fi;
79 echo ""
80 done;
81 }
82
83 case "$1" in
84 start)
85 start
86 ;;
87 stop)
88 stop
89 ;;
90 status)
91 status
92 ;;
93 restart|reload)
94 stop
95 start
96 ;;
97 *)
98 echo $"Usage: $0 {start|stop|restart|status}"
99 exit 1
100 esac
101
0 #!/bin/bash
1 # chkconfig: - 25 75
2 # description: carbon-relay
3 # processname: carbon-relay
4
5 export PYTHONPATH="/opt/graphite/lib:$PYTHONPATH" # GRAPHITE_DIR is only defined further below, so use the literal path here
6
7 # Source function library.
8 if [ -e /etc/rc.d/init.d/functions ]; then
9 . /etc/rc.d/init.d/functions;
10 fi;
11
12 CARBON_DAEMON="relay"
13 GRAPHITE_DIR="/opt/graphite"
14 INSTANCES=`grep "^\[${CARBON_DAEMON}" ${GRAPHITE_DIR}/conf/carbon.conf | cut -d \[ -f 2 | cut -d \] -f 1 | cut -d : -f 2`
15
16 function die {
17 echo $1
18 exit 1
19 }
20
21 start(){
22 cd $GRAPHITE_DIR;
23
24 for INSTANCE in ${INSTANCES}; do
25 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
26 INSTANCE="a";
27 fi;
28 echo "Starting carbon-${CARBON_DAEMON}:${INSTANCE}..."
29 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} start;
30
31 if [ $? -eq 0 ]; then
32 echo_success
33 else
34 echo_failure
35 fi;
36 echo ""
37 done;
38 }
39
40 stop(){
41 cd $GRAPHITE_DIR
42
43 for INSTANCE in ${INSTANCES}; do
44 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
45 INSTANCE="a";
46 fi;
47 echo "Stopping carbon-${CARBON_DAEMON}:${INSTANCE}..."
48 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} stop
49
50 if [ `sleep 3; /usr/bin/pgrep -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}" | /usr/bin/wc -l` -gt 0 ]; then
51 echo "Carbon did not stop yet. Sleeping longer, then force killing it...";
52 sleep 20;
53 /usr/bin/pkill -9 -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}";
54 fi;
55
56 if [ $? -eq 0 ]; then
57 echo_success
58 else
59 echo_failure
60 fi;
61 echo ""
62 done;
63 }
64
65 status(){
66 cd $GRAPHITE_DIR;
67
68 for INSTANCE in ${INSTANCES}; do
69 if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
70 INSTANCE="a";
71 fi;
72 bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} status;
73
74 if [ $? -eq 0 ]; then
75 echo_success
76 else
77 echo_failure
78 fi;
79 echo ""
80 done;
81 }
82
83 case "$1" in
84 start)
85 start
86 ;;
87 stop)
88 stop
89 ;;
90 status)
91 status
92 ;;
93 restart|reload)
94 stop
95 start
96 ;;
97 *)
98 echo $"Usage: $0 {start|stop|restart|status}"
99 exit 1
100 esac
101
5050 self.aggregation_frequency = int(frequency)
5151 self.aggregation_func = func
5252 self.compute_task = LoopingCall(self.compute_value)
53 self.compute_task.start(frequency, now=False)
53 self.compute_task.start(settings['WRITE_BACK_FREQUENCY'] or frequency, now=False)
5454 self.configured = True
5555
5656 def compute_value(self):
6868
6969 if buffer.interval < age_threshold:
7070 del self.interval_buffers[buffer.interval]
71 if not self.interval_buffers:
72 self.close()
73 self.configured = False
74 del BufferManager.buffers[self.metric_path]
7175
7276 def close(self):
7377 if self.compute_task and self.compute_task.running:
00 from carbon.instrumentation import increment
11 from carbon.aggregator.rules import RuleManager
22 from carbon.aggregator.buffers import BufferManager
3 from carbon.conf import settings
34 from carbon.rewrite import RewriteRuleManager
45 from carbon import events
56
3031 for rule in RewriteRuleManager.postRules:
3132 metric = rule.apply(metric)
3233
33 if metric not in aggregate_metrics:
34 if settings['FORWARD_ALL'] and metric not in aggregate_metrics:
3435 events.metricGenerated(metric, datapoint)
138138 AGGREGATION_METHODS = {
139139 'sum' : sum,
140140 'avg' : avg,
141 'min' : min,
142 'max' : max,
141143 }
142144
143145 # Importable singleton
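A small sketch of how these reducers are applied to a buffer of values; the avg helper here is assumed to be the simple arithmetic mean, while sum, min and max are the Python builtins referenced above.

def avg(values):
    # assumed: the arithmetic mean used by carbon-aggregator
    return sum(values) / float(len(values))

AGGREGATION_METHODS = {'sum': sum, 'avg': avg, 'min': min, 'max': max}

buffered_values = [4.0, 9.0, 2.0]
for name in sorted(AGGREGATION_METHODS):
    print('%s -> %s' % (name, AGGREGATION_METHODS[name](buffered_values)))
# avg -> 5.0, max -> 9.0, min -> 2.0, sum -> 15.0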
3434 import socket
3535 from optparse import OptionParser
3636
37 from twisted.internet.defer import inlineCallbacks
37 from twisted.python.failure import Failure
38 from twisted.internet.defer import deferredGenerator, waitForDeferred
3839 from twisted.internet import reactor
3940 from twisted.internet.protocol import ReconnectingClientFactory
4041 from txamqp.protocol import AMQClient
6162
6263 consumer_tag = "graphite_consumer"
6364
64 @inlineCallbacks
65 @deferredGenerator
6566 def connectionMade(self):
66 yield AMQClient.connectionMade(self)
67 AMQClient.connectionMade(self)
6768 log.listener("New AMQP connection made")
68 yield self.setup()
69 yield self.receive_loop()
70
71 @inlineCallbacks
69 self.setup()
70 wfd = waitForDeferred(self.receive_loop())
71 yield wfd
72
73 @deferredGenerator
7274 def setup(self):
7375 exchange = self.factory.exchange_name
7476
75 yield self.authenticate(self.factory.username, self.factory.password)
76 chan = yield self.channel(1)
77 yield chan.channel_open()
77 d = self.authenticate(self.factory.username, self.factory.password)
78 wfd = waitForDeferred(d)
79 yield wfd
80
81 wfd = waitForDeferred(self.channel(1))
82 yield wfd
83 chan = wfd.getResult()
84
85 wfd = waitForDeferred(chan.channel_open())
86 yield wfd
7887
7988 # declare the exchange and queue
80 yield chan.exchange_declare(exchange=exchange, type="topic",
81 durable=True, auto_delete=False)
89 d = chan.exchange_declare(exchange=exchange, type="topic",
90 durable=True, auto_delete=False)
91 wfd = waitForDeferred(d)
92 yield wfd
8293
8394 # we use a private queue to avoid conflicting with existing bindings
84 reply = yield chan.queue_declare(exclusive=True)
95 wfd = waitForDeferred(chan.queue_declare(exclusive=True))
96 yield wfd
97 reply = wfd.getResult()
8598 my_queue = reply.queue
8699
87100 # bind each configured metric pattern
88101 for bind_pattern in settings.BIND_PATTERNS:
89102 log.listener("binding exchange '%s' to queue '%s' with pattern %s" \
90103 % (exchange, my_queue, bind_pattern))
91 yield chan.queue_bind(exchange=exchange, queue=my_queue,
92 routing_key=bind_pattern)
93
94 yield chan.basic_consume(queue=my_queue, no_ack=True,
95 consumer_tag=self.consumer_tag)
96 @inlineCallbacks
104 d = chan.queue_bind(exchange=exchange, queue=my_queue,
105 routing_key=bind_pattern)
106 wfd = waitForDeferred(d)
107 yield wfd
108
109 d = chan.basic_consume(queue=my_queue, no_ack=True,
110 consumer_tag=self.consumer_tag)
111 wfd = waitForDeferred(d)
112 yield wfd
113
114 @deferredGenerator
97115 def receive_loop(self):
98 queue = yield self.queue(self.consumer_tag)
116 wfd = waitForDeferred(self.queue(self.consumer_tag))
117 yield wfd
118 queue = wfd.getResult()
99119
100120 while True:
101 msg = yield queue.get()
121 wfd = waitForDeferred(queue.get())
122 yield wfd
123 msg = wfd.getResult()
102124 self.processMessage(msg)
103125
104126 def processMessage(self, message):
119141 else:
120142 value, timestamp = line.split()
121143 datapoint = ( float(timestamp), float(value) )
144 if datapoint[1] != datapoint[1]: # filter out NaN values
145 continue
122146 except ValueError:
123147 log.listener("invalid message line: %s" % (line,))
124148 continue
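The datapoint[1] != datapoint[1] test above relies on IEEE 754 semantics: NaN is the only float that compares unequal to itself, so the self-comparison flags NaN values without any extra imports. A quick standalone illustration:

value = float('nan')
print(value != value)   # True  -> the datapoint is dropped
print(12.5 != 12.5)     # False -> ordinary values pass through

import math
print(math.isnan(value))  # equivalent, more explicit test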
1919 import time
2020 from optparse import OptionParser
2121
22 from twisted.internet.defer import inlineCallbacks
22 from twisted.python.failure import Failure
23 from twisted.internet.defer import deferredGenerator, waitForDeferred
2324 from twisted.internet import reactor, task
2425 from twisted.internet.protocol import ClientCreator
2526 from txamqp.protocol import AMQClient
2728 from txamqp.content import Content
2829 import txamqp.spec
2930
30
31 @inlineCallbacks
31 @deferredGenerator
3232 def writeMetric(metric_path, value, timestamp, host, port, username, password,
3333 vhost, exchange, spec=None, channel_number=1, ssl=False):
3434
4242 vhost=vhost, spec=spec)
4343 if ssl:
4444 from twisted.internet.ssl import ClientContextFactory
45 conn = yield connector.connectSSL(host, port, ClientContextFactory())
45 wfd = waitForDeferred(connector.connectSSL(host, port,
46 ClientContextFactory()))
47 yield wfd
48 conn = wfd.getResult()
4649 else:
47 conn = yield connector.connectTCP(host, port)
50 wfd = waitForDeferred(connector.connectTCP(host, port))
51 yield wfd
52 conn = wfd.getResult()
4853
49 yield conn.authenticate(username, password)
50 channel = yield conn.channel(channel_number)
51 yield channel.channel_open()
54 wfd = waitForDeferred(conn.authenticate(username, password))
55 yield wfd
5256
53 yield channel.exchange_declare(exchange=exchange, type="topic",
54 durable=True, auto_delete=False)
57 wfd = waitForDeferred(conn.channel(channel_number))
58 yield wfd
59 channel = wfd.getResult()
60
61 wfd = waitForDeferred(channel.channel_open())
62 yield wfd
63
64 wfd = waitForDeferred(channel.exchange_declare(exchange=exchange,
65 type="topic",
66 durable=True,
67 auto_delete=False))
68 yield wfd
5569
5670 message = Content( "%f %d" % (value, timestamp) )
5771 message["delivery mode"] = 2
5872
59 channel.basic_publish(exchange=exchange, content=message, routing_key=metric_path)
60 yield channel.channel_close()
61
73 channel.basic_publish(exchange=exchange, content=message,
74 routing_key=metric_path)
75 wfd = waitForDeferred(channel.channel_close())
76 yield wfd
6277
6378 def main():
6479 parser = OptionParser(usage="%prog [options] <metric> <value> [timestamp]")
1111 See the License for the specific language governing permissions and
1212 limitations under the License."""
1313
14 from threading import Lock
14 import time
15 from collections import deque
1516 from carbon.conf import settings
17 try:
18 from collections import defaultdict
19 except:
20 from util import defaultdict
1621
1722
18 class MetricCache(dict):
19 def __init__(self):
20 self.size = 0
21 self.lock = Lock()
23 class _MetricCache(defaultdict):
24 def __init__(self, defaultfactory=deque, method="sorted"):
25 self.method = method
26 if self.method == "sorted":
27 self.queue = self.gen_queue()
28 else:
29 self.queue = False
30 super(_MetricCache, self).__init__(defaultfactory)
2231
23 def __setitem__(self, key, value):
24 raise TypeError("Use store() method instead!")
32 def gen_queue(self):
33 while True:
34 t = time.time()
35 queue = sorted(self.counts, key=lambda x: x[1])
36 if settings.LOG_CACHE_QUEUE_SORTS:
37 log.debug("Sorted %d cache queues in %.6f seconds" % (len(queue), time.time() - t))
38 while queue:
39 yield queue.pop()[0]
40
41 @property
42 def size(self):
43 return reduce(lambda x, y: x + len(y), self.values(), 0)
2544
2645 def store(self, metric, datapoint):
27 try:
28 self.lock.acquire()
29 self.setdefault(metric, []).append(datapoint)
30 self.size += 1
31 finally:
32 self.lock.release()
33
46 self[metric].append(datapoint)
3447 if self.isFull():
3548 log.msg("MetricCache is full: self.size=%d" % self.size)
3649 state.events.cacheFull()
3750
3851 def isFull(self):
39 return self.size >= settings.MAX_CACHE_SIZE
52 # Short circuit this test if there is no max cache size, then we don't need
53 # to do the somewhat expensive work of calculating the current size.
54 return settings.MAX_CACHE_SIZE != float('inf') and self.size >= settings.MAX_CACHE_SIZE
4055
41 def pop(self, metric):
42 try:
43 self.lock.acquire()
44 datapoints = dict.pop(self, metric)
45 self.size -= len(datapoints)
46 return datapoints
47 finally:
48 self.lock.release()
56 def pop(self, metric=None):
57 if not self:
58 raise KeyError(metric)
59 elif not metric and self.method == "max":
60 metric = max(self.items(), key=lambda x: len(x[1]))[0]
61 elif not metric and self.method == "naive":
62 return self.popitem()
63 elif not metric and self.method == "sorted":
64 metric = self.queue.next()
65 datapoints = (metric, super(_MetricCache, self).pop(metric))
66 return datapoints
4967
68 @property
5069 def counts(self):
51 try:
52 self.lock.acquire()
53 return [ (metric, len(datapoints)) for (metric, datapoints) in self.items() ]
54 finally:
55 self.lock.release()
70 return [(metric, len(datapoints)) for (metric, datapoints) in self.items()]
5671
5772
5873 # Ghetto singleton
59 MetricCache = MetricCache()
74
75 MetricCache = _MetricCache(method=settings.CACHE_WRITE_STRATEGY)
6076
6177
6278 # Avoid import circularities
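The 'sorted' strategy hinges on the gen_queue generator above. The following standalone sketch mirrors its behaviour (sort a snapshot of queue sizes once, drain largest-first, then re-sort); it is illustrative only and skips the settings/log plumbing of the real class.

import time
from collections import deque

cache = {
    'a.metric': deque([(1, 1.0), (2, 2.0)]),
    'b.metric': deque([(1, 1.0)]),
    'c.metric': deque([(1, 1.0), (2, 2.0), (3, 3.0)]),
}

def gen_queue():
    while True:
        start = time.time()
        # snapshot of (metric, queue length), smallest first
        queue = sorted(((m, len(q)) for m, q in cache.items()), key=lambda x: x[1])
        print('sorted %d cache queues in %.6f seconds' % (len(queue), time.time() - start))
        while queue:
            yield queue.pop()[0]  # pop() hands back the largest remaining queue

ordering = gen_queue()
print([next(ordering) for _ in range(3)])  # ['c.metric', 'a.metric', 'b.metric']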
44 from twisted.protocols.basic import Int32StringReceiver
55 from carbon.conf import settings
66 from carbon.util import pickle
7 from carbon import log, state, events, instrumentation
7 from carbon import log, state, instrumentation
88
99
1010 SEND_QUEUE_LOW_WATERMARK = settings.MAX_QUEUE_SIZE * 0.8
7171 if (self.factory.queueFull.called and
7272 queueSize < SEND_QUEUE_LOW_WATERMARK):
7373 self.factory.queueHasSpace.callback(queueSize)
74
75 if (settings.USE_FLOW_CONTROL and
76 state.metricReceiversPaused):
77 log.clients('%s resuming paused clients' % self)
78 events.resumeReceivingMetrics()
7974
8075 def __str__(self):
8176 return 'CarbonClientProtocol(%s:%d:%s)' % (self.factory.destination)
108103 self.queuedUntilConnected = 'destinations.%s.queuedUntilConnected' % self.destinationName
109104
110105 def queueFullCallback(self, result):
106 state.events.cacheFull()
111107 log.clients('%s send queue is full (%d datapoints)' % (self, result))
112
108
113109 def queueSpaceCallback(self, result):
114110 if self.queueFull.called:
115111 log.clients('%s send queue has space available' % self.connectedProtocol)
116112 self.queueFull = Deferred()
117113 self.queueFull.addCallback(self.queueFullCallback)
114 state.events.cacheSpaceAvailable()
118115 self.queueHasSpace = Deferred()
119116 self.queueHasSpace.addCallback(self.queueSpaceCallback)
120117
2222
2323 import whisper
2424 from carbon import log
25 from carbon.exceptions import CarbonConfigException
2526
2627 from twisted.python import usage
2728
4142 CACHE_QUERY_INTERFACE='0.0.0.0',
4243 CACHE_QUERY_PORT=7002,
4344 LOG_UPDATES=True,
45 LOG_CACHE_HITS=True,
46 LOG_CACHE_QUEUE_SORTS=True,
4447 WHISPER_AUTOFLUSH=False,
4548 WHISPER_SPARSE_CREATE=False,
49 WHISPER_FALLOCATE_CREATE=False,
4650 WHISPER_LOCK_WRITES=False,
4751 MAX_DATAPOINTS_PER_MESSAGE=500,
4852 MAX_AGGREGATION_INTERVALS=5,
53 FORWARD_ALL=False,
4954 MAX_QUEUE_SIZE=1000,
5055 ENABLE_AMQP=False,
5156 AMQP_VERBOSE=False,
6368 USE_WHITELIST=False,
6469 CARBON_METRIC_PREFIX='carbon',
6570 CARBON_METRIC_INTERVAL=60,
71 CACHE_WRITE_STRATEGY='sorted',
72 WRITE_BACK_FREQUENCY=None,
73 ENABLE_LOGROTATION=True,
74 LOG_LISTENER_CONNECTIONS=True,
6675 )
6776
68
69 def _umask(value):
70 return int(value, 8)
7177
7278 def _process_alive(pid):
7379 if exists("/proc"):
8894 _ordered_sections = []
8995
9096 def read(self, path):
97 # Verifies a file exists *and* is readable
98 if not os.access(path, os.R_OK):
99 raise CarbonConfigException("Error: Missing config file or wrong perms on %s" % path)
100
91101 result = ConfigParser.read(self, path)
92
93102 sections = []
94103 for line in open(path):
95104 line = line.strip()
96105
97106 if line.startswith('[') and line.endswith(']'):
98 sections.append( line[1:-1] )
107 sections.append(line[1:-1])
99108
100109 self._ordered_sections = sections
101110
102111 return result
103112
104113 def sections(self):
105 return list( self._ordered_sections ) # return a copy for safety
114 return list(self._ordered_sections) # return a copy for safety
106115
107116
108117 class Settings(dict):
115124 def readFrom(self, path, section):
116125 parser = ConfigParser()
117126 if not parser.read(path):
118 raise Exception("Failed to read config file %s" % path)
127 raise CarbonConfigException("Failed to read config file %s" % path)
119128
120129 if not parser.has_section(section):
121130 return
122131
123 for key,value in parser.items(section):
132 for key, value in parser.items(section):
124133 key = key.upper()
125134
126135 # Detect type from defaults dict
127136 if key in defaults:
128 valueType = type( defaults[key] )
137 valueType = type(defaults[key])
129138 else:
130139 valueType = str
131140
132141 if valueType is list:
133 value = [ v.strip() for v in value.split(',') ]
142 value = [v.strip() for v in value.split(',')]
134143
135144 elif valueType is bool:
136145 value = parser.getboolean(section, key)
156165
157166 optFlags = [
158167 ["debug", "", "Run in debug mode."],
159 ]
168 ]
160169
161170 optParameters = [
162171 ["config", "c", None, "Use the given config file."],
164173 ["logdir", "", None, "Write logs to the given directory."],
165174 ["whitelist", "", None, "List of metric patterns to allow."],
166175 ["blacklist", "", None, "List of metric patterns to disallow."],
167 ]
176 ]
168177
169178 def postOptions(self):
170179 global settings
207216 log.msg("Enabling Whisper autoflush")
208217 whisper.AUTOFLUSH = True
209218
219 if settings.WHISPER_FALLOCATE_CREATE:
220 if whisper.CAN_FALLOCATE:
221 log.msg("Enabling Whisper fallocate support")
222 else:
223 log.err("WHISPER_FALLOCATE_CREATE is enabled but linking failed.")
224
210225 if settings.WHISPER_LOCK_WRITES:
211226 if whisper.CAN_LOCK:
212227 log.msg("Enabling Whisper file locking")
214229 else:
215230 log.err("WHISPER_LOCK_WRITES is enabled but import of fcntl module failed.")
216231
232 if settings.CACHE_WRITE_STRATEGY not in ('sorted', 'max', 'naive'):
233 log.err("%s is not a valid value for CACHE_WRITE_STRATEGY, defaulting to %s" %
234 (settings.CACHE_WRITE_STRATEGY, defaults['CACHE_WRITE_STRATEGY']))
235 else:
236 log.msg("Using %s write strategy for cache" %
237 settings.CACHE_WRITE_STRATEGY)
217238 if not "action" in self:
218239 self["action"] = "start"
219240 self.handleAction()
350371
351372 optParameters = [
352373 ["rules", "", None, "Use the given relay rules file."],
374 ["aggregation-rules", "", None, "Use the given aggregation rules file."],
353375 ] + CarbonCacheOptions.optParameters
354376
355377 def postOptions(self):
358380 self["rules"] = join(settings["CONF_DIR"], "relay-rules.conf")
359381 settings["relay-rules"] = self["rules"]
360382
361 if settings["RELAY_METHOD"] not in ("rules", "consistent-hashing"):
383 if self["aggregation-rules"] is None:
384 self["aggregation-rules"] = join(settings["CONF_DIR"], "aggregation-rules.conf")
385 settings["aggregation-rules"] = self["aggregation-rules"]
386
387 if settings["RELAY_METHOD"] not in ("rules", "consistent-hashing", "aggregated-consistent-hashing"):
362388 print ("In carbon.conf, RELAY_METHOD must be either 'rules' or "
363 "'consistent-hashing'. Invalid value: '%s'" %
389 "'consistent-hashing' or 'aggregated-consistent-hashing'. Invalid value: '%s'" %
364390 settings.RELAY_METHOD)
365391 sys.exit(1)
366392
377403 parser.add_option(
378404 "--pidfile", default=None,
379405 help="Write pid to the given file")
406 parser.add_option(
407 "--umask", default=None,
408 help="Use the given umask when creating files")
380409 parser.add_option(
381410 "--config",
382411 default=None,
454483 if graphite_root is None:
455484 graphite_root = os.environ.get('GRAPHITE_ROOT')
456485 if graphite_root is None:
457 raise ValueError("Either ROOT_DIR or GRAPHITE_ROOT "
486 raise CarbonConfigException("Either ROOT_DIR or GRAPHITE_ROOT "
458487 "needs to be provided.")
459488
460489 # Default config directory to root-relative, unless overriden by the
491520 config = options["config"]
492521
493522 if not exists(config):
494 raise ValueError("Error: missing required config %r" % config)
523 raise CarbonConfigException("Error: missing required config %r" % config)
495524
496525 settings.readFrom(config, section)
497526 settings.setdefault("instance", options["instance"])
508537 (program, options["instance"])))
509538 settings["LOG_DIR"] = (options["logdir"] or
510539 join(settings["LOG_DIR"],
511 "%s-%s" % (program ,options["instance"])))
540 "%s-%s" % (program, options["instance"])))
512541 else:
513542 settings["pidfile"] = (
514543 options["pidfile"] or
0 class CarbonConfigException(Exception):
1 """Raised when a carbon daemon is improperly configured"""
22 except ImportError:
33 from md5 import md5
44 import bisect
5 from carbon.conf import settings
65
76
87 class ConsistentHashRing:
3231
3332 def get_node(self, key):
3433 assert self.ring
35 position = self.compute_ring_position(key)
36 search_entry = (position, None)
37 index = bisect.bisect_left(self.ring, search_entry) % len(self.ring)
38 entry = self.ring[index]
39 return entry[1]
34 node = None
35 node_iter = self.get_nodes(key)
36 node = node_iter.next()
37 node_iter.close()
38 return node
4039
4140 def get_nodes(self, key):
42 nodes = []
41 assert self.ring
42 nodes = set()
4343 position = self.compute_ring_position(key)
4444 search_entry = (position, None)
4545 index = bisect.bisect_left(self.ring, search_entry) % len(self.ring)
4848 next_entry = self.ring[index]
4949 (position, next_node) = next_entry
5050 if next_node not in nodes:
51 nodes.append(next_node)
51 nodes.add(next_node)
52 yield next_node
5253
5354 index = (index + 1) % len(self.ring)
54
55 return nodes
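A brief usage sketch of the generator-based ring, assuming carbon is importable; the node identifiers follow the (server, instance) convention used by ConsistentHashingRouter elsewhere in this diff.

from itertools import islice
from carbon.hashing import ConsistentHashRing

ring = ConsistentHashRing([])
for node in [('10.0.0.1', 'a'), ('10.0.0.2', 'a'), ('10.0.0.3', 'a')]:
    ring.add_node(node)

# get_node() is now just the first value produced by the get_nodes() generator
print(ring.get_node('prod.applications.apache.www01.requests'))
# primary plus first replica candidate for the same key
print(list(islice(ring.get_nodes('prod.applications.apache.www01.requests'), 2)))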
7777 cacheQueries = myStats.get('cacheQueries', 0)
7878 cacheOverflow = myStats.get('cache.overflow', 0)
7979
80 # Calculate cache-data-structure-derived metrics prior to storing anything
81 # in the cache itself -- which would otherwise affect said metrics.
82 cache_size = cache.MetricCache.size
83 cache_queues = len(cache.MetricCache)
84 record('cache.size', cache_size)
85 record('cache.queues', cache_queues)
86
8087 if updateTimes:
8188 avgUpdateTime = sum(updateTimes) / len(updateTimes)
8289 record('avgUpdateTime', avgUpdateTime)
9097 record('creates', creates)
9198 record('errors', errors)
9299 record('cache.queries', cacheQueries)
93 record('cache.queues', len(cache.MetricCache))
94 record('cache.size', cache.MetricCache.size)
95100 record('cache.overflow', cacheOverflow)
96101
97102 # aggregator metrics
105110 # relay metrics
106111 else:
107112 record = relay_record
113 prefix = 'destinations.'
114 relay_stats = [(k,v) for (k,v) in myStats.items() if k.startswith(prefix)]
115 for stat_name, stat_value in relay_stats:
116 record(stat_name, stat_value)
108117
109118 # common metrics
110119 record('metricsReceived', myStats.get('metricsReceived', 0))
00 import time
1 from os.path import exists
12 from sys import stdout, stderr
23 from zope.interface import implements
34 from twisted.python.log import startLoggingWithObserver, textFromEventDict, msg, err, ILogObserver
45 from twisted.python.syslog import SyslogObserver
56 from twisted.python.logfile import DailyLogFile
67
8
9 class CarbonLogFile(DailyLogFile):
10 """Overridden to support logrotate.d"""
11 def __init__(self, *args, **kwargs):
12 DailyLogFile.__init__(self, *args, **kwargs)
13 # avoid circular dependencies
14 from carbon.conf import settings
15 self.enableRotation = settings.ENABLE_LOGROTATION
16
17 def shouldRotate(self):
18 if self.enableRotation:
19 return DailyLogFile.shouldRotate(self)
20 else:
21 return False
22
23 def write(self, data):
24 if not self.enableRotation:
25 if not exists(self.path):
26 self.reopen()
27 DailyLogFile.write(self, data)
28
29 # Backport from twisted >= 10
30 def reopen(self):
31 self.close()
32 self._openFile()
33
34
735 class CarbonLogObserver(object):
836 implements(ILogObserver)
937
1038 def log_to_dir(self, logdir):
1139 self.logdir = logdir
12 self.console_logfile = DailyLogFile('console.log', logdir)
40 self.console_logfile = CarbonLogFile('console.log', logdir)
1341 self.custom_logs = {}
1442 self.observer = self.logdir_observer
1543
1644 def log_to_syslog(self, prefix):
1745 observer = SyslogObserver(prefix).emit
46
1847 def syslog_observer(event):
1948 event["system"] = event.get("type", "console")
2049 observer(event)
2453 return self.observer(event)
2554
2655 def stdout_observer(self, event):
27 stdout.write( formatEvent(event, includeType=True) + '\n' )
56 stdout.write(formatEvent(event, includeType=True) + '\n')
2857 stdout.flush()
2958
3059 def logdir_observer(self, event):
3261 log_type = event.get('type')
3362
3463 if log_type is not None and log_type not in self.custom_logs:
35 self.custom_logs[log_type] = DailyLogFile(log_type + '.log', self.logdir)
64 self.custom_logs[log_type] = CarbonLogFile(log_type + '.log', self.logdir)
3665
3766 logfile = self.custom_logs.get(log_type, self.console_logfile)
3867 logfile.write(message + '\n')
4069
4170 # Default to stdout
4271 observer = stdout_observer
43
72
4473
4574 carbonLogObserver = CarbonLogObserver()
4675
6291
6392 logToSyslog = carbonLogObserver.log_to_syslog
6493
94
6595 def logToStdout():
6696 startLoggingWithObserver(carbonLogObserver)
97
6798
6899 def cache(message, **context):
69100 context['type'] = 'cache'
70101 msg(message, **context)
71102
103
72104 def clients(message, **context):
73105 context['type'] = 'clients'
74106 msg(message, **context)
107
75108
76109 def creates(message, **context):
77110 context['type'] = 'creates'
78111 msg(message, **context)
79112
113
80114 def updates(message, **context):
81115 context['type'] = 'updates'
82116 msg(message, **context)
117
83118
84119 def listener(message, **context):
85120 context['type'] = 'listener'
86121 msg(message, **context)
87122
123
88124 def relay(message, **context):
89125 context['type'] = 'relay'
90126 msg(message, **context)
127
91128
92129 def aggregator(message, **context):
93130 context['type'] = 'aggregator'
94131 msg(message, **context)
95132
133
96134 def query(message, **context):
97135 context['type'] = 'query'
98136 msg(message, **context)
137
99138
100139 def debug(message, **context):
101140 if debugEnabled:
102141 msg(message, **context)
103142
104143 debugEnabled = False
144
145
105146 def setDebugEnabled(enabled):
106147 global debugEnabled
107148 debugEnabled = enabled
11 import whisper
22 from carbon import log
33 from carbon.storage import getFilesystemPath
4
54
65
76 def getMetadata(metric, key):
0 from twisted.internet import reactor
0 import time
1
12 from twisted.internet.protocol import DatagramProtocol
23 from twisted.internet.error import ConnectionDone
34 from twisted.protocols.basic import LineOnlyReceiver, Int32StringReceiver
1314 """
1415 def connectionMade(self):
1516 self.peerName = self.getPeerName()
16 log.listener("%s connection with %s established" % (self.__class__.__name__, self.peerName))
17 if settings.LOG_LISTENER_CONNECTIONS:
18 log.listener("%s connection with %s established" % (self.__class__.__name__, self.peerName))
1719
1820 if state.metricReceiversPaused:
1921 self.pauseReceiving()
3739
3840 def connectionLost(self, reason):
3941 if reason.check(ConnectionDone):
40 log.listener("%s connection with %s closed cleanly" % (self.__class__.__name__, self.peerName))
42 if settings.LOG_LISTENER_CONNECTIONS:
43 log.listener("%s connection with %s closed cleanly" % (self.__class__.__name__, self.peerName))
4144 else:
4245 log.listener("%s connection with %s lost: %s" % (self.__class__.__name__, self.peerName, reason.value))
4346
5255 if WhiteList and metric not in WhiteList:
5356 instrumentation.increment('whitelistRejects')
5457 return
55 if datapoint[1] == datapoint[1]: # filter out NaN values
56 events.metricReceived(metric, datapoint)
58 if datapoint[1] != datapoint[1]: # filter out NaN values
59 return
60 if int(datapoint[0]) == -1: # use current time if none given
61 datapoint = (time.time(), datapoint[1])
62
63 events.metricReceived(metric, datapoint)
5764
5865
5966 class MetricLineReceiver(MetricReceiver, LineOnlyReceiver):
6269 def lineReceived(self, line):
6370 try:
6471 metric, value, timestamp = line.strip().split()
65 datapoint = ( float(timestamp), float(value) )
72 datapoint = (float(timestamp), float(value))
6673 except:
6774 log.listener('invalid line received from client %s, ignoring' % self.peerName)
6875 return
7582 for line in data.splitlines():
7683 try:
7784 metric, value, timestamp = line.strip().split()
78 datapoint = ( float(timestamp), float(value) )
85 datapoint = (float(timestamp), float(value))
7986
8087 self.metricReceived(metric, datapoint)
8188 except:
96103 log.listener('invalid pickle received from %s, ignoring' % self.peerName)
97104 return
98105
99 for (metric, datapoint) in datapoints:
106 for raw in datapoints:
100107 try:
101 datapoint = ( float(datapoint[0]), float(datapoint[1]) ) #force proper types
108 (metric, (value, timestamp)) = raw
109 except Exception, e:
110 log.listener('Error decoding pickle: %s' % e)
111 try:
112 datapoint = (float(value), float(timestamp)) # force proper types
102113 except:
103114 continue
104115
124135 metric = request['metric']
125136 datapoints = MetricCache.get(metric, [])
126137 result = dict(datapoints=datapoints)
127 log.query('[%s] cache query for \"%s\" returned %d values' % (self.peerAddr, metric, len(datapoints)))
138 if settings.LOG_CACHE_HITS is True:
139 log.query('[%s] cache query for \"%s\" returned %d values' % (self.peerAddr, metric, len(datapoints)))
128140 instrumentation.increment('cacheQueries')
129141
130142 elif request['type'] == 'get-metadata':
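Given the new receiver behaviour (a timestamp of -1 means "use the daemon's clock", and NaN values are dropped), a plaintext client interaction looks like the sketch below. It assumes a carbon-cache line receiver on localhost:2003, the stock LINE_RECEIVER_PORT; adjust to match your carbon.conf.

import socket

sock = socket.create_connection(('127.0.0.1', 2003))
# timestamp of -1: the daemon substitutes time.time() on receipt
sock.sendall(b'deploy.example.duration 42.5 -1\n')
# NaN parses as a float but is filtered out by metricReceived()
sock.sendall(b'deploy.example.duration nan 1234567890\n')
sock.close()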
3737 new_regex_list = []
3838 for line in open(self.list_file):
3939 pattern = line.strip()
40 if line.startswith('#') or not line:
40 if line.startswith('#') or not pattern:
4141 continue
4242 try:
4343 new_regex_list.append(re.compile(pattern))
00 import re
11 from carbon.conf import OrderedConfigParser
22 from carbon.util import parseDestinations
3 from carbon.exceptions import CarbonConfigException
34
45
56 class RelayRule:
1718 parser = OrderedConfigParser()
1819
1920 if not parser.read(path):
20 raise Exception("Could not read rules file %s" % path)
21 raise CarbonConfigException("Could not read rules file %s" % path)
2122
2223 defaultRule = None
2324 for section in parser.sections():
2425 if not parser.has_option(section, 'destinations'):
25 raise ValueError("Rules file %s section %s does not define a "
26 raise CarbonConfigException("Rules file %s section %s does not define a "
2627 "'destinations' list" % (path, section))
2728
2829 destination_strings = parser.get(section, 'destinations').split(',')
3031
3132 if parser.has_option(section, 'pattern'):
3233 if parser.has_option(section, 'default'):
33 raise Exception("Section %s contains both 'pattern' and "
34 raise CarbonConfigException("Section %s contains both 'pattern' and "
3435 "'default'. You must use one or the other." % section)
3536 pattern = parser.get(section, 'pattern')
3637 regex = re.compile(pattern, re.I)
4647 if not parser.getboolean(section, 'default'):
4748 continue # just ignore default = false
4849 if defaultRule:
49 raise Exception("Only one default rule can be specified")
50 raise CarbonConfigException("Only one default rule can be specified")
5051 defaultRule = RelayRule(condition=lambda metric: True,
5152 destinations=destinations)
5253
5354 if not defaultRule:
54 raise Exception("No default rule defined. You must specify exactly one "
55 raise CarbonConfigException("No default rule defined. You must specify exactly one "
5556 "rule with 'default = true' instead of a pattern.")
5657
5758 rules.append(defaultRule)
4242 class ConsistentHashingRouter(DatapointRouter):
4343 def __init__(self, replication_factor=1):
4444 self.replication_factor = int(replication_factor)
45 self.instance_ports = {} # { (server, instance) : port }
45 self.instance_ports = {} # { (server, instance) : port }
4646 self.ring = ConsistentHashRing([])
4747
4848 def addDestination(self, destination):
4949 (server, port, instance) = destination
5050 if (server, instance) in self.instance_ports:
5151 raise Exception("destination instance (%s, %s) already configured" % (server, instance))
52 self.instance_ports[ (server, instance) ] = port
53 self.ring.add_node( (server, instance) )
52 self.instance_ports[(server, instance)] = port
53 self.ring.add_node((server, instance))
5454
5555 def removeDestination(self, destination):
5656 (server, port, instance) = destination
5757 if (server, instance) not in self.instance_ports:
5858 raise Exception("destination instance (%s, %s) not configured" % (server, instance))
59 del self.instance_ports[ (server, instance) ]
60 self.ring.remove_node( (server, instance) )
59 del self.instance_ports[(server, instance)]
60 self.ring.remove_node((server, instance))
6161
6262 def getDestinations(self, metric):
6363 key = self.getKey(metric)
6464
65 used_servers = set()
66 for (server, instance) in self.ring.get_nodes(key):
67 if server in used_servers:
68 continue
69 else:
70 used_servers.add(server)
71 port = self.instance_ports[ (server, instance) ]
72 yield (server, port, instance)
73
74 if len(used_servers) >= self.replication_factor:
65 for count,node in enumerate(self.ring.get_nodes(key)):
66 if count == self.replication_factor:
7567 return
68 (server, instance) = node
69 port = self.instance_ports[ (server, instance) ]
70 yield (server, port, instance)
7671
7772 def getKey(self, metric):
7873 return metric
8782 module = imp.load_module('keyfunc_module', module_file, module_path, description)
8883 keyfunc = getattr(module, func_name)
8984 self.setKeyFunction(keyfunc)
85
86 class AggregatedConsistentHashingRouter(DatapointRouter):
87 def __init__(self, agg_rules_manager, replication_factor=1):
88 self.hash_router = ConsistentHashingRouter(replication_factor)
89 self.agg_rules_manager = agg_rules_manager
90
91 def addDestination(self, destination):
92 self.hash_router.addDestination(destination)
93
94 def removeDestination(self, destination):
95 self.hash_router.removeDestination(destination)
96
97 def getDestinations(self, key):
98 # resolve metric to aggregate forms
99 resolved_metrics = []
100 for rule in self.agg_rules_manager.rules:
101 aggregate_metric = rule.get_aggregate_metric(key)
102 if aggregate_metric is None:
103 continue
104 else:
105 resolved_metrics.append(aggregate_metric)
106
107 # if the metric will not be aggregated, send it raw
108 # (will pass through aggregation)
109 if len(resolved_metrics) == 0:
110 resolved_metrics.append(key)
111
112 # get consistent hashing destinations based on aggregate forms
113 destinations = set()
114 for resolved_metric in resolved_metrics:
115 for destination in self.hash_router.getDestinations(resolved_metric):
116 destinations.add(destination)
117
118 for destination in destinations:
119 yield destination
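A usage sketch of the new router, assuming carbon is importable. The stub rules manager is purely illustrative; in carbon-relay the real aggregation RuleManager is loaded from aggregation-rules.conf, as createRelayService() does later in this diff.

from carbon.routers import AggregatedConsistentHashingRouter

class StubRule(object):
    def get_aggregate_metric(self, metric):
        # hypothetical rule: per-host request metrics roll up to one aggregate
        if metric.endswith('.requests'):
            return 'prod.applications.apache.all.requests'
        return None

class StubRuleManager(object):
    rules = [StubRule()]

router = AggregatedConsistentHashingRouter(StubRuleManager(), replication_factor=1)
router.addDestination(('10.0.0.1', 2024, 'a'))
router.addDestination(('10.0.0.2', 2024, 'a'))

# Both per-host metrics hash on the shared aggregate name, so they route to
# the same carbon-aggregator instance.
print(list(router.getDestinations('prod.applications.apache.www01.requests')))
print(list(router.getDestinations('prod.applications.apache.www02.requests')))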
2121 # Attaching modules to the global state module simplifies import order hassles
2222 from carbon import util, state, events, instrumentation
2323 from carbon.log import carbonLogObserver
24 from carbon.exceptions import CarbonConfigException
2425 state.events = events
2526 state.instrumentation = instrumentation
2627
3435 parent.setComponent(ILogObserver, carbonLogObserver)
3536
3637
37
3838 def createBaseService(config):
3939 from carbon.conf import settings
4040 from carbon.protocols import (MetricLineReceiver, MetricPickleReceiver,
5151 amqp_port = settings.get("AMQP_PORT", 5672)
5252 amqp_user = settings.get("AMQP_USER", "guest")
5353 amqp_password = settings.get("AMQP_PASSWORD", "guest")
54 amqp_verbose = settings.get("AMQP_VERBOSE", False)
55 amqp_vhost = settings.get("AMQP_VHOST", "/")
56 amqp_spec = settings.get("AMQP_SPEC", None)
54 amqp_verbose = settings.get("AMQP_VERBOSE", False)
55 amqp_vhost = settings.get("AMQP_VHOST", "/")
56 amqp_spec = settings.get("AMQP_SPEC", None)
5757 amqp_exchange_name = settings.get("AMQP_EXCHANGE", "graphite")
58
5958
6059 for interface, port, protocol in ((settings.LINE_RECEIVER_INTERFACE,
6160 settings.LINE_RECEIVER_PORT,
9392 service.setServiceParent(root_service)
9493
9594 if settings.USE_WHITELIST:
96 from carbon.regexlist import WhiteList,BlackList
95 from carbon.regexlist import WhiteList, BlackList
9796 WhiteList.read_from(settings["whitelist"])
9897 BlackList.read_from(settings["blacklist"])
9998
159158 RewriteRuleManager.read_from(settings["rewrite-rules"])
160159
161160 if not settings.DESTINATIONS:
162 raise Exception("Required setting DESTINATIONS is missing from carbon.conf")
161 raise CarbonConfigException("Required setting DESTINATIONS is missing from carbon.conf")
163162
164163 for destination in util.parseDestinations(settings.DESTINATIONS):
165164 client_manager.startClient(destination)
168167
169168
170169 def createRelayService(config):
171 from carbon.routers import RelayRulesRouter, ConsistentHashingRouter
170 from carbon.routers import RelayRulesRouter, ConsistentHashingRouter, AggregatedConsistentHashingRouter
172171 from carbon.client import CarbonClientManager
173172 from carbon.conf import settings
174173 from carbon import events
180179 router = RelayRulesRouter(settings["relay-rules"])
181180 elif settings.RELAY_METHOD == 'consistent-hashing':
182181 router = ConsistentHashingRouter(settings.REPLICATION_FACTOR)
182 elif settings.RELAY_METHOD == 'aggregated-consistent-hashing':
183 from carbon.aggregator.rules import RuleManager
184 RuleManager.read_from(settings["aggregation-rules"])
185 router = AggregatedConsistentHashingRouter(RuleManager, settings.REPLICATION_FACTOR)
183186
184187 client_manager = CarbonClientManager(router)
185188 client_manager.setServiceParent(root_service)
188191 events.metricGenerated.addHandler(client_manager.sendDatapoint)
189192
190193 if not settings.DESTINATIONS:
191 raise Exception("Required setting DESTINATIONS is missing from carbon.conf")
194 raise CarbonConfigException("Required setting DESTINATIONS is missing from carbon.conf")
192195
193196 for destination in util.parseDestinations(settings.DESTINATIONS):
194197 client_manager.startClient(destination)
1111 See the License for the specific language governing permissions and
1212 limitations under the License."""
1313
14 import os, re
14 import os
15 import re
1516 import whisper
1617
17 from os.path import join, exists
18 from os.path import join, exists, sep
1819 from carbon.conf import OrderedConfigParser, settings
1920 from carbon.util import pickle
2021 from carbon import log
21
22 from carbon.exceptions import CarbonConfigException
2223
2324 STORAGE_SCHEMAS_CONFIG = join(settings.CONF_DIR, 'storage-schemas.conf')
2425 STORAGE_AGGREGATION_CONFIG = join(settings.CONF_DIR, 'storage-aggregation.conf')
2526 STORAGE_LISTS_DIR = join(settings.CONF_DIR, 'lists')
2627
28
2729 def getFilesystemPath(metric):
28 return join(settings.LOCAL_DATA_DIR, metric.replace('.','/')) + '.wsp'
30 metric_path = metric.replace('.', sep).lstrip(sep) + '.wsp'
31 return join(settings.LOCAL_DATA_DIR, metric_path)
2932
3033
3134 class Schema:
3336 raise NotImplementedError()
3437
3538 def matches(self, metric):
36 return bool( self.test(metric) )
39 return bool(self.test(metric))
3740
3841
3942 class DefaultSchema(Schema):
9194
9295 class Archive:
9396
94 def __init__(self,secondsPerPoint,points):
97 def __init__(self, secondsPerPoint, points):
9598 self.secondsPerPoint = int(secondsPerPoint)
9699 self.points = int(points)
97100
98101 def __str__(self):
99 return "Archive = (Seconds per point: %d, Datapoints to save: %d)" % (self.secondsPerPoint, self.points)
102 return "Archive = (Seconds per point: %d, Datapoints to save: %d)" % (self.secondsPerPoint, self.points)
100103
101104 def getTuple(self):
102 return (self.secondsPerPoint,self.points)
105 return (self.secondsPerPoint, self.points)
103106
104107 @staticmethod
105108 def fromString(retentionDef):
113116 config.read(STORAGE_SCHEMAS_CONFIG)
114117
115118 for section in config.sections():
116 options = dict( config.items(section) )
119 options = dict(config.items(section))
117120 matchAll = options.get('match-all')
118121 pattern = options.get('pattern')
119122 listName = options.get('list')
120123
121124 retentions = options['retentions'].split(',')
122 archives = [ Archive.fromString(s) for s in retentions ]
123
125 archives = [Archive.fromString(s) for s in retentions]
126
124127 if matchAll:
125128 mySchema = DefaultSchema(section, archives)
126129
129132
130133 elif listName:
131134 mySchema = ListSchema(section, listName, archives)
132
135
133136 archiveList = [a.getTuple() for a in archives]
134137
135138 try:
136139 whisper.validateArchiveList(archiveList)
137140 schemaList.append(mySchema)
138 except InvalidConfiguration, e:
139 log.msg("Invalid schemas found in %s: %s" % (section, e.message) )
140
141 except whisper.InvalidConfiguration, e:
142 log.msg("Invalid schemas found in %s: %s" % (section, e))
143
141144 schemaList.append(defaultSchema)
142145 return schemaList
143146
149152
150153 try:
151154 config.read(STORAGE_AGGREGATION_CONFIG)
152 except IOError:
155 except (IOError, CarbonConfigException):
153156 log.msg("%s not found, ignoring." % STORAGE_AGGREGATION_CONFIG)
154157
155158 for section in config.sections():
156 options = dict( config.items(section) )
159 options = dict(config.items(section))
157160 matchAll = options.get('match-all')
158161 pattern = options.get('pattern')
159162 listName = options.get('list')
168171 if aggregationMethod is not None:
169172 assert aggregationMethod in whisper.aggregationMethods
170173 except:
171 log.msg("Invalid schemas found in %s." % section )
174 log.msg("Invalid schemas found in %s." % section)
172175 continue
173176
174177 archives = (xFilesFactor, aggregationMethod)
187190 schemaList.append(defaultAggregation)
188191 return schemaList
189192
190 defaultArchive = Archive(60, 60 * 24 * 7) #default retention for unclassified data (7 days of minutely data)
193 defaultArchive = Archive(60, 60 * 24 * 7) # default retention for unclassified data (7 days of minutely data)
191194 defaultSchema = DefaultSchema('default', [defaultArchive])
192195 defaultAggregation = DefaultSchema('default', (None, None))
0 import sys
0 import copy
11 import os
22 import pwd
3
4 from os.path import abspath, basename, dirname, join
3 import sys
4
5 from os.path import abspath, basename, dirname
56 try:
67 from cStringIO import StringIO
78 except ImportError:
1314 import pickle
1415 USING_CPICKLE = False
1516
17 from time import sleep, time
1618 from twisted.python.util import initgroups
1719 from twisted.scripts.twistd import runApp
1820 from twisted.scripts._twistd_unix import daemonize
1921
2022
21 daemonize = daemonize # Backwards compatibility
23 daemonize = daemonize # Backwards compatibility
2224
2325
2426 def dropprivs(user):
6971 twistd_options.append("--profile")
7072 if options.pidfile:
7173 twistd_options.extend(["--pidfile", options.pidfile])
74 if options.umask:
75 twistd_options.extend(["--umask", options.umask])
7276
7377 # Now for the plugin-specific options.
7478 twistd_options.append(program)
7882
7983 for option_name, option_value in vars(options).items():
8084 if (option_value is not None and
81 option_name not in ("debug", "profile", "pidfile")):
85 option_name not in ("debug", "profile", "pidfile", "umask")):
8286 twistd_options.extend(["--%s" % option_name.replace("_", "-"),
8387 option_value])
8488
104108 else:
105109 raise ValueError("Invalid destination string \"%s\"" % dest_string)
106110
107 destinations.append( (server, int(port), instance) )
111 destinations.append((server, int(port), instance))
108112
109113 return destinations
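Only the tail of the destination parser is visible in this hunk, so the exact splitting and defaulting rules are not shown here; as a minimal sketch (assuming the "server:port[:instance]" convention implied by the tuple built above, and using a hypothetical helper name), the parsing could look like:

    def parse_destination(dest_string):
        # Hypothetical standalone mirror of the visible logic: split on ':'
        # and fall back to instance=None when only "server:port" is given.
        parts = dest_string.strip().split(':')
        if len(parts) == 2:
            server, port = parts
            instance = None
        elif len(parts) == 3:
            server, port, instance = parts
        else:
            raise ValueError("Invalid destination string \"%s\"" % dest_string)
        return (server, int(port), instance)

    # e.g. parse_destination("127.0.0.1:2004:a") -> ('127.0.0.1', 2004, 'a')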
110
111114
112115
113116 # This whole song & dance is due to pickle being insecure
118121 if USING_CPICKLE:
119122 class SafeUnpickler(object):
120123 PICKLE_SAFE = {
121 'copy_reg' : set(['_reconstructor']),
122 '__builtin__' : set(['object']),
124 'copy_reg': set(['_reconstructor']),
125 '__builtin__': set(['object']),
123126 }
124127
125128 @classmethod
141144 else:
142145 class SafeUnpickler(pickle.Unpickler):
143146 PICKLE_SAFE = {
144 'copy_reg' : set(['_reconstructor']),
145 '__builtin__' : set(['object']),
147 'copy_reg': set(['_reconstructor']),
148 '__builtin__': set(['object']),
146149 }
150
147151 def find_class(self, module, name):
148152 if not module in self.PICKLE_SAFE:
149153 raise pickle.UnpicklingError('Attempting to unpickle unsafe module %s' % module)
152156 if not name in self.PICKLE_SAFE[module]:
153157 raise pickle.UnpicklingError('Attempting to unpickle unsafe class %s' % name)
154158 return getattr(mod, name)
155
159
156160 @classmethod
157161 def loads(cls, pickle_string):
158162 return cls(StringIO(pickle_string)).load()
159
163
160164
161165 def get_unpickler(insecure=False):
162166 if insecure:
163167 return pickle
164168 else:
165169 return SafeUnpickler
170
171
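As a usage sketch (assuming this module is carbon.util; the payload below is illustrative, not a real carbon message): a plain list of (metric, (timestamp, value)) tuples contains no class references, so the restricted unpickler accepts it, while get_unpickler(insecure=True) simply hands back the stock pickle module.

    import pickle
    from carbon.util import get_unpickler

    # No GLOBAL opcodes in this payload, so the PICKLE_SAFE whitelist is never consulted.
    payload = pickle.dumps([("some.metric.path", (1380000000, 42.0))])
    datapoints = get_unpickler(insecure=False).loads(payload)
    assert datapoints == [("some.metric.path", (1380000000, 42.0))]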
172 class TokenBucket(object):
173 '''This is a basic token bucket rate limiter implementation for use in
174 enforcing various configurable rate limits.'''
175 def __init__(self, capacity, fill_rate):
176 '''Capacity is the total number of tokens the bucket can hold, fill rate is
177 the rate in tokens (or fractional tokens) to be added to the bucket per
178 second.'''
179 self.capacity = float(capacity)
180 self._tokens = float(capacity)
181 self.fill_rate = float(fill_rate)
182 self.timestamp = time()
183
184 def drain(self, cost, blocking=False):
185 '''Given a number of tokens (or fractions), drain will return True and
186 remove that many tokens from the bucket if capacity allows; otherwise
187 it returns False and leaves the contents of the bucket untouched.'''
188 if cost <= self.tokens:
189 self._tokens -= cost
190 return True
191 else:
192 if blocking:
193 tokens_needed = cost - self._tokens
194 seconds_per_token = 1 / self.fill_rate
195 seconds_left = seconds_per_token * tokens_needed
196 sleep(self.timestamp + seconds_left - time())
197 self._tokens -= cost
198 return True
199 return False
200
201 @property
202 def tokens(self):
203 '''The tokens property will return the current number of tokens in the
204 bucket.'''
205 if self._tokens < self.capacity:
206 now = time()
207 delta = self.fill_rate * (now - self.timestamp)
208 self._tokens = min(self.capacity, self._tokens + delta)
209 self.timestamp = now
210 return self._tokens
211
212
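A brief usage sketch of the bucket defined above (the numbers are illustrative): a bucket sized for 10 operations that refills at 2 tokens per second allows an initial burst of 10 drains and then throttles callers to the fill rate.

    bucket = TokenBucket(capacity=10, fill_rate=2)

    for _ in range(10):
        assert bucket.drain(1)          # the bucket starts full, so the burst succeeds

    bucket.drain(1)                     # False: bucket is empty and the call is non-blocking
    bucket.drain(1, blocking=True)      # sleeps roughly half a second for a token to accrue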
213 class defaultdict(dict):
214 def __init__(self, default_factory=None, *a, **kw):
215 if (default_factory is not None and not hasattr(default_factory, '__call__')):
216 raise TypeError('first argument must be callable')
217 dict.__init__(self, *a, **kw)
218 self.default_factory = default_factory
219
220 def __getitem__(self, key):
221 try:
222 return dict.__getitem__(self, key)
223 except KeyError:
224 return self.__missing__(key)
225
226 def __missing__(self, key):
227 if self.default_factory is None:
228 raise KeyError(key)
229 self[key] = value = self.default_factory()
230 return value
231
232 def __reduce__(self):
233 if self.default_factory is None:
234 args = tuple()
235 else:
236 args = self.default_factory,
237 return type(self), args, None, None, self.iteritems()
238
239 def copy(self):
240 return self.__copy__()
241
242 def __copy__(self):
243 return type(self)(self.default_factory, self)
244
245 def __deepcopy__(self, memo):
246 return type(self)(self.default_factory, copy.deepcopy(self.items()))
247
248 def __repr__(self):
249 return 'defaultdict(%s, %s)' % (self.default_factory, dict.__repr__(self))
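This class mirrors collections.defaultdict for Python versions that lack it; a short sketch of the behaviour it provides:

    counts = defaultdict(int)           # missing keys are created via default_factory
    counts['carbon.agents'] += 1        # no KeyError: the value starts from int() == 0

    plain = defaultdict(None)           # without a factory it behaves like a normal dict
    # plain['missing']                  # would raise KeyError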
1111 See the License for the specific language governing permissions and
1212 limitations under the License."""
1313
14
1514 import os
1615 import time
17 from os.path import join, exists, dirname, basename
16 from os.path import exists, dirname
1817
1918 import whisper
2019 from carbon import state
2120 from carbon.cache import MetricCache
22 from carbon.storage import getFilesystemPath, loadStorageSchemas, loadAggregationSchemas
21 from carbon.storage import getFilesystemPath, loadStorageSchemas,\
22 loadAggregationSchemas
2323 from carbon.conf import settings
2424 from carbon import log, events, instrumentation
25 from carbon.util import TokenBucket
2526
2627 from twisted.internet import reactor
2728 from twisted.internet.task import LoopingCall
2829 from twisted.application.service import Service
2930
3031
31 lastCreateInterval = 0
32 createCount = 0
33 schemas = loadStorageSchemas()
34 agg_schemas = loadAggregationSchemas()
32 SCHEMAS = loadStorageSchemas()
33 AGGREGATION_SCHEMAS = loadAggregationSchemas()
3534 CACHE_SIZE_LOW_WATERMARK = settings.MAX_CACHE_SIZE * 0.95
3635
3736
37 # Initialize token buckets so that we can enforce rate limits on creates and
38 # updates if the config wants them.
39 CREATE_BUCKET = None
40 UPDATE_BUCKET = None
41 if settings.MAX_CREATES_PER_MINUTE != float('inf'):
42 capacity = settings.MAX_CREATES_PER_MINUTE
43 fill_rate = float(settings.MAX_CREATES_PER_MINUTE) / 60
44 CREATE_BUCKET = TokenBucket(capacity, fill_rate)
45
46 if settings.MAX_UPDATES_PER_SECOND != float('inf'):
47 capacity = settings.MAX_UPDATES_PER_SECOND
48 fill_rate = settings.MAX_UPDATES_PER_SECOND
49 UPDATE_BUCKET = TokenBucket(capacity, fill_rate)
50
51
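To make the rate arithmetic concrete (the settings values below are assumed for illustration, not defaults taken from this diff): a per-minute create limit is spread over 60 seconds of fill rate, while the per-second update limit maps directly onto the bucket's fill rate, and either bucket still permits a burst up to its full capacity.

    example_creates_per_minute = 50     # hypothetical MAX_CREATES_PER_MINUTE value
    example_create_bucket = TokenBucket(example_creates_per_minute,
                                        float(example_creates_per_minute) / 60)
    # Burst of 50 creates allowed, then sustained at ~0.83 creates per second.

    example_updates_per_second = 1000   # hypothetical MAX_UPDATES_PER_SECOND value
    example_update_bucket = TokenBucket(example_updates_per_second,
                                        example_updates_per_second)
    # Burst of 1000 updates allowed, then sustained at 1000 updates per second.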
3852 def optimalWriteOrder():
39 "Generates metrics with the most cached values first and applies a soft rate limit on new metrics"
40 global lastCreateInterval
41 global createCount
42 metrics = MetricCache.counts()
43
44 t = time.time()
45 metrics.sort(key=lambda item: item[1], reverse=True) # by queue size, descending
46 log.msg("Sorted %d cache queues in %.6f seconds" % (len(metrics), time.time() - t))
47
48 for metric, queueSize in metrics:
53 """Generates metrics with the most cached values first and applies a soft
54 rate limit on new metrics"""
55 while MetricCache:
56 (metric, datapoints) = MetricCache.pop()
4957 if state.cacheTooFull and MetricCache.size < CACHE_SIZE_LOW_WATERMARK:
5058 events.cacheSpaceAvailable()
5159
5260 dbFilePath = getFilesystemPath(metric)
5361 dbFileExists = exists(dbFilePath)
5462
55 if not dbFileExists:
56 createCount += 1
57 now = time.time()
58
59 if now - lastCreateInterval >= 60:
60 lastCreateInterval = now
61 createCount = 1
62
63 elif createCount >= settings.MAX_CREATES_PER_MINUTE:
64 # dropping queued up datapoints for new metrics prevents filling up the entire cache
65 # when a bunch of new metrics are received.
66 try:
67 MetricCache.pop(metric)
68 except KeyError:
69 pass
70
71 continue
72
73 try: # metrics can momentarily disappear from the MetricCache due to the implementation of MetricCache.store()
74 datapoints = MetricCache.pop(metric)
75 except KeyError:
76 log.msg("MetricCache contention, skipping %s update for now" % metric)
77 continue # we simply move on to the next metric when this race condition occurs
63 if not dbFileExists and CREATE_BUCKET:
64 # If our tokenbucket has enough tokens available to create a new metric
65 # file then yield the metric data to complete that operation. Otherwise
66 # we'll just drop the metric on the ground and move on to the next
67 # metric.
68 # XXX This behavior should probably be configurable to not drop metrics
69 # when rate limiting unless our cache is too big or some other legit
70 # reason.
71 if CREATE_BUCKET.drain(1):
72 yield (metric, datapoints, dbFilePath, dbFileExists)
73 continue
7874
7975 yield (metric, datapoints, dbFilePath, dbFileExists)
8076
8177
8278 def writeCachedDataPoints():
8379 "Write datapoints until the MetricCache is completely empty"
84 updates = 0
85 lastSecond = 0
8680
8781 while MetricCache:
8882 dataWritten = False
9488 archiveConfig = None
9589 xFilesFactor, aggregationMethod = None, None
9690
97 for schema in schemas:
91 for schema in SCHEMAS:
9892 if schema.matches(metric):
9993 log.creates('new metric %s matched schema %s' % (metric, schema.name))
10094 archiveConfig = [archive.getTuple() for archive in schema.archives]
10195 break
10296
103 for schema in agg_schemas:
97 for schema in AGGREGATION_SCHEMAS:
10498 if schema.matches(metric):
10599 log.creates('new metric %s matched aggregation schema %s' % (metric, schema.name))
106100 xFilesFactor, aggregationMethod = schema.archives
110104 raise Exception("No storage schema matched the metric '%s', check your storage-schemas.conf file." % metric)
111105
112106 dbDir = dirname(dbFilePath)
113 os.system("mkdir -p -m 755 '%s'" % dbDir)
114
115 log.creates("creating database file %s (archive=%s xff=%s agg=%s)" %
107 try:
108 if not exists(dbDir):
109 os.makedirs(dbDir, 0755)
110 except OSError, e:
111 log.err("%s" % e)
112 log.creates("creating database file %s (archive=%s xff=%s agg=%s)" %
116113 (dbFilePath, archiveConfig, xFilesFactor, aggregationMethod))
117 whisper.create(dbFilePath, archiveConfig, xFilesFactor, aggregationMethod, settings.WHISPER_SPARSE_CREATE)
118 os.chmod(dbFilePath, 0755)
114 whisper.create(
115 dbFilePath,
116 archiveConfig,
117 xFilesFactor,
118 aggregationMethod,
119 settings.WHISPER_SPARSE_CREATE,
120 settings.WHISPER_FALLOCATE_CREATE)
119121 instrumentation.increment('creates')
120
122 # If we've got a rate limit configured, let's make sure we enforce it
123 if UPDATE_BUCKET:
124 UPDATE_BUCKET.drain(1, blocking=True)
121125 try:
122126 t1 = time.time()
123127 whisper.update_many(dbFilePath, datapoints)
124 t2 = time.time()
125 updateTime = t2 - t1
128 updateTime = time.time() - t1
126129 except:
127130 log.msg("Error writing to %s" % (dbFilePath))
128131 log.err()
131134 pointCount = len(datapoints)
132135 instrumentation.increment('committedPoints', pointCount)
133136 instrumentation.append('updateTimes', updateTime)
134
135137 if settings.LOG_UPDATES:
136138 log.updates("wrote %d datapoints for %s in %.5f seconds" % (pointCount, metric, updateTime))
137
138 # Rate limit update operations
139 thisSecond = int(t2)
140
141 if thisSecond != lastSecond:
142 lastSecond = thisSecond
143 updates = 0
144 else:
145 updates += 1
146 if updates >= settings.MAX_UPDATES_PER_SECOND:
147 time.sleep( int(t2 + 1) - t2 )
148139
149140 # Avoid churning CPU when only new metrics are in the cache
150141 if not dataWritten:
157148 writeCachedDataPoints()
158149 except:
159150 log.err()
160
161 time.sleep(1) # The writer thread only sleeps when the cache is empty or an error occurs
151 time.sleep(1) # The writer thread only sleeps when the cache is empty or an error occurs
162152
163153
164154 def reloadStorageSchemas():
165 global schemas
155 global SCHEMAS
166156 try:
167 schemas = loadStorageSchemas()
157 SCHEMAS = loadStorageSchemas()
168158 except:
169 log.msg("Failed to reload storage schemas")
159 log.msg("Failed to reload storage SCHEMAS")
170160 log.err()
171161
162
172163 def reloadAggregationSchemas():
173 global agg_schemas
164 global AGGREGATION_SCHEMAS
174165 try:
175 schemas = loadAggregationSchemas()
166 AGGREGATION_SCHEMAS = loadAggregationSchemas()
176167 except:
177 log.msg("Failed to reload aggregation schemas")
168 log.msg("Failed to reload aggregation SCHEMAS")
178169 log.err()
170
171
172 def shutdownModifyUpdateSpeed():
173 try:
174 settings.MAX_UPDATES_PER_SECOND = settings.MAX_UPDATES_PER_SECOND_ON_SHUTDOWN
175 log.msg("Carbon shutting down. Changed the update rate to: " + str(settings.MAX_UPDATES_PER_SECOND_ON_SHUTDOWN))
176 except KeyError:
177 log.msg("Carbon shutting down. Update rate not changed")
179178
180179
181180 class WriterService(Service):
187186 def startService(self):
188187 self.storage_reload_task.start(60, False)
189188 self.aggregation_reload_task.start(60, False)
189 reactor.addSystemEventTrigger('before', 'shutdown', shutdownModifyUpdateSpeed)
190190 reactor.callInThread(writeForever)
191191 Service.startService(self)
192192
00 #!/usr/bin/env python
11
22 import os
3 import platform
34 from glob import glob
45
56 if os.environ.get('USE_SETUPTOOLS'):
1415 storage_dirs = [ ('storage/whisper',[]), ('storage/lists',[]),
1516 ('storage/log',[]), ('storage/rrd',[]) ]
1617 conf_files = [ ('conf', glob('conf/*.example')) ]
17 #XXX Need a way to have these work for bdist_rpm but be left alone for everything else
18 #init_scripts = [ ('/etc/init.d', ['distro/redhat/init.d/carbon-cache',
19 # 'distro/redhat/init.d/carbon-relay',
20 # 'distro/redhat/init.d/carbon-aggregator']) ]
18
19 install_files = storage_dirs + conf_files
20
21 # If we are building on RedHat, let's use the redhat init scripts.
22 if platform.dist()[0] == 'redhat':
23 init_scripts = [ ('/etc/init.d', ['distro/redhat/init.d/carbon-cache',
24 'distro/redhat/init.d/carbon-relay',
25 'distro/redhat/init.d/carbon-aggregator']) ]
26 install_files += init_scripts
27
2128
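For context on the check above, platform.dist() returns a (distname, version, id) tuple; the values below are illustrative, and the exact strings depend on the distribution and Python build.

    import platform

    platform.dist()     # e.g. ('redhat', '6.4', 'Santiago') on RHEL
    platform.dist()     # e.g. ('debian', '7.1', '') on Debian
    # Only the first element of the tuple is compared against 'redhat' above.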
2229 setup(
2330 name='carbon',
24 version='0.9.10',
25 url='https://launchpad.net/graphite',
31 version='0.9.12',
32 url='http://graphite-project.github.com',
2633 author='Chris Davis',
2734 author_email='chrismd@gmail.com',
2835 license='Apache Software License 2.0',
3138 package_dir={'' : 'lib'},
3239 scripts=glob('bin/*'),
3340 package_data={ 'carbon' : ['*.xml'] },
34 data_files=storage_dirs + conf_files, # + init_scripts,
41 data_files=install_files,
3542 install_requires=['twisted', 'txamqp'],
3643 **setup_kwargs
3744 )