Codebase list irker / debian/1.7+dfsg-2 irkerd
debian/1.7+dfsg-2

Tree @debian/1.7+dfsg-2 (Download .tar.gz)

irkerd @debian/1.7+dfsg-2raw · history · blame

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
#!/usr/bin/env python
"""
irkerd - a simple IRC multiplexer daemon

Listens for JSON objects of the form {'to':<irc-url>, 'privmsg':<text>}
and relays messages to IRC channels. Each request must be followed by
a newline.

The <text> must be a string.  The value of the 'to' attribute can be a
string containing an IRC URL (e.g. 'irc://chat.freenet.net/botwar') or
a list of such strings; in the latter case the message is broadcast to
all listed channels.  Note that the channel portion of the URL need
*not* have a leading '#' unless the channel name itself does.

Options: -d sets the debug-message level (probably only of interest to
developers). The -V option prints the program version and exits.

Design and code by Eric S. Raymond <esr@thyrsus.com>. See the project
resource page at <http://www.catb.org/~esr/irker/>.

Requires Python 2.6 and the irc client library at version >= 2.0.2: see

http://pypi.python.org/pypi/irc/
"""
# Tunable settings.  Adjust these to suit the local installation.

# Address the request listeners bind to.
HOST = "localhost"
PORT = 6659

# IRC nick template; it is %-formatted with a single integer, so it
# must contain exactly one integer conversion such as '%d'.
NAMESTYLE = "irker%03d"
# Time to live, in seconds, measured from the last transmission.
XMIT_TTL = 3 * 60 * 60
# Time to live, in seconds, measured from the last PING received.
PING_TTL = 15 * 60
# Time to live, in seconds, measured from the last connect.
DISCONNECT_TTL = 24 * 60 * 60
# Time to live, in seconds, measured from the first request.
UNSEEN_TTL = 60
# Default maximum number of channels open per socket.
CHANNEL_MAX = 18
# Anti-flood pause after each transmitted line, in seconds.
ANTI_FLOOD_DELAY = 0.5
# Anti-buzz pause after each queue-empty check, in seconds.
ANTI_BUZZ_DELAY = 0.09

# No user-serviceable parts below this line

version = "1.7"

# Optional green-thread support.  If the eventlet library can be
# imported, eventlet.monkey_patch() is called before the remaining
# imports below so that "threading" is replaced by a coroutine-based
# implementation.  Threads then become ultra-light-weight and
# cooperatively scheduled.  If eventlet is missing we fall back to
# ordinary OS-level threads and a lower session cap.
try:
    import eventlet
    eventlet.monkey_patch()
    green_threads = True
    # With greenlets we don't worry about thread exhaustion, only the
    # file descriptor limit (typically 1024 on modern Unixes). Thus we
    # can handle a lot more concurrent sessions and generate less
    # join/leave spam under heavy load.
    CONNECTION_MAX = 1000
except ImportError:
    # Threads are more expensive if we have to use OS-level ones
    # rather than greenlets.  We need to avoid pushing thread limits
    # as well as fd limits.  See security.txt for discussion.
    CONNECTION_MAX = 200
    green_threads = False

import sys, getopt, urlparse, time, random, socket, signal
import threading, Queue, SocketServer
import irc.client, logging
try:
    import simplejson as json	# Faster, also makes us Python-2.4-compatible
except ImportError:
    import json

