Codebase list logbook / 61db662a-aa54-48c2-8f42-facbd50b4153/main tests / test_queues.py
61db662a-aa54-48c2-8f42-facbd50b4153/main

Tree @61db662a-aa54-48c2-8f42-facbd50b4153/main (Download .tar.gz)

test_queues.py @61db662a-aa54-48c2-8f42-facbd50b4153/mainraw · history · blame

# -*- coding: utf-8 -*-
import os
import socket
import time

from .utils import require_module, missing, LETTERS

import logbook
from logbook.helpers import u

import pytest


@require_module('zmq')
def test_zeromq_handler(logger, handlers, subscriber):
    tests = [
        u('Logging something'),
        u('Something with umlauts äöü'),
        u('Something else for good measure'),
    ]
    for test in tests:
        for handler in handlers:
            with handler:
                logger.warn(test)
                record = subscriber.recv()
                assert record.message == test
                assert record.channel == logger.name


@require_module('zmq')
def test_zeromq_background_thread(logger, handlers, subscriber):
    test_handler = logbook.TestHandler()
    controller = subscriber.dispatch_in_background(test_handler)

    for handler in handlers:
        with handler:
            logger.warn('This is a warning')
            logger.error('This is an error')

    # stop the controller.  This will also stop the loop and join the
    # background process.  Before that we give it a fraction of a second
    # to get all results
    time.sleep(0.5)
    controller.stop()

    assert test_handler.has_warning('This is a warning')
    assert test_handler.has_error('This is an error')


@missing('zmq')
def test_missing_zeromq():
    from logbook.queues import ZeroMQHandler, ZeroMQSubscriber
    with pytest.raises(RuntimeError):
        ZeroMQHandler('tcp://127.0.0.1:42000')
    with pytest.raises(RuntimeError):
        ZeroMQSubscriber('tcp://127.0.0.1:42000')


class MultiProcessingHandlerSendBack(object):
    def __init__(self, queue):
        self.queue = queue

    def __call__(self):
        from logbook.queues import MultiProcessingHandler
        handler = MultiProcessingHandler(self.queue)
        handler.push_thread()
        try:
            logbook.warn('Hello World')
        finally:
            handler.pop_thread()


@require_module('multiprocessing')
def test_multi_processing_handler():
    if os.getenv('APPVEYOR') == 'True':
        pytest.skip('Test hangs on AppVeyor CI')
    from multiprocessing import Process, Queue
    from logbook.queues import MultiProcessingSubscriber
    queue = Queue(-1)
    test_handler = logbook.TestHandler()
    subscriber = MultiProcessingSubscriber(queue)

    p = Process(target=MultiProcessingHandlerSendBack(queue))
    p.start()
    p.join()

    with test_handler:
        subscriber.dispatch_once()
        assert test_handler.has_warning('Hello World')


class BatchTestHandler(logbook.TestHandler):
    def __init__(self, *args, **kwargs):
        super(BatchTestHandler, self).__init__(*args, **kwargs)
        self.batches = []

    def emit(self, record):
        super(BatchTestHandler, self).emit(record)
        self.batches.append([record])

    def emit_batch(self, records, reason):
        for record in records:
            super(BatchTestHandler, self).emit(record)
        self.batches.append(records)


def test_threaded_wrapper_handler(logger):
    from logbook.queues import ThreadedWrapperHandler
    test_handler = BatchTestHandler()
    with ThreadedWrapperHandler(test_handler) as handler:
        logger.warn('Just testing')
        logger.error('More testing')

    # give it some time to sync up
    handler.close()

    assert (not handler.controller.running)
    assert len(test_handler.records) == 2
    assert len(test_handler.batches) == 2
    assert all((len(records) == 1 for records in test_handler.batches))
    assert test_handler.has_warning('Just testing')
    assert test_handler.has_error('More testing')


def test_threaded_wrapper_handler_emit():
    from logbook.queues import ThreadedWrapperHandler
    test_handler = BatchTestHandler()
    with ThreadedWrapperHandler(test_handler) as handler:
        lr = logbook.LogRecord('Test Logger', logbook.WARNING, 'Just testing')
        test_handler.emit(lr)
        lr = logbook.LogRecord('Test Logger', logbook.ERROR, 'More testing')
        test_handler.emit(lr)

    # give it some time to sync up
    handler.close()

    assert (not handler.controller.running)
    assert len(test_handler.records) == 2
    assert len(test_handler.batches) == 2
    assert all((len(records) == 1 for records in test_handler.batches))
    assert test_handler.has_warning('Just testing')
    assert test_handler.has_error('More testing')


def test_threaded_wrapper_handler_emit_batched():
    from logbook.queues import ThreadedWrapperHandler
    test_handler = BatchTestHandler()
    with ThreadedWrapperHandler(test_handler) as handler:
        test_handler.emit_batch([
            logbook.LogRecord('Test Logger', logbook.WARNING, 'Just testing'),
            logbook.LogRecord('Test Logger', logbook.ERROR, 'More testing'),
        ], 'group')

    # give it some time to sync up
    handler.close()

    assert (not handler.controller.running)
    assert len(test_handler.records) == 2
    assert len(test_handler.batches) == 1
    (records, ) = test_handler.batches
    assert len(records) == 2
    assert test_handler.has_warning('Just testing')
    assert test_handler.has_error('More testing')


@require_module('execnet')
def test_execnet_handler():
    def run_on_remote(channel):
        import logbook
        from logbook.queues import ExecnetChannelHandler
        handler = ExecnetChannelHandler(channel)
        log = logbook.Logger('Execnet')
        handler.push_application()
        log.info('Execnet works')

    import execnet
    gw = execnet.makegateway()
    channel = gw.remote_exec(run_on_remote)
    from logbook.queues import ExecnetChannelSubscriber
    subscriber = ExecnetChannelSubscriber(channel)
    record = subscriber.recv()
    assert record.msg == 'Execnet works'
    gw.exit()