# Sketch of implementation:
#
# One Irker object manages multiple IRC sessions.  It holds a map of
# Dispatcher objects, one per (server, port) combination, which are
# responsible for routing messages to one of any number of Connection
# objects that do the actual socket conversations.  The reason for the
# Dispatcher layer is that IRC daemons limit the number of channels a
# client (that is, from the daemon's point of view, a socket) can be
# joined to, so each session to a server needs a flock of Connection
# instances each with its own socket.
#
# Connections are timed out and removed when either they haven't seen a
# PING for a while (indicating that the server may be stalled or down)
# or there has been no message traffic to them for a while, or
# even if the queue is nonempty but efforts to connect have failed for
# a long time.
#
# There are multiple threads. One accepts incoming traffic from all servers.
# Each Connection also has a consumer thread and a thread-safe message queue.
# The program main appends messages to queues as JSON requests are received;
# the consumer threads try to ship them to servers.  When a socket write
# stalls, it only blocks an individual consumer thread; if it stalls long
# enough, the session will be timed out.
#
# Message delivery is thus not reliable in the face of network stalls,
# but this was considered acceptable because IRC (notoriously) has the
# same problem - there is little point in reliable delivery to a relay
# that is down or unreliable.
#
# This code uses only NICK, JOIN, MODE, and PRIVMSG. It is strictly
# compliant to RFC1459, except for the interpretation and use of the
# DEAF and CHANLIMIT and (obsolete) MAXCHANNELS features.  CHANLIMIT
# is as described in the Internet RFC draft
# draft-brocklesby-irc-isupport-03 at <http://www.mirc.com/isupport.html>.

class Connection:
    """One IRC session: a server socket, a message queue and a consumer thread.

    The status attribute drives the consumer loop in dequeue().  Values
    assigned in this class are None (freshly constructed), "unseen"
    (consumer thread started, no successful connect yet), "handshaking"
    (connected, waiting for the server's welcome), "ready" (nick
    accepted, messages may be sent), "disconnected" (no usable server
    connection) and "expired" (given up; the connection may be scavenged).
    """
    def __init__(self, irkerd, servername, port):
        "Record the destination; no socket or thread is created yet."
        self.irker = irkerd
        self.servername = servername
        self.port = port
        self.nick_trial = None
        self.connection = None
        self.status = None
        self.last_xmit = time.time()
        self.last_ping = time.time()
        self.channels_joined = []
        self.channel_limits = {}
        # The consumer thread
        self.queue = Queue.Queue()
        self.thread = None
    def nickname(self, n=None):
        "Return a name for the nth server connection."
        if n is None:
            n = self.nick_trial
        return (NAMESTYLE % n)
    def handle_ping(self):
        "Register the fact that the server has pinged this connection."
        self.last_ping = time.time()
    def handle_welcome(self):
        "The server says we're OK, with a non-conflicting nick."
        self.status = "ready"
        self.irker.debug(1, "nick %s accepted" % self.nickname())
    def handle_badnick(self):
        "The server says our nick has a conflict."
        self.irker.debug(1, "nick %s rejected" % self.nickname())
        # Randomness prevents a malicious user or bot from anticipating the
        # next trial name in order to block us from completing the handshake.
        self.nick_trial += random.randint(1, 3)
        self.connection.nick(self.nickname())
    def handle_disconnect(self):
        "Server disconnected us for flooding or some other reason."
        self.connection = None
        # Record the loss in the status as well.  Otherwise the consumer
        # thread's idle-timeout branch would later try to quit a
        # connection object that no longer exists.  An expired
        # connection stays expired so that it can still be scavenged.
        if self.status != "expired":
            self.status = "disconnected"
    def handle_kick(self, outof):
        "We've been kicked; forget the channel and drop its pending messages."
        # Park the consumer thread in its "handshaking" sleep while the
        # queue is being rebuilt.
        self.status = "handshaking"
        try:
            self.channels_joined.remove(outof)
        except ValueError:
            self.irker.logerr("kicked by %s from %s that's not joined"
                              % (self.servername, outof))
        qcopy = []
        while not self.queue.empty():
            (channel, message) = self.queue.get()
            if channel != outof:
                qcopy.append((channel, message))
        for (channel, message) in qcopy:
            self.queue.put((channel, message))
        self.status = "ready"
    def enqueue(self, channel, message):
        "Enqueue a message for transmission, starting the consumer if needed."
        if self.thread is None or not self.thread.is_alive():
            self.status = "unseen"
            self.thread = threading.Thread(target=self.dequeue)
            self.thread.setDaemon(True)
            self.thread.start()
        self.queue.put((channel, message))
    def dequeue(self):
        "Try to ship pending messages from the queue (consumer thread body)."
        try:
            while True:
                # We want to be kind to the IRC servers and not hold unused
                # sockets open forever, so they have a time-to-live.  The
                # loop is coded this particular way so that we can drop
                # the actual server connection when its time-to-live
                # expires, then reconnect and resume transmission if the
                # queue fills up again.
                if self.queue.empty():
                    # Queue is empty, at some point we want to time out
                    # the connection rather than holding a socket open in
                    # the server forever.
                    now = time.time()
                    if (now > self.last_xmit + XMIT_TTL \
                           or now > self.last_ping + PING_TTL) \
                           and self.status != "disconnected":
                        self.irker.debug(1, "timing out inactive connection to %s at %s" % (self.servername, time.asctime()))
                        # The server object may already be gone (never
                        # connected, or dropped by handle_disconnect), so
                        # work on a local reference and check it first.
                        session = self.connection
                        if session is not None:
                            session.context = None
                            session.quit("transmission timeout")
                            session.close()
                        self.connection = None
                        self.status = "disconnected"
                    else:
                        # Prevent this thread from hogging the CPU by pausing
                        # for just a little bit after the queue-empty check.
                        # As long as this is less than the duration of a human
                        # reflex arc it is highly unlikely any human will ever
                        # notice.
                        time.sleep(ANTI_BUZZ_DELAY)
                elif not self.connection:
                    # Queue is nonempty but server isn't connected.
                    self.connection = self.irker.irc.server()
                    self.connection.context = self
                    # Try to avoid colliding with other instances
                    self.nick_trial = random.randint(1, 990)
                    self.channels_joined = []
                    # This will throw irc.client.ServerConnectionError on failure
                    try:
                        self.connection.connect(self.servername,
                                            self.port,
                                            nickname=self.nickname(),
                                            username="irker",
                                            ircname="irker relaying client")
                        self.status = "handshaking"
                        self.irker.debug(1, "XMIT_TTL bump (%s connection) at %s" % (self.servername, time.asctime()))
                        self.last_xmit = time.time()
                        # A fresh session must not inherit the previous
                        # session's stale ping time, or it would be timed
                        # out again as soon as the queue drains.
                        self.last_ping = time.time()
                    except irc.client.ServerConnectionError:
                        self.status = "disconnected"
                elif self.status == "handshaking":
                    # Don't buzz on the empty-queue test while we're
                    # handshaking
                    time.sleep(ANTI_BUZZ_DELAY)
                elif self.status == "disconnected" \
                         and time.time() > self.last_xmit + DISCONNECT_TTL:
                    # Queue is nonempty, but the IRC server might be
                    # down. Letting failed connections retain queue
                    # space forever would be a memory leak.
                    self.status = "expired"
                    break
                elif self.status == "unseen" \
                         and time.time() > self.last_xmit + UNSEEN_TTL:
                    # Nasty people could attempt a denial-of-service
                    # attack by flooding us with requests with invalid
                    # servernames. We guard against this by rapidly
                    # expiring connections that have a nonempty queue but
                    # have never had a successful open.
                    self.status = "expired"
                    break
                elif self.status == "ready":
                    (channel, message) = self.queue.get()
                    if channel not in self.channels_joined:
                        self.channels_joined.append(channel)
                        self.connection.join(channel)
                        self.irker.debug(1, "joining %s on %s." % (channel, self.servername))
                    for segment in message.split("\n"):
                        self.connection.privmsg(channel, segment)
                        time.sleep(ANTI_FLOOD_DELAY)
                    self.last_xmit = time.time()
                    self.irker.debug(1, "XMIT_TTL bump (%s transmission) at %s" % (self.servername, time.asctime()))
                    self.queue.task_done()
                else:
                    # Nothing to do yet, e.g. a failed connect that has
                    # not reached its time-to-live.  Pause instead of
                    # spinning at full speed until the state changes.
                    time.sleep(ANTI_BUZZ_DELAY)
        except:
            # Deliberately broad: whatever ends this thread gets reported
            # on stderr instead of vanishing silently.
            (exc_type, _exc_value, _exc_traceback) = sys.exc_info()
            self.irker.logerr("exception %s in thread for %s" % \
                              (exc_type, self.servername))
    def live(self):
        "Should this connection not be scavenged?"
        return self.status != "expired"
    def joined_to(self, channel):
        "Is this connection joined to the specified channel?"
        return channel in self.channels_joined
    def accepting(self, channel):
        "Can this connection accept a join of this channel?"
        if self.channel_limits:
            # The server advertised limits: count joined channels that
            # share this channel's prefix character.
            match_count = 0
            for already in self.channels_joined:
                if already[0] == channel[0]:
                    match_count += 1
            return match_count < self.channel_limits.get(channel[0], CHANNEL_MAX)
        else:
            return len(self.channels_joined) < CHANNEL_MAX