class SubscriberGroupSendBack(object):
    def __init__(self, message, queue):
        self.message = message
        self.queue = queue

    def __call__(self):
        from logbook.queues import MultiProcessingHandler
        with MultiProcessingHandler(self.queue):
            logbook.warn(self.message)


@require_module('multiprocessing')
def test_subscriber_group():
    if os.getenv('APPVEYOR') == 'True':
        pytest.skip('Test hangs on AppVeyor CI')
    from multiprocessing import Process, Queue
    from logbook.queues import MultiProcessingSubscriber, SubscriberGroup
    a_queue = Queue(-1)
    b_queue = Queue(-1)
    subscriber = SubscriberGroup([
        MultiProcessingSubscriber(a_queue),
        MultiProcessingSubscriber(b_queue)
    ])

    for _ in range(10):
        p1 = Process(target=SubscriberGroupSendBack('foo', a_queue))
        p2 = Process(target=SubscriberGroupSendBack('bar', b_queue))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        messages = [subscriber.recv().message for i in (1, 2)]
        assert sorted(messages) == ['bar', 'foo']


@require_module('redis')
def test_redis_handler():
    import redis
    from logbook.queues import RedisHandler

    KEY = 'redis-{}'.format(os.getpid())
    FIELDS = ['message', 'host']
    r = redis.Redis(decode_responses=True)
    redis_handler = RedisHandler(key=KEY, level=logbook.INFO, bubble=True)
    # We don't want output for the tests, so we can wrap everything in a
    # NullHandler
    null_handler = logbook.NullHandler()

    # Check default values
    with null_handler.applicationbound():
        with redis_handler:
            logbook.info(LETTERS)

    key, message = r.blpop(KEY)
    # Are all the fields in the record?
    for field in FIELDS:
        assert message.find(field)
    assert key == KEY
    assert message.find(LETTERS)

    # Change the key of the handler and check on redis
    KEY = 'test_another_key-{}'.format(os.getpid())
    redis_handler.key = KEY

    with null_handler.applicationbound():
        with redis_handler:
            logbook.info(LETTERS)

    key, message = r.blpop(KEY)
    assert key == KEY

    # Check that extra fields are added if specified when creating the handler
    FIELDS.append('type')
    extra_fields = {'type': 'test'}
    del(redis_handler)
    redis_handler = RedisHandler(key=KEY, level=logbook.INFO,
                                 extra_fields=extra_fields, bubble=True)

    with null_handler.applicationbound():
        with redis_handler:
            logbook.info(LETTERS)

    key, message = r.blpop(KEY)
    for field in FIELDS:
        assert message.find(field)
    assert message.find('test')

    # And finally, check that fields are correctly added if appended to the
    # log message
    FIELDS.append('more_info')
    with null_handler.applicationbound():
        with redis_handler:
            logbook.info(LETTERS, more_info='This works')

    key, message = r.blpop(KEY)
    for field in FIELDS:
        assert message.find(field)
    assert message.find('This works')


@require_module('redis')
def test_redis_handler_lpush():
    """
    Test if lpush stores messages in the right order
    new items should be first on list
    """
    import redis
    from logbook.queues import RedisHandler
    null_handler = logbook.NullHandler()

    KEY = 'lpushed-'.format(os.getpid())
    redis_handler = RedisHandler(key=KEY, push_method='lpush',
                                 level=logbook.INFO, bubble=True)

    with null_handler.applicationbound():
        with redis_handler:
            logbook.info("old item")
            logbook.info("new item")

    time.sleep(1.5)

    r = redis.Redis(decode_responses=True)
    logs = r.lrange(KEY, 0, -1)
    assert logs
    assert "new item" in logs[0]
    r.delete(KEY)


@require_module('redis')
def test_redis_handler_rpush():
    """
    Test if rpush stores messages in the right order
    old items should be first on list
    """
    import redis
    from logbook.queues import RedisHandler
    null_handler = logbook.NullHandler()

    KEY = 'rpushed-' + str(os.getpid())
    redis_handler = RedisHandler(key=KEY, push_method='rpush',
                                 level=logbook.INFO, bubble=True)

    with null_handler.applicationbound():
        with redis_handler:
            logbook.info("old item")
            logbook.info("new item")

    time.sleep(1.5)

    r = redis.Redis(decode_responses=True)
    logs = r.lrange(KEY, 0, -1)
    assert logs
    assert "old item" in logs[0]
    r.delete(KEY)


@pytest.fixture
def handlers(handlers_subscriber):
    return handlers_subscriber[0]


@pytest.fixture
def subscriber(handlers_subscriber):
    return handlers_subscriber[1]


@pytest.fixture
def handlers_subscriber(multi):
    from logbook.queues import ZeroMQHandler, ZeroMQSubscriber

    # Get an unused port
    tempsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tempsock.bind(('127.0.0.1', 0))
    host, unused_port = tempsock.getsockname()
    tempsock.close()

    # Retrieve the ZeroMQ handler and subscriber
    uri = 'tcp://%s:%d' % (host, unused_port)
    if multi:
        handlers = [ZeroMQHandler(uri, multi=True) for _ in range(3)]
    else:
        handlers = [ZeroMQHandler(uri)]
    subscriber = ZeroMQSubscriber(uri, multi=multi)
    # Enough time to start
    time.sleep(0.1)
    return handlers, subscriber


@pytest.fixture(params=[True, False])
def multi(request):
    return request.param