class Target():
    """Represent a transmission target parsed from an IRC URL.

    Attributes set by the constructor: servername (host part of the
    URL), port (integer, 6667 when the URL gives none) and channel
    (lower-cased channel name with a prefix character).
    """
    def __init__(self, url):
        """Split url into server name, port and channel.

        Raises ValueError if the port part of the URL is not an integer.
        """
        parsed = urlparse.urlparse(url)
        irchost, _, ircport = parsed.netloc.partition(':')
        if not ircport:
            ircport = 6667
        self.servername = irchost
        channel = parsed.path.lstrip('/')
        # Some urlparse versions split off everything after a '#' as a
        # URL fragment, which would swallow a channel written with its
        # leading '#' (irc://host/#chan).  Put the fragment back.
        if parsed.fragment:
            channel += "#" + parsed.fragment
        # IRC channel names are case-insensitive.  If we don't smash
        # case here we may run into problems later. There was a bug
        # observed on irc.rizon.net where an irkerd user specified #Channel,
        # got kicked, and irkerd crashed because the server returned
        # "#channel" in the notification that our kick handler saw.
        self.channel = channel.lower()
        if self.channel and self.channel[0] not in "#&+":
            self.channel = "#" + self.channel
        self.port = int(ircport)
    def valid(self):
        "Both components must be present for a valid target."
        return bool(self.servername and self.channel)
    def server(self):
        "Return a hashable tuple representing the destination server."
        return (self.servername, self.port)

class Dispatcher:
    "Route traffic for one server-port pair across its connections."
    def __init__(self, irkerd, servername, port):
        "Start with no connections; they are created on demand."
        self.connections = []
        self.port = port
        self.servername = servername
        self.irker = irkerd
    def dispatch(self, channel, message):
        "Queue a message on a suitable connection, creating one if needed."
        alive = [conn for conn in self.connections if conn.live()]
        # Prefer a connection already in the channel; failing that, any
        # live connection that still has room to join it.
        candidates = [conn for conn in alive if conn.joined_to(channel)]
        if not candidates:
            candidates = [conn for conn in alive if conn.accepting(channel)]
        if candidates:
            chosen = candidates[0]
        else:
            chosen = Connection(self.irker, self.servername, self.port)
            self.connections.append(chosen)
        chosen.enqueue(channel, message)
    def live(self):
        "Drop dead connections; report whether any live ones remain."
        survivors = []
        for conn in self.connections:
            if conn.live():
                survivors.append(conn)
        self.connections = survivors
        return bool(survivors)
    def last_xmit(self):
        "Return the latest transmission time over all connections."
        return max(conn.last_xmit for conn in self.connections)

class Irker:
    """Persistent IRC multiplexer.

    Holds one Dispatcher per (servername, port) pair in self.servers
    and runs the IRC library's event loop in a daemon thread.
    """
    def __init__(self, debuglevel=0):
        "Register the IRC event handlers and start the event-loop thread."
        self.debuglevel = debuglevel
        self.irc = irc.client.IRC()
        self.irc.add_global_handler("ping", self._handle_ping)
        self.irc.add_global_handler("welcome", self._handle_welcome)
        self.irc.add_global_handler("erroneusnickname", self._handle_badnick)
        self.irc.add_global_handler("nicknameinuse", self._handle_badnick)
        self.irc.add_global_handler("nickcollision", self._handle_badnick)
        self.irc.add_global_handler("unavailresource", self._handle_badnick)
        self.irc.add_global_handler("featurelist", self._handle_features)
        self.irc.add_global_handler("disconnect", self._handle_disconnect)
        self.irc.add_global_handler("kick", self._handle_kick)
        thread = threading.Thread(target=self.irc.process_forever)
        thread.setDaemon(True)
        self.irc._thread = thread
        thread.start()
        self.servers = {}
    def logerr(self, errmsg):
        "Log a processing error."
        sys.stderr.write("irkerd: " + errmsg + "\n")
    def debug(self, level, errmsg):
        "Emit debugging information if the debug level is at least level."
        if self.debuglevel >= level:
            sys.stderr.write("irkerd: %s\n" % errmsg)
    def _handle_ping(self, connection, _event):
        "PING arrived, bump the last-received time for the connection."
        if connection.context:
            connection.context.handle_ping()
    def _handle_welcome(self, connection, _event):
        "Welcome arrived, nick accepted for this connection."
        if connection.context:
            connection.context.handle_welcome()
    def _handle_badnick(self, connection, _event):
        "Nick not accepted for this connection."
        if connection.context:
            connection.context.handle_badnick()
    def _handle_features(self, connection, event):
        "Set deaf mode if offered and record advertised channel limits."
        if connection.context:
            cxt = connection.context
            for lump in event.arguments():
                if lump.startswith("DEAF="):
                    connection.mode(cxt.nickname(), "+"+lump[5:])
                elif lump.startswith("MAXCHANNELS="):
                    # The value comes from the remote server.  A bad
                    # number must not raise out of this handler, which
                    # runs inside the shared IRC event thread.
                    try:
                        m = int(lump[12:])
                    except ValueError:
                        self.logerr("ill-formed MAXCHANNELS property")
                        continue
                    for pref in "#&+":
                        cxt.channel_limits[pref] = m
                    self.debug(1, "%s maxchannels is %d"
                               % (connection.server, m))
                elif lump.startswith("CHANLIMIT=#:"):
                    limits = lump[10:].split(",")
                    try:
                        for token in limits:
                            (prefixes, limit) = token.split(":")
                            limit = int(limit)
                            for c in prefixes:
                                cxt.channel_limits[c] = limit
                        self.debug(1, "%s channel limit map is %s"
                                   % (connection.server, cxt.channel_limits))
                    except ValueError:
                        self.logerr("ill-formed CHANLIMIT property")
    def _handle_disconnect(self, connection, _event):
        "Server hung up the connection."
        self.debug(1, "server %s disconnected" % connection.server)
        if connection.context:
            connection.context.handle_disconnect()
    def _handle_kick(self, connection, event):
        "We have been kicked out of a channel."
        self.debug(1, "irker has been kicked from %s on %s" % (event.target(), connection.server))
        if connection.context:
            connection.context.handle_kick(event.target())
    def _relay(self, url, message):
        "Queue message for the channel named by one IRC URL, then scavenge."
        try:
            target = Target(url)
        except ValueError:
            self.logerr("malformed request - unparseable IRC URL: %r" % url)
            return
        if not target.valid():
            self.logerr("malformed request - invalid IRC URL: %r" % url)
            return
        key = target.server()
        if key not in self.servers:
            self.servers[key] = Dispatcher(self, target.servername, target.port)
        self.servers[key].dispatch(target.channel, message)
        self._scavenge()
    def _scavenge(self):
        "Drop dispatchers with no live connections and enforce the session cap."
        # GC dispatchers with no active connections.  Iterate over a
        # snapshot of the keys because entries are deleted on the way.
        for servername in list(self.servers.keys()):
            if not self.servers[servername].live():
                del self.servers[servername]
        # If we might be pushing a resource limit even after garbage
        # collection, remove a session.  The goal here is to head off
        # DoS attacks that aim at exhausting thread space or file
        # descriptors.  The cost is that attempts to DoS this service
        # will cause lots of join/leave spam as we scavenge old
        # channels after connecting to new ones. The particular method
        # used for selecting a session to be terminated doesn't matter
        # much; we choose the one longest idle on the assumption that
        # message activity is likely to be clumpy.
        if len(self.servers) >= CONNECTION_MAX:
            oldest = None
            oldtime = float("inf")
            for (name, server) in self.servers.items():
                stamp = server.last_xmit()
                if stamp < oldtime:
                    oldest = name
                    oldtime = stamp
            if oldest is not None:
                del self.servers[oldest]
    def handle(self, line):
        """Perform a JSON relay request.

        line is one request: a JSON object with a 'to' member (an IRC
        URL string or a list of them) and a 'privmsg' member (a
        string).  Malformed requests are reported through logerr()
        and otherwise ignored; nothing is raised for bad input.
        """
        string_types = (type(""), type(u""))
        try:
            request = json.loads(line.strip())
            if not isinstance(request, dict):
                self.logerr("request is not a JSON dictionary: %r" % request)
            elif "to" not in request or "privmsg" not in request:
                self.logerr("malformed request - 'to' or 'privmsg' missing: %r" % request)
            else:
                channels = request['to']
                message = request['privmsg']
                if type(channels) not in (type([]),) + string_types:
                    self.logerr("malformed request - unexpected channel type: %r" % channels)
                elif type(message) not in string_types:
                    self.logerr("malformed request - unexpected message type: %r" % message)
                else:
                    if type(channels) != type([]):
                        channels = [channels]
                    for url in channels:
                        if type(url) not in string_types:
                            self.logerr("malformed request - URL has unexpected type: %r" % url)
                            continue
                        # One bad URL is reported and skipped; the
                        # remaining targets are still served.
                        self._relay(url, message)
        except ValueError:
            self.logerr("can't recognize JSON on input: %r" % line)
        except RuntimeError:
            self.logerr("wildly malformed JSON blew the parser stack.")

class IrkerTCPHandler(SocketServer.StreamRequestHandler):
    "Feed the lines arriving on one TCP stream to the daemon."
    def handle(self):
        "Relay stripped lines to irker.handle until readline returns nothing."
        line = self.rfile.readline()
        while line:
            irker.handle(line.strip())
            line = self.rfile.readline()

class IrkerUDPHandler(SocketServer.BaseRequestHandler):
    "Feed the payload of one UDP datagram to the daemon."
    def handle(self):
        "Relay the stripped datagram payload to irker.handle."
        # self.request[0] is the payload.  self.request[1] (the socket,
        # going by the original commented-out line) is not needed.
        payload = self.request[0]
        irker.handle(payload.strip())

if __name__ == '__main__':
    # Script entry point: parse options, create the multiplexer, start
    # the TCP and UDP listeners, then wait for a signal.
    debuglvl = 0
    try:
        (options, arguments) = getopt.getopt(sys.argv[1:], "d:V")
    except getopt.GetoptError as e:
        # Unknown option or missing argument: report it, no traceback.
        sys.stderr.write("irkerd: %s\n" % e)
        raise SystemExit(2)
    for (opt, val) in options:
        if opt == '-d':		# Enable debug/progress messages
            try:
                debuglvl = int(val)
            except ValueError:
                sys.stderr.write("irkerd: -d requires an integer level, not %r\n" % val)
                raise SystemExit(2)
            if debuglvl > 1:
                logging.basicConfig(level=logging.DEBUG)
        elif opt == '-V':	# Emit version and exit
            sys.stdout.write("irkerd version %s\n" % version)
            sys.exit(0)
    # The request handler classes look this object up as a module global.
    irker = Irker(debuglevel=debuglvl)
    irker.debug(1, "irkerd version %s" % version)
    try:
        tcpserver = SocketServer.TCPServer((HOST, PORT), IrkerTCPHandler)
        udpserver = SocketServer.UDPServer((HOST, PORT), IrkerUDPHandler)
        for server in [tcpserver, udpserver]:
            # Serve in daemon threads so they never keep the process alive.
            listener = threading.Thread(target=server.serve_forever)
            listener.setDaemon(True)
            listener.start()
        try:
            signal.pause()
        except KeyboardInterrupt:
            raise SystemExit(1)
    except socket.error as e:
        sys.stderr.write("irkerd: server launch failed: %r\n" % e)
        # A daemon that could not bind its listeners must not report success.
        raise SystemExit(1)

# end