New Upstream Release - python-rq

Ready changes


Merged new upstream version: 1.15.1 (was: 1.15).

Resulting package

Built on 2023-06-24T08:35 (took 5m33s)

The resulting binary packages can be installed (if you have the apt repository enabled) by running one of:

apt install -t fresh-releases python3-rq

Lintian Result


-        self.assertTrue(queue.is_empty())
-        runner = CliRunner()
-        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello'])
-        self.assert_normal_execution(result)
-        prefix = 'Enqueued tests.fixtures.say_hello() with job-id \''
-        suffix = '\'.\n'
-        self.assertTrue(result.output.startswith(prefix))
-        self.assertTrue(result.output.endswith(suffix))
-        job_id = result.output[len(prefix) : -len(suffix)]
-        queue_key = 'rq:queue:default'
-        self.assertEqual(self.connection.llen(queue_key), 1)
-        self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id)
-        worker = Worker(queue)
-        self.assertEqual(Job(job_id).result, 'Hi there, Stranger!')
-    def test_cli_enqueue_with_serializer(self):
-        """rq enqueue -u <url> -S rq.serializers.JSONSerializer tests.fixtures.say_hello"""
-        queue = Queue(connection=self.connection, serializer=JSONSerializer)
-        self.assertTrue(queue.is_empty())
-        runner = CliRunner()
-        result = runner.invoke(
-            main, ['enqueue', '-u', self.redis_url, '-S', 'rq.serializers.JSONSerializer', 'tests.fixtures.say_hello']
-        )
-        self.assert_normal_execution(result)
-        prefix = 'Enqueued tests.fixtures.say_hello() with job-id \''
-        suffix = '\'.\n'
-        self.assertTrue(result.output.startswith(prefix))
-        self.assertTrue(result.output.endswith(suffix))
-        job_id = result.output[len(prefix) : -len(suffix)]
-        queue_key = 'rq:queue:default'
-        self.assertEqual(self.connection.llen(queue_key), 1)
-        self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id)
-        worker = Worker(queue, serializer=JSONSerializer)
-        self.assertEqual(Job(job_id, serializer=JSONSerializer).result, 'Hi there, Stranger!')
-    def test_cli_enqueue_args(self):
-        """rq enqueue -u <url> tests.fixtures.echo hello ':[1, {"key": "value"}]' json:=["abc"] nojson=def"""
-        queue = Queue(connection=self.connection)
-        self.assertTrue(queue.is_empty())
-        runner = CliRunner()
-        result = runner.invoke(
-            main,
-            [
-                'enqueue',
-                '-u',
-                self.redis_url,
-                'tests.fixtures.echo',
-                'hello',
-                ':[1, {"key": "value"}]',
-                ':@tests/test.json',
-                '%1, 2',
-                'json:=[3.0, true]',
-                'nojson=abc',
-                'file=@tests/test.json',
-            ],
-        )
-        self.assert_normal_execution(result)
-        job_id = self.connection.lrange('rq:queue:default', 0, -1)[0].decode('ascii')
-        worker = Worker(queue)
-        args, kwargs = Job(job_id).result
-        self.assertEqual(args, ('hello', [1, {'key': 'value'}], {"test": True}, (1, 2)))
-        self.assertEqual(kwargs, {'json': [3.0, True], 'nojson': 'abc', 'file': '{\n    "test": true\n}\n'})
-    def test_cli_enqueue_schedule_in(self):
-        """rq enqueue -u <url> tests.fixtures.say_hello --schedule-in 1s"""
-        queue = Queue(connection=self.connection)
-        registry = ScheduledJobRegistry(queue=queue)
-        worker = Worker(queue)
-        scheduler = RQScheduler(queue, self.connection)
-        self.assertTrue(len(queue) == 0)
-        self.assertTrue(len(registry) == 0)
-        runner = CliRunner()
-        result = runner.invoke(
-            main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--schedule-in', '10s']
-        )
-        self.assert_normal_execution(result)
-        scheduler.acquire_locks()
-        scheduler.enqueue_scheduled_jobs()
-        self.assertTrue(len(queue) == 0)
-        self.assertTrue(len(registry) == 1)
-        self.assertFalse(
-        sleep(11)
-        scheduler.enqueue_scheduled_jobs()
-        self.assertTrue(len(queue) == 1)
-        self.assertTrue(len(registry) == 0)
-        self.assertTrue(
-    def test_cli_enqueue_schedule_at(self):
-        """
-        rq enqueue -u <url> tests.fixtures.say_hello --schedule-at 2021-01-01T00:00:00
-        rq enqueue -u <url> tests.fixtures.say_hello --schedule-at 2100-01-01T00:00:00
-        """
-        queue = Queue(connection=self.connection)
-        registry = ScheduledJobRegistry(queue=queue)
-        worker = Worker(queue)
-        scheduler = RQScheduler(queue, self.connection)
-        self.assertTrue(len(queue) == 0)
-        self.assertTrue(len(registry) == 0)
-        runner = CliRunner()
-        result = runner.invoke(
-            main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--schedule-at', '2021-01-01T00:00:00']
-        )
-        self.assert_normal_execution(result)
-        scheduler.acquire_locks()
-        self.assertTrue(len(queue) == 0)
-        self.assertTrue(len(registry) == 1)
-        scheduler.enqueue_scheduled_jobs()
-        self.assertTrue(len(queue) == 1)
-        self.assertTrue(len(registry) == 0)
-        self.assertTrue(
-        self.assertTrue(len(queue) == 0)
-        self.assertTrue(len(registry) == 0)
-        result = runner.invoke(
-            main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--schedule-at', '2100-01-01T00:00:00']
-        )
-        self.assert_normal_execution(result)
-        self.assertTrue(len(queue) == 0)
-        self.assertTrue(len(registry) == 1)
-        scheduler.enqueue_scheduled_jobs()
-        self.assertTrue(len(queue) == 0)
-        self.assertTrue(len(registry) == 1)
-        self.assertFalse(
-    def test_cli_enqueue_retry(self):
-        """rq enqueue -u <url> tests.fixtures.say_hello --retry-max 3 --retry-interval 10 --retry-interval 20
-        --retry-interval 40"""
-        queue = Queue(connection=self.connection)
-        self.assertTrue(queue.is_empty())
-        runner = CliRunner()
-        result = runner.invoke(
-            main,
-            [
-                'enqueue',
-                '-u',
-                self.redis_url,
-                'tests.fixtures.say_hello',
-                '--retry-max',
-                '3',
-                '--retry-interval',
-                '10',
-                '--retry-interval',
-                '20',
-                '--retry-interval',
-                '40',
-            ],
-        )
-        self.assert_normal_execution(result)
-        job = Job.fetch(
-            self.connection.lrange('rq:queue:default', 0, -1)[0].decode('ascii'), connection=self.connection
-        )
-        self.assertEqual(job.retries_left, 3)
-        self.assertEqual(job.retry_intervals, [10, 20, 40])
-    def test_cli_enqueue_errors(self):
-        """
-        rq enqueue -u <url> tests.fixtures.echo :invalid_json
-        rq enqueue -u <url> tests.fixtures.echo %invalid_eval_statement
-        rq enqueue -u <url> tests.fixtures.echo key=value key=value
-        rq enqueue -u <url> tests.fixtures.echo --schedule-in 1s --schedule-at 2000-01-01T00:00:00
-        rq enqueue -u <url> tests.fixtures.echo @not_existing_file
-        """
-        runner = CliRunner()
-        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', ':invalid_json'])
-        self.assertNotEqual(result.exit_code, 0)
-        self.assertIn('Unable to parse 1. non keyword argument as JSON.', result.output)
-        result = runner.invoke(
-            main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', '%invalid_eval_statement']
-        )
-        self.assertNotEqual(result.exit_code, 0)
-        self.assertIn('Unable to eval 1. non keyword argument as Python object.', result.output)
-        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', 'key=value', 'key=value'])
-        self.assertNotEqual(result.exit_code, 0)
-        self.assertIn('You can\'t specify multiple values for the same keyword.', result.output)
-        result = runner.invoke(
-            main,
-            [
-                'enqueue',
-                '-u',
-                self.redis_url,
-                'tests.fixtures.echo',
-                '--schedule-in',
-                '1s',
-                '--schedule-at',
-                '2000-01-01T00:00:00',
-            ],
-        )
-        self.assertNotEqual(result.exit_code, 0)
-        self.assertIn('You can\'t specify both --schedule-in and --schedule-at', result.output)
-        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', '@not_existing_file'])
-        self.assertNotEqual(result.exit_code, 0)
-        self.assertIn('Not found', result.output)
-    def test_parse_schedule(self):
-        """executes the rq.cli.helpers.parse_schedule function"""
-        self.assertEqual(parse_schedule(None, '2000-01-23T23:45:01'), datetime(2000, 1, 23, 23, 45, 1))
-        start = + timedelta(minutes=5)
-        middle = parse_schedule('5m', None)
-        end = + timedelta(minutes=5)
-        self.assertGreater(middle, start)
-        self.assertLess(middle, end)
-    def test_parse_function_arg(self):
-        """executes the rq.cli.helpers.parse_function_arg function"""
-        self.assertEqual(parse_function_arg('abc', 0), (None, 'abc'))
-        self.assertEqual(parse_function_arg(':{"json": true}', 1), (None, {'json': True}))
-        self.assertEqual(parse_function_arg('%1, 2', 2), (None, (1, 2)))
-        self.assertEqual(parse_function_arg('key=value', 3), ('key', 'value'))
-        self.assertEqual(parse_function_arg('jsonkey:=["json", "value"]', 4), ('jsonkey', ['json', 'value']))
-        self.assertEqual(parse_function_arg('evalkey%=1.2', 5), ('evalkey', 1.2))
-        self.assertEqual(parse_function_arg(':@tests/test.json', 6), (None, {'test': True}))
-        self.assertEqual(parse_function_arg('@tests/test.json', 7), (None, '{\n    "test": true\n}\n'))
-    def test_cli_enqueue_doc_test(self):
-        """tests the examples of the documentation"""
-        runner = CliRunner()
-        id = str(uuid4())
-        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'abc'])
-        self.assert_normal_execution(result)
-        job = Job.fetch(id)
-        self.assertEqual((job.args, job.kwargs), (['abc'], {}))
-        id = str(uuid4())
-        result = runner.invoke(
-            main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'abc=def']
-        )
-        self.assert_normal_execution(result)
-        job = Job.fetch(id)
-        self.assertEqual((job.args, job.kwargs), ([], {'abc': 'def'}))
-        id = str(uuid4())
-        result = runner.invoke(
-            main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', ':{"json": "abc"}']
-        )
-        self.assert_normal_execution(result)
-        job = Job.fetch(id)
-        self.assertEqual((job.args, job.kwargs), ([{'json': 'abc'}], {}))
-        id = str(uuid4())
-        result = runner.invoke(
-            main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key:={"json": "abc"}']
-        )
-        self.assert_normal_execution(result)
-        job = Job.fetch(id)
-        self.assertEqual((job.args, job.kwargs), ([], {'key': {'json': 'abc'}}))
-        id = str(uuid4())
-        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '%1, 2'])
-        self.assert_normal_execution(result)
-        job = Job.fetch(id)
-        self.assertEqual((job.args, job.kwargs), ([(1, 2)], {}))
-        id = str(uuid4())
-        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '%None'])
-        self.assert_normal_execution(result)
-        job = Job.fetch(id)
-        self.assertEqual((job.args, job.kwargs), ([None], {}))
-        id = str(uuid4())
-        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '%True'])
-        self.assert_normal_execution(result)
-        job = Job.fetch(id)
-        self.assertEqual((job.args, job.kwargs), ([True], {}))
-        id = str(uuid4())
-        result = runner.invoke(
-            main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key%=(1, 2)']
-        )
-        self.assert_normal_execution(result)
-        job = Job.fetch(id)
-        self.assertEqual((job.args, job.kwargs), ([], {'key': (1, 2)}))
-        id = str(uuid4())
-        result = runner.invoke(
-            main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key%={"foo": True}']
-        )
-        self.assert_normal_execution(result)
-        job = Job.fetch(id)
-        self.assertEqual((job.args, job.kwargs), ([], {'key': {"foo": True}}))
-        id = str(uuid4())
-        result = runner.invoke(
-            main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '@tests/test.json']
-        )
-        self.assert_normal_execution(result)
-        job = Job.fetch(id)
-        self.assertEqual((job.args, job.kwargs), ([open('tests/test.json', 'r').read()], {}))
-        id = str(uuid4())
-        result = runner.invoke(
-            main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key=@tests/test.json']
-        )
-        self.assert_normal_execution(result)
-        job = Job.fetch(id)
-        self.assertEqual((job.args, job.kwargs), ([], {'key': open('tests/test.json', 'r').read()}))
-        id = str(uuid4())
-        result = runner.invoke(
-            main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', ':@tests/test.json']
-        )
-        self.assert_normal_execution(result)
-        job = Job.fetch(id)
-        self.assertEqual((job.args, job.kwargs), ([json.loads(open('tests/test.json', 'r').read())], {}))
-        id = str(uuid4())
-        result = runner.invoke(
-            main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key:=@tests/test.json']
-        )
-        self.assert_normal_execution(result)
-        job = Job.fetch(id)
-        self.assertEqual((job.args, job.kwargs), ([], {'key': json.loads(open('tests/test.json', 'r').read())}))
-class WorkerPoolCLITestCase(CLITestCase):
-    def test_worker_pool_burst_and_num_workers(self):
-        """rq worker-pool -u <url> -b -n 3"""
-        runner = CliRunner()
-        result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b', '-n', '3'])
-        self.assert_normal_execution(result)
-    def test_serializer_and_queue_argument(self):
-        """rq worker-pool foo bar -u <url> -b"""
-        queue = Queue('foo', connection=self.connection, serializer=JSONSerializer)
-        job = queue.enqueue(say_hello, 'Hello')
-        queue = Queue('bar', connection=self.connection, serializer=JSONSerializer)
-        job_2 = queue.enqueue(say_hello, 'Hello')
-        runner = CliRunner()
-        runner.invoke(
-            main,
-            ['worker-pool', 'foo', 'bar', '-u', self.redis_url, '-b', '--serializer', 'rq.serializers.JSONSerializer'],
-        )
-        self.assertEqual(job.get_status(refresh=True), JobStatus.FINISHED)
-        self.assertEqual(job_2.get_status(refresh=True), JobStatus.FINISHED)
-    def test_worker_class_argument(self):
-        """rq worker-pool -u <url> -b --worker-class rq.Worker"""
-        runner = CliRunner()
-        result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b', '--worker-class', 'rq.Worker'])
-        self.assert_normal_execution(result)
-        result = runner.invoke(
-            main, ['worker-pool', '-u', self.redis_url, '-b', '--worker-class', 'rq.worker.SimpleWorker']
-        )
-        self.assert_normal_execution(result)
-        # This one fails because the worker class doesn't exist
-        result = runner.invoke(
-            main, ['worker-pool', '-u', self.redis_url, '-b', '--worker-class', 'rq.worker.NonExistantWorker']
-        )
-        self.assertNotEqual(result.exit_code, 0)
-    def test_job_class_argument(self):
-        """rq worker-pool -u <url> -b --job-class rq.job.Job"""
-        runner = CliRunner()
-        result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b', '--job-class', 'rq.job.Job'])
-        self.assert_normal_execution(result)
-        # This one fails because Job class doesn't exist
-        result = runner.invoke(
-            main, ['worker-pool', '-u', self.redis_url, '-b', '--job-class', 'rq.job.NonExistantJob']
-        )
-        self.assertNotEqual(result.exit_code, 0)
diff --git a/tests/test_commands.py b/tests/test_commands.py
deleted file mode 100644
index 355b72a..0000000
--- a/tests/test_commands.py
+++ /dev/null
@@ -1,102 +0,0 @@
-import time
-from multiprocessing import Process
-from redis import Redis
-from rq import Queue, Worker
-from rq.command import send_command, send_kill_horse_command, send_shutdown_command, send_stop_job_command
-from rq.exceptions import InvalidJobOperation, NoSuchJobError
-from rq.serializers import JSONSerializer
-from rq.worker import WorkerStatus
-from tests import RQTestCase
-from tests.fixtures import _send_kill_horse_command, _send_shutdown_command, long_running_job
-def start_work(queue_name, worker_name, connection_kwargs):
-    worker = Worker(queue_name, name=worker_name, connection=Redis(**connection_kwargs))
-def start_work_burst(queue_name, worker_name, connection_kwargs):
-    worker = Worker(queue_name, name=worker_name, connection=Redis(**connection_kwargs), serializer=JSONSerializer)
-class TestCommands(RQTestCase):
-    def test_shutdown_command(self):
-        """Ensure that shutdown command works properly."""
-        connection = self.testconn
-        worker = Worker('foo', connection=connection)
-        p = Process(
-            target=_send_shutdown_command, args=(, connection.connection_pool.connection_kwargs.copy())
-        )
-        p.start()
-        p.join(1)
-    def test_kill_horse_command(self):
-        """Ensure that shutdown command works properly."""
-        connection = self.testconn
-        queue = Queue('foo', connection=connection)
-        job = queue.enqueue(long_running_job, 4)
-        worker = Worker('foo', connection=connection)
-        p = Process(
-            target=_send_kill_horse_command, args=(, connection.connection_pool.connection_kwargs.copy())
-        )
-        p.start()
-        p.join(1)
-        job.refresh()
-        self.assertTrue( in queue.failed_job_registry)
-        p = Process(target=start_work, args=('foo',, connection.connection_pool.connection_kwargs.copy()))
-        p.start()
-        p.join(2)
-        send_kill_horse_command(connection,
-        worker.refresh()
-        # Since worker is not busy, command will be ignored
-        self.assertEqual(worker.get_state(), WorkerStatus.IDLE)
-        send_shutdown_command(connection,
-    def test_stop_job_command(self):
-        """Ensure that stop_job command works properly."""
-        connection = self.testconn
-        queue = Queue('foo', connection=connection, serializer=JSONSerializer)
-        job = queue.enqueue(long_running_job, 3)
-        worker = Worker('foo', connection=connection, serializer=JSONSerializer)
-        # If job is not executing, an error is raised
-        with self.assertRaises(InvalidJobOperation):
-            send_stop_job_command(connection,, serializer=JSONSerializer)
-        # An exception is raised if job ID is invalid
-        with self.assertRaises(NoSuchJobError):
-            send_stop_job_command(connection, job_id='1', serializer=JSONSerializer)
-        p = Process(
-            target=start_work_burst, args=('foo',, connection.connection_pool.connection_kwargs.copy())
-        )
-        p.start()
-        p.join(1)
-        time.sleep(0.1)
-        send_command(connection,, 'stop-job', job_id=1)
-        time.sleep(0.25)
-        # Worker still working due to job_id mismatch
-        worker.refresh()
-        self.assertEqual(worker.get_state(), WorkerStatus.BUSY)
-        send_stop_job_command(connection,, serializer=JSONSerializer)
-        time.sleep(0.25)
-        # Job status is set appropriately
-        self.assertTrue(job.is_stopped)
-        # Worker has stopped working
-        worker.refresh()
-        self.assertEqual(worker.get_state(), WorkerStatus.IDLE)
diff --git a/tests/test_connection.py b/tests/test_connection.py
deleted file mode 100644
index 5ac76d6..0000000
--- a/tests/test_connection.py
+++ /dev/null
@@ -1,51 +0,0 @@
-from redis import ConnectionPool, Redis, SSLConnection, UnixDomainSocketConnection
-from rq import Connection, Queue
-from rq.connections import parse_connection
-from tests import RQTestCase, find_empty_redis_database
-from tests.fixtures import do_nothing
-def new_connection():
-    return find_empty_redis_database()
-class TestConnectionInheritance(RQTestCase):
-    def test_connection_detection(self):
-        """Automatic detection of the connection."""
-        q = Queue()
-        self.assertEqual(q.connection, self.testconn)
-    def test_connection_stacking(self):
-        """Connection stacking."""
-        conn1 = Redis(db=4)
-        conn2 = Redis(db=5)
-        with Connection(conn1):
-            q1 = Queue()
-            with Connection(conn2):
-                q2 = Queue()
-        self.assertNotEqual(q1.connection, q2.connection)
-    def test_connection_pass_thru(self):
-        """Connection passed through from queues to jobs."""
-        q1 = Queue(connection=self.testconn)
-        with Connection(new_connection()):
-            q2 = Queue()
-        job1 = q1.enqueue(do_nothing)
-        job2 = q2.enqueue(do_nothing)
-        self.assertEqual(q1.connection, job1.connection)
-        self.assertEqual(q2.connection, job2.connection)
-    def test_parse_connection(self):
-        """Test parsing the connection"""
-        conn_class, pool_class, pool_kwargs = parse_connection(Redis(ssl=True))
-        self.assertEqual(conn_class, Redis)
-        self.assertEqual(pool_class, SSLConnection)
-        path = '/tmp/redis.sock'
-        pool = ConnectionPool(connection_class=UnixDomainSocketConnection, path=path)
-        conn_class, pool_class, pool_kwargs = parse_connection(Redis(connection_pool=pool))
-        self.assertEqual(conn_class, Redis)
-        self.assertEqual(pool_class, UnixDomainSocketConnection)
-        self.assertEqual(pool_kwargs, {"path": path})
diff --git a/tests/test_decorator.py b/tests/test_decorator.py
deleted file mode 100644
index 69ddde1..0000000
--- a/tests/test_decorator.py
+++ /dev/null
@@ -1,279 +0,0 @@
-from unittest import mock
-from redis import Redis
-from rq.decorators import job
-from rq.job import Job, Retry
-from rq.queue import Queue
-from rq.worker import DEFAULT_RESULT_TTL
-from tests import RQTestCase
-from tests.fixtures import decorated_job
-class TestDecorator(RQTestCase):
-    def setUp(self):
-        super().setUp()
-    def test_decorator_preserves_functionality(self):
-        """Ensure that a decorated function's functionality is still preserved."""
-        self.assertEqual(decorated_job(1, 2), 3)
-    def test_decorator_adds_delay_attr(self):
-        """Ensure that decorator adds a delay attribute to function that returns
-        a Job instance when called.
-        """
-        self.assertTrue(hasattr(decorated_job, 'delay'))
-        result = decorated_job.delay(1, 2)
-        self.assertTrue(isinstance(result, Job))
-        # Ensure that job returns the right result when performed
-        self.assertEqual(result.perform(), 3)
-    def test_decorator_accepts_queue_name_as_argument(self):
-        """Ensure that passing in queue name to the decorator puts the job in
-        the right queue.
-        """
-        @job(queue='queue_name')
-        def hello():
-            return 'Hi'
-        result = hello.delay()
-        self.assertEqual(result.origin, 'queue_name')
-    def test_decorator_accepts_result_ttl_as_argument(self):
-        """Ensure that passing in result_ttl to the decorator sets the
-        result_ttl on the job
-        """
-        # Ensure default
-        result = decorated_job.delay(1, 2)
-        self.assertEqual(result.result_ttl, DEFAULT_RESULT_TTL)
-        @job('default', result_ttl=10)
-        def hello():
-            return 'Why hello'
-        result = hello.delay()
-        self.assertEqual(result.result_ttl, 10)
-    def test_decorator_accepts_ttl_as_argument(self):
-        """Ensure that passing in ttl to the decorator sets the ttl on the job"""
-        # Ensure default
-        result = decorated_job.delay(1, 2)
-        self.assertEqual(result.ttl, None)
-        @job('default', ttl=30)
-        def hello():
-            return 'Hello'
-        result = hello.delay()
-        self.assertEqual(result.ttl, 30)
-    def test_decorator_accepts_meta_as_argument(self):
-        """Ensure that passing in meta to the decorator sets the meta on the job"""
-        # Ensure default
-        result = decorated_job.delay(1, 2)
-        self.assertEqual(result.meta, {})
-        test_meta = {
-            'metaKey1': 1,
-            'metaKey2': 2,
-        }
-        @job('default', meta=test_meta)
-        def hello():
-            return 'Hello'
-        result = hello.delay()
-        self.assertEqual(result.meta, test_meta)
-    def test_decorator_accepts_result_depends_on_as_argument(self):
-        """Ensure that passing in depends_on to the decorator sets the
-        correct dependency on the job
-        """
-        # Ensure default
-        result = decorated_job.delay(1, 2)
-        self.assertEqual(result.dependency, None)
-        self.assertEqual(result._dependency_id, None)
-        @job(queue='queue_name')
-        def foo():
-            return 'Firstly'
-        foo_job = foo.delay()
-        @job(queue='queue_name', depends_on=foo_job)
-        def bar():
-            return 'Secondly'
-        bar_job = bar.delay()
-        self.assertEqual(foo_job._dependency_ids, [])
-        self.assertIsNone(foo_job._dependency_id)
-        self.assertEqual(foo_job.dependency, None)
-        self.assertEqual(bar_job.dependency, foo_job)
-        self.assertEqual(,
-    def test_decorator_delay_accepts_depends_on_as_argument(self):
-        """Ensure that passing in depends_on to the delay method of
-        a decorated function overrides the depends_on set in the
-        constructor.
-        """
-        # Ensure default
-        result = decorated_job.delay(1, 2)
-        self.assertEqual(result.dependency, None)
-        self.assertEqual(result._dependency_id, None)
-        @job(queue='queue_name')
-        def foo():
-            return 'Firstly'
-        @job(queue='queue_name')
-        def bar():
-            return 'Firstly'
-        foo_job = foo.delay()
-        bar_job = bar.delay()
-        @job(queue='queue_name', depends_on=foo_job)
-        def baz():
-            return 'Secondly'
-        baz_job = bar.delay(depends_on=bar_job)
-        self.assertIsNone(foo_job._dependency_id)
-        self.assertIsNone(bar_job._dependency_id)
-        self.assertEqual(foo_job._dependency_ids, [])
-        self.assertEqual(bar_job._dependency_ids, [])
-        self.assertEqual(baz_job._dependency_id,
-        self.assertEqual(baz_job.dependency, bar_job)
-        self.assertEqual(,
-    def test_decorator_accepts_on_failure_function_as_argument(self):
-        """Ensure that passing in on_failure function to the decorator sets the
-        correct on_failure function on the job.
-        """
-        # Only functions and builtins are supported as callback
-        @job('default', on_failure=Job.fetch)
-        def foo():
-            return 'Foo'
-        with self.assertRaises(ValueError):
-            result = foo.delay()
-        @job('default', on_failure=print)
-        def hello():
-            return 'Hello'
-        result = hello.delay()
-        result_job = Job.fetch(, connection=self.testconn)
-        self.assertEqual(result_job.failure_callback, print)
-    def test_decorator_accepts_on_success_function_as_argument(self):
-        """Ensure that passing in on_failure function to the decorator sets the
-        correct on_success function on the job.
-        """
-        # Only functions and builtins are supported as callback
-        @job('default', on_failure=Job.fetch)
-        def foo():
-            return 'Foo'
-        with self.assertRaises(ValueError):
-            result = foo.delay()
-        @job('default', on_success=print)
-        def hello():
-            return 'Hello'
-        result = hello.delay()
-        result_job = Job.fetch(, connection=self.testconn)
-        self.assertEqual(result_job.success_callback, print)
-    @mock.patch('rq.queue.resolve_connection')
-    def test_decorator_connection_laziness(self, resolve_connection):
-        """Ensure that job decorator resolve connection in `lazy` way"""
-        resolve_connection.return_value = Redis()
-        @job(queue='queue_name')
-        def foo():
-            return 'do something'
-        self.assertEqual(resolve_connection.call_count, 0)
-        foo()
-        self.assertEqual(resolve_connection.call_count, 0)
-        foo.delay()
-        self.assertEqual(resolve_connection.call_count, 1)
-    def test_decorator_custom_queue_class(self):
-        """Ensure that a custom queue class can be passed to the job decorator"""
-        class CustomQueue(Queue):
-            pass
-        CustomQueue.enqueue_call = mock.MagicMock(spec=lambda *args, **kwargs: None, name='enqueue_call')
-        custom_decorator = job(queue='default', queue_class=CustomQueue)
-        self.assertIs(custom_decorator.queue_class, CustomQueue)
-        @custom_decorator
-        def custom_queue_class_job(x, y):
-            return x + y
-        custom_queue_class_job.delay(1, 2)
-        self.assertEqual(CustomQueue.enqueue_call.call_count, 1)
-    def test_decorate_custom_queue(self):
-        """Ensure that a custom queue instance can be passed to the job decorator"""
-        class CustomQueue(Queue):
-            pass
-        CustomQueue.enqueue_call = mock.MagicMock(spec=lambda *args, **kwargs: None, name='enqueue_call')
-        queue = CustomQueue()
-        @job(queue=queue)
-        def custom_queue_job(x, y):
-            return x + y
-        custom_queue_job.delay(1, 2)
-        self.assertEqual(queue.enqueue_call.call_count, 1)
-    def test_decorator_custom_failure_ttl(self):
-        """Ensure that passing in failure_ttl to the decorator sets the
-        failure_ttl on the job
-        """
-        # Ensure default
-        result = decorated_job.delay(1, 2)
-        self.assertEqual(result.failure_ttl, None)
-        @job('default', failure_ttl=10)
-        def hello():
-            return 'Why hello'
-        result = hello.delay()
-        self.assertEqual(result.failure_ttl, 10)
-    def test_decorator_custom_retry(self):
-        """Ensure that passing in retry to the decorator sets the
-        retry on the job
-        """
-        # Ensure default
-        result = decorated_job.delay(1, 2)
-        self.assertEqual(result.retries_left, None)
-        self.assertEqual(result.retry_intervals, None)
-        @job('default', retry=Retry(3, [2]))
-        def hello():
-            return 'Why hello'
-        result = hello.delay()
-        self.assertEqual(result.retries_left, 3)
-        self.assertEqual(result.retry_intervals, [2])
diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py
deleted file mode 100644
index b4e2842..0000000
--- a/tests/test_dependencies.py
+++ /dev/null
@@ -1,198 +0,0 @@
-from rq import Queue, SimpleWorker, Worker
-from rq.job import Dependency, Job, JobStatus
-from tests import RQTestCase
-from tests.fixtures import check_dependencies_are_met, div_by_zero, say_hello
-class TestDependencies(RQTestCase):
-    def test_allow_failure_is_persisted(self):
-        """Ensure that job.allow_dependency_failures is properly set
-        when providing Dependency object to depends_on."""
-        dep_job = Job.create(func=say_hello)
-        # default to False, maintaining current behavior
-        job = Job.create(func=say_hello, depends_on=Dependency([dep_job]))
-        Job.fetch(, connection=self.testconn)
-        self.assertFalse(job.allow_dependency_failures)
-        job = Job.create(func=say_hello, depends_on=Dependency([dep_job], allow_failure=True))
-        job = Job.fetch(, connection=self.testconn)
-        self.assertTrue(job.allow_dependency_failures)
-        jobs = Job.fetch_many([], connection=self.testconn)
-        self.assertTrue(jobs[0].allow_dependency_failures)
-    def test_job_dependency(self):
-        """Enqueue dependent jobs only when appropriate"""
-        q = Queue(connection=self.testconn)
-        w = SimpleWorker([q], connection=q.connection)
-        # enqueue dependent job when parent successfully finishes
-        parent_job = q.enqueue(say_hello)
-        job = q.enqueue_call(say_hello, depends_on=parent_job)
-        job = Job.fetch(, connection=self.testconn)
-        self.assertEqual(job.get_status(), JobStatus.FINISHED)
-        q.empty()
-        # don't enqueue dependent job when parent fails
-        parent_job = q.enqueue(div_by_zero)
-        job = q.enqueue_call(say_hello, depends_on=parent_job)
-        job = Job.fetch(, connection=self.testconn)
-        self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
-        q.empty()
-        # don't enqueue dependent job when Dependency.allow_failure=False (the default)
-        parent_job = q.enqueue(div_by_zero)
-        dependency = Dependency(jobs=parent_job)
-        job = q.enqueue_call(say_hello, depends_on=dependency)
-        job = Job.fetch(, connection=self.testconn)
-        self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
-        # enqueue dependent job when Dependency.allow_failure=True
-        parent_job = q.enqueue(div_by_zero)
-        dependency = Dependency(jobs=parent_job, allow_failure=True)
-        job = q.enqueue_call(say_hello, depends_on=dependency)
-        job = Job.fetch(, connection=self.testconn)
-        self.assertTrue(job.allow_dependency_failures)
-        job = Job.fetch(, connection=self.testconn)
-        self.assertEqual(job.get_status(), JobStatus.FINISHED)
-        # When a failing job has multiple dependents, only enqueue those
-        # with allow_failure=True
-        parent_job = q.enqueue(div_by_zero)
-        job_allow_failure = q.enqueue(say_hello, depends_on=Dependency(jobs=parent_job, allow_failure=True))
-        job = q.enqueue(say_hello, depends_on=Dependency(jobs=parent_job, allow_failure=False))
-, max_jobs=1)
-        self.assertEqual(parent_job.get_status(), JobStatus.FAILED)
-        self.assertEqual(job_allow_failure.get_status(), JobStatus.QUEUED)
-        self.assertEqual(job.get_status(), JobStatus.DEFERRED)
-        q.empty()
-        # only enqueue dependent job when all dependencies have finished/failed
-        first_parent_job = q.enqueue(div_by_zero)
-        second_parent_job = q.enqueue(say_hello)
-        dependencies = Dependency(jobs=[first_parent_job, second_parent_job], allow_failure=True)
-        job = q.enqueue_call(say_hello, depends_on=dependencies)
-, max_jobs=1)
-        self.assertEqual(first_parent_job.get_status(), JobStatus.FAILED)
-        self.assertEqual(second_parent_job.get_status(), JobStatus.QUEUED)
-        self.assertEqual(job.get_status(), JobStatus.DEFERRED)
-        # When second job finishes, dependent job should be queued
-, max_jobs=1)
-        self.assertEqual(second_parent_job.get_status(), JobStatus.FINISHED)
-        self.assertEqual(job.get_status(), JobStatus.QUEUED)
-        job = Job.fetch(, connection=self.testconn)
-        self.assertEqual(job.get_status(), JobStatus.FINISHED)
-        # Test dependant is enqueued at front
-        q.empty()
-        parent_job = q.enqueue(say_hello)
-        q.enqueue(say_hello, job_id='fake_job_id_1', depends_on=Dependency(jobs=[parent_job]))
-        q.enqueue(say_hello, job_id='fake_job_id_2', depends_on=Dependency(jobs=[parent_job], enqueue_at_front=True))
-, max_jobs=1)
-        self.assertEqual(q.job_ids, ["fake_job_id_2", "fake_job_id_1"])
-    def test_multiple_jobs_with_dependencies(self):
-        """Enqueue dependent jobs only when appropriate"""
-        q = Queue(connection=self.testconn)
-        w = SimpleWorker([q], connection=q.connection)
-        # Multiple jobs are enqueued with correct status
-        parent_job = q.enqueue(say_hello)
-        job_no_deps = Queue.prepare_data(say_hello)
-        job_with_deps = Queue.prepare_data(say_hello, depends_on=parent_job)
-        jobs = q.enqueue_many([job_no_deps, job_with_deps])
-        self.assertEqual(jobs[0].get_status(), JobStatus.QUEUED)
-        self.assertEqual(jobs[1].get_status(), JobStatus.DEFERRED)
-, max_jobs=1)
-        self.assertEqual(jobs[1].get_status(), JobStatus.QUEUED)
-        job_with_met_deps = Queue.prepare_data(say_hello, depends_on=parent_job)
-        jobs = q.enqueue_many([job_with_met_deps])
-        self.assertEqual(jobs[0].get_status(), JobStatus.QUEUED)
-        q.empty()
-    def test_dependency_list_in_depends_on(self):
-        """Enqueue with Dependency list in depends_on"""
-        q = Queue(connection=self.testconn)
-        w = SimpleWorker([q], connection=q.connection)
-        # enqueue dependent job when parent successfully finishes
-        parent_job1 = q.enqueue(say_hello)
-        parent_job2 = q.enqueue(say_hello)
-        job = q.enqueue_call(say_hello, depends_on=[Dependency([parent_job1]), Dependency([parent_job2])])
-        self.assertEqual(job.get_status(), JobStatus.FINISHED)
-    def test_enqueue_job_dependency(self):
-        """Enqueue via Queue.enqueue_job() with depencency"""
-        q = Queue(connection=self.testconn)
-        w = SimpleWorker([q], connection=q.connection)
-        # enqueue dependent job when parent successfully finishes
-        parent_job = Job.create(say_hello)
-        job = Job.create(say_hello, depends_on=parent_job)
-        q.enqueue_job(job)
-        self.assertEqual(job.get_status(), JobStatus.DEFERRED)
-        q.enqueue_job(parent_job)
-        self.assertEqual(parent_job.get_status(), JobStatus.FINISHED)
-        self.assertEqual(job.get_status(), JobStatus.FINISHED)
-    def test_dependencies_are_met_if_parent_is_canceled(self):
-        """When parent job is canceled, it should be treated as failed"""
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(say_hello)
-        job.set_status(JobStatus.CANCELED)
-        dependent_job = queue.enqueue(say_hello, depends_on=job)
-        # dependencies_are_met() should return False, whether or not
-        # parent_job is provided
-        self.assertFalse(dependent_job.dependencies_are_met(job))
-        self.assertFalse(dependent_job.dependencies_are_met())
-    def test_can_enqueue_job_if_dependency_is_deleted(self):
-        queue = Queue(connection=self.testconn)
-        dependency_job = queue.enqueue(say_hello, result_ttl=0)
-        w = Worker([queue])
-        assert queue.enqueue(say_hello, depends_on=dependency_job)
-    def test_dependencies_are_met_if_dependency_is_deleted(self):
-        queue = Queue(connection=self.testconn)
-        dependency_job = queue.enqueue(say_hello, result_ttl=0)
-        dependent_job = queue.enqueue(say_hello, depends_on=dependency_job)
-        w = Worker([queue])
-, max_jobs=1)
-        assert dependent_job.dependencies_are_met()
-        assert dependent_job.get_status() == JobStatus.QUEUED
-    def test_dependencies_are_met_at_execution_time(self):
-        queue = Queue(connection=self.testconn)
-        queue.empty()
-        queue.enqueue(say_hello, job_id="A")
-        queue.enqueue(say_hello, job_id="B")
-        job_c = queue.enqueue(check_dependencies_are_met, job_id="C", depends_on=["A", "B"])
-        job_c.dependencies_are_met()
-        w = Worker([queue])
-        assert job_c.result
diff --git a/tests/ b/tests/
deleted file mode 100644
index 1517b80..0000000
--- a/tests/
+++ /dev/null
@@ -1,15 +0,0 @@
-from rq import Queue
-from tests import RQTestCase, fixtures
-class TestFixtures(RQTestCase):
-    def test_rpush_fixture(self):
-        fixtures.rpush('foo', 'bar')
-        assert self.testconn.lrange('foo', 0, 0)[0].decode() == 'bar'
-    def test_start_worker_fixture(self):
-        queue = Queue(name='testing', connection=self.testconn)
-        queue.enqueue(fixtures.say_hello)
-        conn_kwargs = self.testconn.connection_pool.connection_kwargs
-        fixtures.start_worker(, conn_kwargs, 'w1', True)
-        assert not
diff --git a/tests/ b/tests/
deleted file mode 100644
index c351b77..0000000
--- a/tests/
+++ /dev/null
@@ -1,89 +0,0 @@
-from unittest import mock
-from rq.cli.helpers import get_redis_from_config
-from tests import RQTestCase
-class TestHelpers(RQTestCase):
-    @mock.patch('rq.cli.helpers.Sentinel')
-    def test_get_redis_from_config(self, sentinel_class_mock):
-        """Ensure Redis connection params are properly parsed"""
-        settings = {'REDIS_URL': 'redis://localhost:1/1'}
-        # Ensure REDIS_URL is read
-        redis = get_redis_from_config(settings)
-        connection_kwargs = redis.connection_pool.connection_kwargs
-        self.assertEqual(connection_kwargs['db'], 1)
-        self.assertEqual(connection_kwargs['port'], 1)
-        settings = {
-            'REDIS_URL': 'redis://localhost:1/1',
-            'REDIS_HOST': 'foo',
-            'REDIS_DB': 2,
-            'REDIS_PORT': 2,
-            'REDIS_PASSWORD': 'bar',
-        }
-        # Ensure REDIS_URL is preferred
-        redis = get_redis_from_config(settings)
-        connection_kwargs = redis.connection_pool.connection_kwargs
-        self.assertEqual(connection_kwargs['db'], 1)
-        self.assertEqual(connection_kwargs['port'], 1)
-        # Ensure fall back to regular connection parameters
-        settings['REDIS_URL'] = None
-        redis = get_redis_from_config(settings)
-        connection_kwargs = redis.connection_pool.connection_kwargs
-        self.assertEqual(connection_kwargs['host'], 'foo')
-        self.assertEqual(connection_kwargs['db'], 2)
-        self.assertEqual(connection_kwargs['port'], 2)
-        self.assertEqual(connection_kwargs['password'], 'bar')
-        # Add Sentinel to the settings
-        settings.update(
-            {
-                'SENTINEL': {
-                    'INSTANCES': [
-                        ('', 26379),
-                        ('', 26379),
-                        ('', 26379),
-                    ],
-                    'MASTER_NAME': 'master',
-                    'DB': 2,
-                    'USERNAME': 'redis-user',
-                    'PASSWORD': 'redis-secret',
-                    'SOCKET_TIMEOUT': None,
-                    'CONNECTION_KWARGS': {
-                        'ssl_ca_path': None,
-                    },
-                    'SENTINEL_KWARGS': {
-                        'username': 'sentinel-user',
-                        'password': 'sentinel-secret',
-                    },
-                },
-            }
-        )
-        # Ensure SENTINEL is preferred against REDIS_* parameters
-        redis = get_redis_from_config(settings)
-        sentinel_init_sentinels_args = sentinel_class_mock.call_args[0]
-        sentinel_init_sentinel_kwargs = sentinel_class_mock.call_args[1]
-        self.assertEqual(
-            sentinel_init_sentinels_args,
-            ([('', 26379), ('', 26379), ('', 26379)],),
-        )
-        self.assertDictEqual(
-            sentinel_init_sentinel_kwargs,
-            {
-                'db': 2,
-                'ssl': False,
-                'username': 'redis-user',
-                'password': 'redis-secret',
-                'socket_timeout': None,
-                'ssl_ca_path': None,
-                'sentinel_kwargs': {
-                    'username': 'sentinel-user',
-                    'password': 'sentinel-secret',
-                },
-            },
-        )
diff --git a/tests/ b/tests/
deleted file mode 100644
index 29c309f..0000000
--- a/tests/
+++ /dev/null
@@ -1,1221 +0,0 @@
-import json
-import queue
-import time
-import zlib
-from datetime import datetime, timedelta
-from pickle import dumps, loads
-from redis import WatchError
-from rq.defaults import CALLBACK_TIMEOUT
-from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError
-from rq.job import Callback, Dependency, Job, JobStatus, cancel_job, get_current_job
-from rq.queue import Queue
-from rq.registry import (
-    CanceledJobRegistry,
-    DeferredJobRegistry,
-    FailedJobRegistry,
-    FinishedJobRegistry,
-    ScheduledJobRegistry,
-    StartedJobRegistry,
-from rq.serializers import JSONSerializer
-from rq.utils import as_text, utcformat, utcnow
-from rq.worker import Worker
-from tests import RQTestCase, fixtures
-class TestJob(RQTestCase):
-    def test_unicode(self):
-        """Unicode in job description [issue405]"""
-        job = Job.create(
-            'myfunc',
-            args=[12, "☃"],
-            kwargs=dict(snowman="☃", null=None),
-        )
-        self.assertEqual(
-            job.description,
-            "myfunc(12, '☃', null=None, snowman='☃')",
-        )
-    def test_create_empty_job(self):
-        """Creation of new empty jobs."""
-        job = Job()
-        job.description = 'test job'
-        # Jobs have a random UUID and a creation date
-        self.assertIsNotNone(
-        self.assertIsNotNone(job.created_at)
-        self.assertEqual(str(job), "<Job %s: test job>" %
-        # ...and nothing else
-        self.assertEqual(job.origin, '')
-        self.assertIsNone(job.enqueued_at)
-        self.assertIsNone(job.started_at)
-        self.assertIsNone(job.ended_at)
-        self.assertIsNone(job.result)
-        self.assertIsNone(job.exc_info)
-        with self.assertRaises(DeserializationError):
-            job.func
-        with self.assertRaises(DeserializationError):
-            job.instance
-        with self.assertRaises(DeserializationError):
-            job.args
-        with self.assertRaises(DeserializationError):
-            job.kwargs
-    def test_create_param_errors(self):
-        """Creation of jobs may result in errors"""
-        self.assertRaises(TypeError, Job.create, fixtures.say_hello, args="string")
-        self.assertRaises(TypeError, Job.create, fixtures.say_hello, kwargs="string")
-        self.assertRaises(TypeError, Job.create, func=42)
-    def test_create_typical_job(self):
-        """Creation of jobs for function calls."""
-        job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))
-        # Jobs have a random UUID
-        self.assertIsNotNone(
-        self.assertIsNotNone(job.created_at)
-        self.assertIsNotNone(job.description)
-        self.assertIsNone(job.instance)
-        # Job data is set...
-        self.assertEqual(job.func, fixtures.some_calculation)
-        self.assertEqual(job.args, (3, 4))
-        self.assertEqual(job.kwargs, {'z': 2})
-        # ...but metadata is not
-        self.assertEqual(job.origin, '')
-        self.assertIsNone(job.enqueued_at)
-        self.assertIsNone(job.result)
-    def test_create_instance_method_job(self):
-        """Creation of jobs for instance methods."""
-        n = fixtures.Number(2)
-        job = Job.create(func=n.div, args=(4,))
-        # Job data is set
-        self.assertEqual(job.func, n.div)
-        self.assertEqual(job.instance, n)
-        self.assertEqual(job.args, (4,))
-    def test_create_job_with_serializer(self):
-        """Creation of jobs with serializer for instance methods."""
-        # Test using json serializer
-        n = fixtures.Number(2)
-        job = Job.create(func=n.div, args=(4,), serializer=json)
-        self.assertIsNotNone(job.serializer)
-        self.assertEqual(job.func, n.div)
-        self.assertEqual(job.instance, n)
-        self.assertEqual(job.args, (4,))
-    def test_create_job_from_string_function(self):
-        """Creation of jobs using string specifier."""
-        job = Job.create(func='tests.fixtures.say_hello', args=('World',))
-        # Job data is set
-        self.assertEqual(job.func, fixtures.say_hello)
-        self.assertIsNone(job.instance)
-        self.assertEqual(job.args, ('World',))
-    def test_create_job_from_callable_class(self):
-        """Creation of jobs using a callable class specifier."""
-        kallable = fixtures.CallableObject()
-        job = Job.create(func=kallable)
-        self.assertEqual(job.func, kallable.__call__)
-        self.assertEqual(job.instance, kallable)
-    def test_job_properties_set_data_property(self):
-        """Data property gets derived from the job tuple."""
-        job = Job()
-        job.func_name = 'foo'
-        fname, instance, args, kwargs = loads(
-        self.assertEqual(fname, job.func_name)
-        self.assertEqual(instance, None)
-        self.assertEqual(args, ())
-        self.assertEqual(kwargs, {})
-    def test_data_property_sets_job_properties(self):
-        """Job tuple gets derived lazily from data property."""
-        job = Job()
- = dumps(('foo', None, (1, 2, 3), {'bar': 'qux'}))
-        self.assertEqual(job.func_name, 'foo')
-        self.assertEqual(job.instance, None)
-        self.assertEqual(job.args, (1, 2, 3))
-        self.assertEqual(job.kwargs, {'bar': 'qux'})
-    def test_save(self):  # noqa
-        """Storing jobs."""
-        job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))
-        # Saving creates a Redis hash
-        self.assertEqual(self.testconn.exists(job.key), False)
-        self.assertEqual(self.testconn.type(job.key), b'hash')
-        # Saving writes pickled job data
-        unpickled_data = loads(zlib.decompress(self.testconn.hget(job.key, 'data')))
-        self.assertEqual(unpickled_data[0], 'tests.fixtures.some_calculation')
-    def test_fetch(self):
-        """Fetching jobs."""
-        # Prepare test
-        self.testconn.hset(
-            'rq:job:some_id', 'data', "(S'tests.fixtures.some_calculation'\nN(I3\nI4\nt(dp1\nS'z'\nI2\nstp2\n."
-        )
-        self.testconn.hset('rq:job:some_id', 'created_at', '2012-02-07T22:13:24.123456Z')
-        # Fetch returns a job
-        job = Job.fetch('some_id')
-        self.assertEqual(, 'some_id')
-        self.assertEqual(job.func_name, 'tests.fixtures.some_calculation')
-        self.assertIsNone(job.instance)
-        self.assertEqual(job.args, (3, 4))
-        self.assertEqual(job.kwargs, dict(z=2))
-        self.assertEqual(job.created_at, datetime(2012, 2, 7, 22, 13, 24, 123456))
-    def test_fetch_many(self):
-        """Fetching many jobs at once."""
-        data = {
-            'func': fixtures.some_calculation,
-            'args': (3, 4),
-            'kwargs': dict(z=2),
-            'connection': self.testconn,
-        }
-        job = Job.create(**data)
-        job2 = Job.create(**data)
-        jobs = Job.fetch_many([,, 'invalid_id'], self.testconn)
-        self.assertEqual(jobs, [job, job2, None])
-    def test_persistence_of_empty_jobs(self):  # noqa
-        """Storing empty jobs."""
-        job = Job()
-        with self.assertRaises(ValueError):
-    def test_persistence_of_typical_jobs(self):
-        """Storing typical jobs."""
-        job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))
-        stored_date = self.testconn.hget(job.key, 'created_at').decode('utf-8')
-        self.assertEqual(stored_date, utcformat(job.created_at))
-        # ... and no other keys are stored
-        self.assertEqual(
-            {
-                b'created_at',
-                b'data',
-                b'description',
-                b'ended_at',
-                b'last_heartbeat',
-                b'started_at',
-                b'worker_name',
-                b'success_callback_name',
-                b'failure_callback_name',
-                b'stopped_callback_name',
-            },
-            set(self.testconn.hkeys(job.key)),
-        )
-        self.assertEqual(job.last_heartbeat, None)
-        self.assertEqual(job.last_heartbeat, None)
-        ts = utcnow()
-        job.heartbeat(ts, 0)
-        self.assertEqual(job.last_heartbeat, ts)
-    def test_persistence_of_parent_job(self):
-        """Storing jobs with parent job, either instance or key."""
-        parent_job = Job.create(func=fixtures.some_calculation)
-        job = Job.create(func=fixtures.some_calculation, depends_on=parent_job)
-        stored_job = Job.fetch(
-        self.assertEqual(stored_job._dependency_id,
-        self.assertEqual(stored_job._dependency_ids, [])
-        self.assertEqual(,
-        self.assertEqual(stored_job.dependency, parent_job)
-        job = Job.create(func=fixtures.some_calculation,
-        stored_job = Job.fetch(
-        self.assertEqual(stored_job._dependency_id,
-        self.assertEqual(stored_job._dependency_ids, [])
-        self.assertEqual(,
-        self.assertEqual(stored_job.dependency, parent_job)
-    def test_persistence_of_callbacks(self):
-        """Storing jobs with success and/or failure callbacks."""
-        job = Job.create(
-            func=fixtures.some_calculation,
-            on_success=Callback(fixtures.say_hello, timeout=10),
-            on_failure=fixtures.say_pid,
-            on_stopped=fixtures.say_hello,
-        )  # deprecated callable
-        stored_job = Job.fetch(
-        self.assertEqual(fixtures.say_hello, stored_job.success_callback)
-        self.assertEqual(10, stored_job.success_callback_timeout)
-        self.assertEqual(fixtures.say_pid, stored_job.failure_callback)
-        self.assertEqual(fixtures.say_hello, stored_job.stopped_callback)
-        self.assertEqual(CALLBACK_TIMEOUT, stored_job.failure_callback_timeout)
-        self.assertEqual(CALLBACK_TIMEOUT, stored_job.stopped_callback_timeout)
-        # None(s)
-        job = Job.create(func=fixtures.some_calculation, on_failure=None)
-        stored_job = Job.fetch(
-        self.assertIsNone(stored_job.success_callback)
-        self.assertEqual(CALLBACK_TIMEOUT, job.success_callback_timeout)  # timeout should be never none
-        self.assertEqual(CALLBACK_TIMEOUT, stored_job.success_callback_timeout)
-        self.assertIsNone(stored_job.failure_callback)
-        self.assertEqual(CALLBACK_TIMEOUT, job.failure_callback_timeout)  # timeout should be never none
-        self.assertEqual(CALLBACK_TIMEOUT, stored_job.failure_callback_timeout)
-        self.assertEqual(CALLBACK_TIMEOUT, job.stopped_callback_timeout)  # timeout should be never none
-        self.assertIsNone(stored_job.stopped_callback)
-    def test_store_then_fetch(self):
-        """Store, then fetch."""
-        job = Job.create(func=fixtures.some_calculation, timeout='1h', args=(3, 4), kwargs=dict(z=2))
-        job2 = Job.fetch(
-        self.assertEqual(job.func, job2.func)
-        self.assertEqual(job.args, job2.args)
-        self.assertEqual(job.kwargs, job2.kwargs)
-        self.assertEqual(job.timeout, job2.timeout)
-        # Mathematical equation
-        self.assertEqual(job, job2)
-    def test_fetching_can_fail(self):
-        """Fetching fails for non-existing jobs."""
-        with self.assertRaises(NoSuchJobError):
-            Job.fetch('b4a44d44-da16-4620-90a6-798e8cd72ca0')
-    def test_fetching_unreadable_data(self):
-        """Fetching succeeds on unreadable data, but lazy props fail."""
-        # Set up
-        job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))
-        # Just replace the data hkey with some random noise
-        self.testconn.hset(job.key, 'data', 'this is no pickle string')
-        job.refresh()
-        for attr in ('func_name', 'instance', 'args', 'kwargs'):
-            with self.assertRaises(Exception):
-                getattr(job, attr)
-    def test_job_is_unimportable(self):
-        """Jobs that cannot be imported throw exception on access."""
-        job = Job.create(func=fixtures.say_hello, args=('Lionel',))
-        # Now slightly modify the job to make it unimportable (this is
-        # equivalent to a worker not having the most up-to-date source code
-        # and unable to import the function)
-        job_data =
-        unimportable_data = job_data.replace(b'say_hello', b'nay_hello')
-        self.testconn.hset(job.key, 'data', zlib.compress(unimportable_data))
-        job.refresh()
-        with self.assertRaises(ValueError):
-            job.func  # accessing the func property should fail
-    def test_compressed_exc_info_handling(self):
-        """Jobs handle both compressed and uncompressed exc_info"""
-        exception_string = 'Some exception'
-        job = Job.create(func=fixtures.say_hello, args=('Lionel',))
-        job._exc_info = exception_string
-        # exc_info is stored in compressed format
-        exc_info = self.testconn.hget(job.key, 'exc_info')
-        self.assertEqual(as_text(zlib.decompress(exc_info)), exception_string)
-        job.refresh()
-        self.assertEqual(job.exc_info, exception_string)
-        # Uncompressed exc_info is also handled
-        self.testconn.hset(job.key, 'exc_info', exception_string)
-        job.refresh()
-        self.assertEqual(job.exc_info, exception_string)
-    def test_compressed_job_data_handling(self):
-        """Jobs handle both compressed and uncompressed data"""
-        job = Job.create(func=fixtures.say_hello, args=('Lionel',))
-        # Job data is stored in compressed format
-        job_data =
-        self.assertEqual(zlib.compress(job_data), self.testconn.hget(job.key, 'data'))
-        self.testconn.hset(job.key, 'data', job_data)
-        job.refresh()
-        self.assertEqual(, job_data)
-    def test_custom_meta_is_persisted(self):
-        """Additional meta data on jobs are stored persisted correctly."""
-        job = Job.create(func=fixtures.say_hello, args=('Lionel',))
-        job.meta['foo'] = 'bar'
-        raw_data = self.testconn.hget(job.key, 'meta')
-        self.assertEqual(loads(raw_data)['foo'], 'bar')
-        job2 = Job.fetch(
-        self.assertEqual(job2.meta['foo'], 'bar')
-    def test_get_meta(self):
-        """Test get_meta() function"""
-        job = Job.create(func=fixtures.say_hello, args=('Lionel',))
-        job.meta['foo'] = 'bar'
-        self.assertEqual(job.get_meta()['foo'], 'bar')
-        # manually write different data in meta
-        self.testconn.hset(job.key, 'meta', dumps({'fee': 'boo'}))
-        # check if refresh=False keeps old data
-        self.assertEqual(job.get_meta(False)['foo'], 'bar')
-        # check if meta is updated
-        self.assertEqual(job.get_meta()['fee'], 'boo')
-    def test_custom_meta_is_rewriten_by_save_meta(self):
-        """New meta data can be stored by save_meta."""
-        job = Job.create(func=fixtures.say_hello, args=('Lionel',))
-        serialized = job.to_dict()
-        job.meta['foo'] = 'bar'
-        job.save_meta()
-        raw_meta = self.testconn.hget(job.key, 'meta')
-        self.assertEqual(loads(raw_meta)['foo'], 'bar')
-        job2 = Job.fetch(
-        self.assertEqual(job2.meta['foo'], 'bar')
-        # nothing else was changed
-        serialized2 = job2.to_dict()
-        serialized2.pop('meta')
-        self.assertDictEqual(serialized, serialized2)
-    def test_unpickleable_result(self):
-        """Unpickleable job result doesn't crash and job.refresh()"""
-        job = Job.create(func=fixtures.say_hello, args=('Lionel',))
-        job._result = queue.Queue()
-        self.assertEqual(self.testconn.hget(job.key, 'result').decode('utf-8'), 'Unserializable return value')
-        job = Job.fetch(
-        self.assertEqual(job.result, 'Unserializable return value')
-    def test_result_ttl_is_persisted(self):
-        """Ensure that job's result_ttl is set properly"""
-        job = Job.create(func=fixtures.say_hello, args=('Lionel',), result_ttl=10)
-        Job.fetch(, connection=self.testconn)
-        self.assertEqual(job.result_ttl, 10)
-        job = Job.create(func=fixtures.say_hello, args=('Lionel',))
-        Job.fetch(, connection=self.testconn)
-        self.assertEqual(job.result_ttl, None)
-    def test_failure_ttl_is_persisted(self):
-        """Ensure job.failure_ttl is set and restored properly"""
-        job = Job.create(func=fixtures.say_hello, args=('Lionel',), failure_ttl=15)
-        Job.fetch(, connection=self.testconn)
-        self.assertEqual(job.failure_ttl, 15)
-        job = Job.create(func=fixtures.say_hello, args=('Lionel',))
-        Job.fetch(, connection=self.testconn)
-        self.assertEqual(job.failure_ttl, None)
-    def test_description_is_persisted(self):
-        """Ensure that job's custom description is set properly"""
-        job = Job.create(func=fixtures.say_hello, args=('Lionel',), description='Say hello!')
-        Job.fetch(, connection=self.testconn)
-        self.assertEqual(job.description, 'Say hello!')
-        # Ensure job description is constructed from function call string
-        job = Job.create(func=fixtures.say_hello, args=('Lionel',))
-        Job.fetch(, connection=self.testconn)
-        self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')")
-    def test_dependency_parameter_constraints(self):
-        """Ensures the proper constraints are in place for values passed in as job references."""
-        dep_job = Job.create(func=fixtures.say_hello)
-        # raise error on empty jobs
-        self.assertRaises(ValueError, Dependency, jobs=[])
-        # raise error on non-str/Job value in jobs iterable
-        self.assertRaises(ValueError, Dependency, jobs=[dep_job, 1])
-    def test_multiple_dependencies_are_accepted_and_persisted(self):
-        """Ensure job._dependency_ids accepts different input formats, and
-        is set and restored properly"""
-        job_A = Job.create(func=fixtures.some_calculation, args=(3, 1, 4), id="A")
-        job_B = Job.create(func=fixtures.some_calculation, args=(2, 7, 2), id="B")
-        # No dependencies
-        job = Job.create(func=fixtures.say_hello)
-        Job.fetch(, connection=self.testconn)
-        self.assertEqual(job._dependency_ids, [])
-        # Various ways of specifying dependencies
-        cases = [
-            ["A", ["A"]],
-            [job_A, ["A"]],
-            [["A", "B"], ["A", "B"]],
-            [[job_A, job_B], ["A", "B"]],
-            [["A", job_B], ["A", "B"]],
-            [("A", "B"), ["A", "B"]],
-            [(job_A, job_B), ["A", "B"]],
-            [(job_A, "B"), ["A", "B"]],
-            [Dependency("A"), ["A"]],
-            [Dependency(job_A), ["A"]],
-            [Dependency(["A", "B"]), ["A", "B"]],
-            [Dependency([job_A, job_B]), ["A", "B"]],
-            [Dependency(["A", job_B]), ["A", "B"]],
-            [Dependency(("A", "B")), ["A", "B"]],
-            [Dependency((job_A, job_B)), ["A", "B"]],
-            [Dependency((job_A, "B")), ["A", "B"]],
-        ]
-        for given, expected in cases:
-            job = Job.create(func=fixtures.say_hello, depends_on=given)
-            Job.fetch(, connection=self.testconn)
-            self.assertEqual(job._dependency_ids, expected)
-    def test_prepare_for_execution(self):
-        """job.prepare_for_execution works properly"""
-        job = Job.create(func=fixtures.say_hello)
-        with self.testconn.pipeline() as pipeline:
-            job.prepare_for_execution("worker_name", pipeline)
-            pipeline.execute()
-        job.refresh()
-        self.assertEqual(job.worker_name, "worker_name")
-        self.assertEqual(job.get_status(), JobStatus.STARTED)
-        self.assertIsNotNone(job.last_heartbeat)
-        self.assertIsNotNone(job.started_at)
-    def test_job_access_outside_job_fails(self):
-        """The current job is accessible only within a job context."""
-        self.assertIsNone(get_current_job())
-    def test_job_access_within_job_function(self):
-        """The current job is accessible within the job function."""
-        q = Queue()
-        job = q.enqueue(fixtures.access_self)
-        w = Worker([q])
-        # access_self calls get_current_job() and executes successfully
-        self.assertEqual(job.get_status(), JobStatus.FINISHED)
-    def test_job_access_within_synchronous_job_function(self):
-        queue = Queue(is_async=False)
-        queue.enqueue(fixtures.access_self)
-    def test_job_async_status_finished(self):
-        queue = Queue(is_async=False)
-        job = queue.enqueue(fixtures.say_hello)
-        self.assertEqual(job.result, 'Hi there, Stranger!')
-        self.assertEqual(job.get_status(), JobStatus.FINISHED)
-    def test_enqueue_job_async_status_finished(self):
-        queue = Queue(is_async=False)
-        job = Job.create(func=fixtures.say_hello)
-        job = queue.enqueue_job(job)
-        self.assertEqual(job.result, 'Hi there, Stranger!')
-        self.assertEqual(job.get_status(), JobStatus.FINISHED)
-    def test_get_result_ttl(self):
-        """Getting job result TTL."""
-        job_result_ttl = 1
-        default_ttl = 2
-        job = Job.create(func=fixtures.say_hello, result_ttl=job_result_ttl)
-        self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), job_result_ttl)
-        job = Job.create(func=fixtures.say_hello)
-        self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), default_ttl)
-    def test_get_job_ttl(self):
-        """Getting job TTL."""
-        ttl = 1
-        job = Job.create(func=fixtures.say_hello, ttl=ttl)
-        self.assertEqual(job.get_ttl(), ttl)
-        job = Job.create(func=fixtures.say_hello)
-        self.assertEqual(job.get_ttl(), None)
-    def test_ttl_via_enqueue(self):
-        ttl = 1
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(fixtures.say_hello, ttl=ttl)
-        self.assertEqual(job.get_ttl(), ttl)
-    def test_never_expire_during_execution(self):
-        """Test what happens when job expires during execution"""
-        ttl = 1
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(fixtures.long_running_job, args=(2,), ttl=ttl)
-        self.assertEqual(job.get_ttl(), ttl)
-        job.perform()
-        self.assertEqual(job.get_ttl(), ttl)
-        self.assertTrue(job.exists(
-        self.assertEqual(job.result, 'Done sleeping...')
-    def test_cleanup(self):
-        """Test that jobs and results are expired properly."""
-        job = Job.create(func=fixtures.say_hello)
-        # Jobs with negative TTLs don't expire
-        job.cleanup(ttl=-1)
-        self.assertEqual(self.testconn.ttl(job.key), -1)
-        # Jobs with positive TTLs are eventually deleted
-        job.cleanup(ttl=100)
-        self.assertEqual(self.testconn.ttl(job.key), 100)
-        # Jobs with 0 TTL are immediately deleted
-        job.cleanup(ttl=0)
-        self.assertRaises(NoSuchJobError, Job.fetch,, self.testconn)
-    def test_cleanup_expires_dependency_keys(self):
-        dependency_job = Job.create(func=fixtures.say_hello)
-        dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job)
-        dependent_job.register_dependency()
-        dependent_job.cleanup(ttl=100)
-        dependency_job.cleanup(ttl=100)
-        self.assertEqual(self.testconn.ttl(dependent_job.dependencies_key), 100)
-        self.assertEqual(self.testconn.ttl(dependency_job.dependents_key), 100)
-    def test_job_get_position(self):
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(fixtures.say_hello)
-        job2 = queue.enqueue(fixtures.say_hello)
-        job3 = Job(fixtures.say_hello)
-        self.assertEqual(0, job.get_position())
-        self.assertEqual(1, job2.get_position())
-        self.assertEqual(None, job3.get_position())
-    def test_job_with_dependents_delete_parent(self):
-        """job.delete() deletes itself from Redis but not dependents.
-        Wthout a save, the dependent job is never saved into redis. The delete
-        method will get and pass a NoSuchJobError.
-        """
-        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
-        job = queue.enqueue(fixtures.say_hello)
-        job2 = Job.create(func=fixtures.say_hello, depends_on=job, serializer=JSONSerializer)
-        job2.register_dependency()
-        job.delete()
-        self.assertFalse(self.testconn.exists(job.key))
-        self.assertFalse(self.testconn.exists(job.dependents_key))
-        # By default, dependents are not deleted, but The job is in redis only
-        # if it was saved!
-        self.assertFalse(self.testconn.exists(job2.key))
-        self.assertNotIn(, queue.get_job_ids())
-    def test_job_delete_removes_itself_from_registries(self):
-        """job.delete() should remove itself from job registries"""
-        job = Job.create(
-            func=fixtures.say_hello,
-            status=JobStatus.FAILED,
-            connection=self.testconn,
-            origin='default',
-            serializer=JSONSerializer,
-        )
-        registry = FailedJobRegistry(connection=self.testconn, serializer=JSONSerializer)
-        registry.add(job, 500)
-        job.delete()
-        self.assertFalse(job in registry)
-        job = Job.create(
-            func=fixtures.say_hello,
-            status=JobStatus.STOPPED,
-            connection=self.testconn,
-            origin='default',
-            serializer=JSONSerializer,
-        )
-        registry = FailedJobRegistry(connection=self.testconn, serializer=JSONSerializer)
-        registry.add(job, 500)
-        job.delete()
-        self.assertFalse(job in registry)
-        job = Job.create(
-            func=fixtures.say_hello,
-            status=JobStatus.FINISHED,
-            connection=self.testconn,
-            origin='default',
-            serializer=JSONSerializer,
-        )
-        registry = FinishedJobRegistry(connection=self.testconn, serializer=JSONSerializer)
-        registry.add(job, 500)
-        job.delete()
-        self.assertFalse(job in registry)
-        job = Job.create(
-            func=fixtures.say_hello,
-            status=JobStatus.STARTED,
-            connection=self.testconn,
-            origin='default',
-            serializer=JSONSerializer,
-        )
-        registry = StartedJobRegistry(connection=self.testconn, serializer=JSONSerializer)
-        registry.add(job, 500)
-        job.delete()
-        self.assertFalse(job in registry)
-        job = Job.create(
-            func=fixtures.say_hello,
-            status=JobStatus.DEFERRED,
-            connection=self.testconn,
-            origin='default',
-            serializer=JSONSerializer,
-        )
-        registry = DeferredJobRegistry(connection=self.testconn, serializer=JSONSerializer)
-        registry.add(job, 500)
-        job.delete()
-        self.assertFalse(job in registry)
-        job = Job.create(
-            func=fixtures.say_hello,
-            status=JobStatus.SCHEDULED,
-            connection=self.testconn,
-            origin='default',
-            serializer=JSONSerializer,
-        )
-        registry = ScheduledJobRegistry(connection=self.testconn, serializer=JSONSerializer)
-        registry.add(job, 500)
-        job.delete()
-        self.assertFalse(job in registry)
-    def test_job_with_dependents_delete_parent_with_saved(self):
-        """job.delete() deletes itself from Redis but not dependents. If the
-        dependent job was saved, it will remain in redis."""
-        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
-        job = queue.enqueue(fixtures.say_hello)
-        job2 = Job.create(func=fixtures.say_hello, depends_on=job, serializer=JSONSerializer)
-        job2.register_dependency()
-        job.delete()
-        self.assertFalse(self.testconn.exists(job.key))
-        self.assertFalse(self.testconn.exists(job.dependents_key))
-        # By default, dependents are not deleted, but The job is in redis only
-        # if it was saved!
-        self.assertTrue(self.testconn.exists(job2.key))
-        self.assertNotIn(, queue.get_job_ids())
-    def test_job_with_dependents_deleteall(self):
-        """job.delete() deletes itself from Redis. Dependents need to be
-        deleted explicitly."""
-        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
-        job = queue.enqueue(fixtures.say_hello)
-        job2 = Job.create(func=fixtures.say_hello, depends_on=job, serializer=JSONSerializer)
-        job2.register_dependency()
-        job.delete(delete_dependents=True)
-        self.assertFalse(self.testconn.exists(job.key))
-        self.assertFalse(self.testconn.exists(job.dependents_key))
-        self.assertFalse(self.testconn.exists(job2.key))
-        self.assertNotIn(, queue.get_job_ids())
-    def test_job_with_dependents_delete_all_with_saved(self):
-        """job.delete() deletes itself from Redis. Dependents need to be
-        deleted explictely. Without a save, the dependent job is never saved
-        into redis. The delete method will get and pass a NoSuchJobError.
-        """
-        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
-        job = queue.enqueue(fixtures.say_hello)
-        job2 = Job.create(func=fixtures.say_hello, depends_on=job, serializer=JSONSerializer)
-        job2.register_dependency()
-        job.delete(delete_dependents=True)
-        self.assertFalse(self.testconn.exists(job.key))
-        self.assertFalse(self.testconn.exists(job.dependents_key))
-        self.assertFalse(self.testconn.exists(job2.key))
-        self.assertNotIn(, queue.get_job_ids())
-    def test_dependent_job_creates_dependencies_key(self):
-        queue = Queue(connection=self.testconn)
-        dependency_job = queue.enqueue(fixtures.say_hello)
-        dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job)
-        dependent_job.register_dependency()
-        self.assertTrue(self.testconn.exists(dependent_job.dependencies_key))
-    def test_dependent_job_deletes_dependencies_key(self):
-        """
-        job.delete() deletes itself from Redis.
-        """
-        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
-        dependency_job = queue.enqueue(fixtures.say_hello)
-        dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job, serializer=JSONSerializer)
-        dependent_job.register_dependency()
-        dependent_job.delete()
-        self.assertTrue(self.testconn.exists(dependency_job.key))
-        self.assertFalse(self.testconn.exists(dependent_job.dependencies_key))
-        self.assertFalse(self.testconn.exists(dependent_job.key))
-    def test_create_job_with_id(self):
-        """test creating jobs with a custom ID"""
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(fixtures.say_hello, job_id="1234")
-        self.assertEqual(, "1234")
-        job.perform()
-        self.assertRaises(TypeError, queue.enqueue, fixtures.say_hello, job_id=1234)
-    def test_create_job_with_async(self):
-        """test creating jobs with async function"""
-        queue = Queue(connection=self.testconn)
-        async_job = queue.enqueue(fixtures.say_hello_async, job_id="async_job")
-        sync_job = queue.enqueue(fixtures.say_hello, job_id="sync_job")
-        self.assertEqual(, "async_job")
-        self.assertEqual(, "sync_job")
-        async_task_result = async_job.perform()
-        sync_task_result = sync_job.perform()
-        self.assertEqual(sync_task_result, async_task_result)
-    def test_get_call_string_unicode(self):
-        """test call string with unicode keyword arguments"""
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(fixtures.echo, arg_with_unicode=fixtures.UnicodeStringObject())
-        self.assertIsNotNone(job.get_call_string())
-        job.perform()
-    def test_create_job_from_static_method(self):
-        """test creating jobs with static method"""
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(fixtures.ClassWithAStaticMethod.static_method)
-        self.assertIsNotNone(job.get_call_string())
-        job.perform()
-    def test_create_job_with_ttl_should_have_ttl_after_enqueued(self):
-        """test creating jobs with ttl and checks if get_jobs returns it properly [issue502]"""
-        queue = Queue(connection=self.testconn)
-        queue.enqueue(fixtures.say_hello, job_id="1234", ttl=10)
-        job = queue.get_jobs()[0]
-        self.assertEqual(job.ttl, 10)
-    def test_create_job_with_ttl_should_expire(self):
-        """test if a job created with ttl expires [issue502]"""
-        queue = Queue(connection=self.testconn)
-        queue.enqueue(fixtures.say_hello, job_id="1234", ttl=1)
-        time.sleep(1.1)
-        self.assertEqual(0, len(queue.get_jobs()))
-    def test_create_and_cancel_job(self):
-        """Ensure job.cancel() works properly"""
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(fixtures.say_hello)
-        self.assertEqual(1, len(queue.get_jobs()))
-        cancel_job(
-        self.assertEqual(0, len(queue.get_jobs()))
-        registry = CanceledJobRegistry(connection=self.testconn, queue=queue)
-        self.assertIn(job, registry)
-        self.assertEqual(job.get_status(), JobStatus.CANCELED)
-        # If job is deleted, it's also removed from CanceledJobRegistry
-        job.delete()
-        self.assertNotIn(job, registry)
-    def test_create_and_cancel_job_fails_already_canceled(self):
-        """Ensure job.cancel() fails on already canceled job"""
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(fixtures.say_hello, job_id='fake_job_id')
-        self.assertEqual(1, len(queue.get_jobs()))
-        # First cancel should be fine
-        cancel_job(
-        self.assertEqual(0, len(queue.get_jobs()))
-        registry = CanceledJobRegistry(connection=self.testconn, queue=queue)
-        self.assertIn(job, registry)
-        self.assertEqual(job.get_status(), JobStatus.CANCELED)
-        # Second cancel should fail
-        self.assertRaisesRegex(
-            InvalidJobOperation, r'Cannot cancel already canceled job: fake_job_id', cancel_job,
-        )
-    def test_create_and_cancel_job_enqueue_dependents(self):
-        """Ensure job.cancel() works properly with enqueue_dependents=True"""
-        queue = Queue(connection=self.testconn)
-        dependency = queue.enqueue(fixtures.say_hello)
-        dependent = queue.enqueue(fixtures.say_hello, depends_on=dependency)
-        self.assertEqual(1, len(queue.get_jobs()))
-        self.assertEqual(1, len(queue.deferred_job_registry))
-        cancel_job(, enqueue_dependents=True)
-        self.assertEqual(1, len(queue.get_jobs()))
-        self.assertEqual(0, len(queue.deferred_job_registry))
-        registry = CanceledJobRegistry(connection=self.testconn, queue=queue)
-        self.assertIn(dependency, registry)
-        self.assertEqual(dependency.get_status(), JobStatus.CANCELED)
-        self.assertIn(dependent, queue.get_jobs())
-        self.assertEqual(dependent.get_status(), JobStatus.QUEUED)
-        # If job is deleted, it's also removed from CanceledJobRegistry
-        dependency.delete()
-        self.assertNotIn(dependency, registry)
-    def test_create_and_cancel_job_enqueue_dependents_in_registry(self):
-        """Ensure job.cancel() works properly with enqueue_dependents=True and when the job is in a registry"""
-        queue = Queue(connection=self.testconn)
-        dependency = queue.enqueue(fixtures.raise_exc)
-        dependent = queue.enqueue(fixtures.say_hello, depends_on=dependency)
-        print('# Post enqueue', self.testconn.smembers(dependency.dependents_key))
-        self.assertTrue(dependency.dependent_ids)
-        self.assertEqual(1, len(queue.get_jobs()))
-        self.assertEqual(1, len(queue.deferred_job_registry))
-        w = Worker([queue])
-, max_jobs=1)
-        self.assertTrue(dependency.dependent_ids)
-        print('# Post work', self.testconn.smembers(dependency.dependents_key))
-        dependency.refresh()
-        dependent.refresh()
-        self.assertEqual(0, len(queue.get_jobs()))
-        self.assertEqual(1, len(queue.deferred_job_registry))
-        self.assertEqual(1, len(queue.failed_job_registry))
-        print('# Pre cancel', self.testconn.smembers(dependency.dependents_key))
-        cancel_job(, enqueue_dependents=True)
-        dependency.refresh()
-        dependent.refresh()
-        print('#Post cancel', self.testconn.smembers(dependency.dependents_key))
-        self.assertEqual(1, len(queue.get_jobs()))
-        self.assertEqual(0, len(queue.deferred_job_registry))
-        self.assertEqual(0, len(queue.failed_job_registry))
-        self.assertEqual(1, len(queue.canceled_job_registry))
-        registry = CanceledJobRegistry(connection=self.testconn, queue=queue)
-        self.assertIn(dependency, registry)
-        self.assertEqual(dependency.get_status(), JobStatus.CANCELED)
-        self.assertNotIn(dependency, queue.failed_job_registry)
-        self.assertIn(dependent, queue.get_jobs())
-        self.assertEqual(dependent.get_status(), JobStatus.QUEUED)
-        # If job is deleted, it's also removed from CanceledJobRegistry
-        dependency.delete()
-        self.assertNotIn(dependency, registry)
-    def test_create_and_cancel_job_enqueue_dependents_with_pipeline(self):
-        """Ensure job.cancel() works properly with enqueue_dependents=True"""
-        queue = Queue(connection=self.testconn)
-        dependency = queue.enqueue(fixtures.say_hello)
-        dependent = queue.enqueue(fixtures.say_hello, depends_on=dependency)
-        self.assertEqual(1, len(queue.get_jobs()))
-        self.assertEqual(1, len(queue.deferred_job_registry))
-        self.testconn.set('some:key', b'some:value')
-        with self.testconn.pipeline() as pipe:
-  'some:key')
-            self.assertEqual(self.testconn.get('some:key'), b'some:value')
-            dependency.cancel(pipeline=pipe, enqueue_dependents=True)
-            pipe.set('some:key', b'some:other:value')
-            pipe.execute()
-        self.assertEqual(self.testconn.get('some:key'), b'some:other:value')
-        self.assertEqual(1, len(queue.get_jobs()))
-        self.assertEqual(0, len(queue.deferred_job_registry))
-        registry = CanceledJobRegistry(connection=self.testconn, queue=queue)
-        self.assertIn(dependency, registry)
-        self.assertEqual(dependency.get_status(), JobStatus.CANCELED)
-        self.assertIn(dependent, queue.get_jobs())
-        self.assertEqual(dependent.get_status(), JobStatus.QUEUED)
-        # If job is deleted, it's also removed from CanceledJobRegistry
-        dependency.delete()
-        self.assertNotIn(dependency, registry)
-    def test_create_and_cancel_job_with_serializer(self):
-        """test creating and using cancel_job (with serializer) deletes job properly"""
-        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
-        job = queue.enqueue(fixtures.say_hello)
-        self.assertEqual(1, len(queue.get_jobs()))
-        cancel_job(, serializer=JSONSerializer)
-        self.assertEqual(0, len(queue.get_jobs()))
-    def test_dependents_key_for_should_return_prefixed_job_id(self):
-        """test redis key to store job dependents hash under"""
-        job_id = 'random'
-        key = Job.dependents_key_for(job_id=job_id)
-        assert key == Job.redis_job_namespace_prefix + job_id + ':dependents'
-    def test_key_for_should_return_prefixed_job_id(self):
-        """test redis key to store job hash under"""
-        job_id = 'random'
-        key = Job.key_for(job_id=job_id)
-        assert key == (Job.redis_job_namespace_prefix + job_id).encode('utf-8')
-    def test_dependencies_key_should_have_prefixed_job_id(self):
-        job_id = 'random'
-        job = Job(id=job_id)
-        expected_key = Job.redis_job_namespace_prefix + ":" + job_id + ':dependencies'
-        assert job.dependencies_key == expected_key
-    def test_fetch_dependencies_returns_dependency_jobs(self):
-        queue = Queue(connection=self.testconn)
-        dependency_job = queue.enqueue(fixtures.say_hello)
-        dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job)
-        dependent_job.register_dependency()
-        dependencies = dependent_job.fetch_dependencies(pipeline=self.testconn)
-        self.assertListEqual(dependencies, [dependency_job])
-    def test_fetch_dependencies_returns_empty_if_not_dependent_job(self):
-        dependent_job = Job.create(func=fixtures.say_hello)
-        dependent_job.register_dependency()
-        dependencies = dependent_job.fetch_dependencies(pipeline=self.testconn)
-        self.assertListEqual(dependencies, [])
-    def test_fetch_dependencies_raises_if_dependency_deleted(self):
-        queue = Queue(connection=self.testconn)
-        dependency_job = queue.enqueue(fixtures.say_hello)
-        dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job)
-        dependent_job.register_dependency()
-        dependency_job.delete()
-        self.assertNotIn(, [ for job in dependent_job.fetch_dependencies(pipeline=self.testconn)])
-    def test_fetch_dependencies_watches(self):
-        queue = Queue(connection=self.testconn)
-        dependency_job = queue.enqueue(fixtures.say_hello)
-        dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job)
-        dependent_job.register_dependency()
-        with self.testconn.pipeline() as pipeline:
-            dependent_job.fetch_dependencies(watch=True, pipeline=pipeline)
-            pipeline.multi()
-            with self.assertRaises(WatchError):
-                self.testconn.set(Job.key_for(, 'somethingelsehappened')
-                pipeline.touch(
-                pipeline.execute()
-    def test_dependencies_finished_returns_false_if_dependencies_queued(self):
-        queue = Queue(connection=self.testconn)
-        dependency_job_ids = [queue.enqueue(fixtures.say_hello).id for _ in range(5)]
-        dependent_job = Job.create(func=fixtures.say_hello)
-        dependent_job._dependency_ids = dependency_job_ids
-        dependent_job.register_dependency()
-        dependencies_finished = dependent_job.dependencies_are_met()
-        self.assertFalse(dependencies_finished)
-    def test_dependencies_finished_returns_true_if_no_dependencies(self):
-        dependent_job = Job.create(func=fixtures.say_hello)
-        dependent_job.register_dependency()
-        dependencies_finished = dependent_job.dependencies_are_met()
-        self.assertTrue(dependencies_finished)
-    def test_dependencies_finished_returns_true_if_all_dependencies_finished(self):
-        dependency_jobs = [Job.create(fixtures.say_hello) for _ in range(5)]
-        dependent_job = Job.create(func=fixtures.say_hello)
-        dependent_job._dependency_ids = [ for job in dependency_jobs]
-        dependent_job.register_dependency()
-        now = utcnow()
-        # Set ended_at timestamps
-        for i, job in enumerate(dependency_jobs):
-            job._status = JobStatus.FINISHED
-            job.ended_at = now - timedelta(seconds=i)
-        dependencies_finished = dependent_job.dependencies_are_met()
-        self.assertTrue(dependencies_finished)
-    def test_dependencies_finished_returns_false_if_unfinished_job(self):
-        dependency_jobs = [Job.create(fixtures.say_hello) for _ in range(2)]
-        dependency_jobs[0]._status = JobStatus.FINISHED
-        dependency_jobs[0].ended_at = utcnow()
-        dependency_jobs[0].save()
-        dependency_jobs[1]._status = JobStatus.STARTED
-        dependency_jobs[1].ended_at = None
-        dependency_jobs[1].save()
-        dependent_job = Job.create(func=fixtures.say_hello)
-        dependent_job._dependency_ids = [ for job in dependency_jobs]
-        dependent_job.register_dependency()
-        dependencies_finished = dependent_job.dependencies_are_met()
-        self.assertFalse(dependencies_finished)
-    def test_dependencies_finished_watches_job(self):
-        queue = Queue(connection=self.testconn)
-        dependency_job = queue.enqueue(fixtures.say_hello)
-        dependent_job = Job.create(func=fixtures.say_hello)
-        dependent_job._dependency_ids = []
-        dependent_job.register_dependency()
-        with self.testconn.pipeline() as pipeline:
-            dependent_job.dependencies_are_met(
-                pipeline=pipeline,
-            )
-            dependency_job.set_status(JobStatus.FAILED, pipeline=self.testconn)
-            pipeline.multi()
-            with self.assertRaises(WatchError):
-                pipeline.touch(Job.key_for(
-                pipeline.execute()
-    def test_execution_order_with_sole_dependency(self):
-        queue = Queue(connection=self.testconn)
-        key = 'test_job:job_order'
-        # When there are no dependencies, the two fast jobs ("A" and "B") run in the order enqueued.
-        # Worker 1 will be busy with the slow job, so worker 2 will complete both fast jobs.
-        job_slow = queue.enqueue(fixtures.rpush, args=[key, "slow", True, 0.5], job_id='slow_job')
-        job_A = queue.enqueue(fixtures.rpush, args=[key, "A", True])
-        job_B = queue.enqueue(fixtures.rpush, args=[key, "B", True])
-        fixtures.burst_two_workers(queue)
-        time.sleep(0.75)
-        jobs_completed = [v.decode() for v in self.testconn.lrange(key, 0, 2)]
-        self.assertEqual(queue.count, 0)
-        self.assertTrue(all(job.is_finished for job in [job_slow, job_A, job_B]))
-        self.assertEqual(jobs_completed, ["A:w2", "B:w2", "slow:w1"])
-        self.testconn.delete(key)
-        # When job "A" depends on the slow job, then job "B" finishes before "A".
-        # There is no clear requirement on which worker should take job "A", so we stay silent on that.
-        job_slow = queue.enqueue(fixtures.rpush, args=[key, "slow", True, 0.5], job_id='slow_job')
-        job_A = queue.enqueue(fixtures.rpush, args=[key, "A", False], depends_on='slow_job')
-        job_B = queue.enqueue(fixtures.rpush, args=[key, "B", True])
-        fixtures.burst_two_workers(queue)
-        time.sleep(0.75)
-        jobs_completed = [v.decode() for v in self.testconn.lrange(key, 0, 2)]
-        self.assertEqual(queue.count, 0)
-        self.assertTrue(all(job.is_finished for job in [job_slow, job_A, job_B]))
-        self.assertEqual(jobs_completed, ["B:w2", "slow:w1", "A"])
-    def test_execution_order_with_dual_dependency(self):
-        queue = Queue(connection=self.testconn)
-        key = 'test_job:job_order'
-        # When there are no dependencies, the two fast jobs ("A" and "B") run in the order enqueued.
-        job_slow_1 = queue.enqueue(fixtures.rpush, args=[key, "slow_1", True, 0.5], job_id='slow_1')
-        job_slow_2 = queue.enqueue(fixtures.rpush, args=[key, "slow_2", True, 0.75], job_id='slow_2')
-        job_A = queue.enqueue(fixtures.rpush, args=[key, "A", True])
-        job_B = queue.enqueue(fixtures.rpush, args=[key, "B", True])
-        fixtures.burst_two_workers(queue)
-        time.sleep(1)
-        jobs_completed = [v.decode() for v in self.testconn.lrange(key, 0, 3)]
-        self.assertEqual(queue.count, 0)
-        self.assertTrue(all(job.is_finished for job in [job_slow_1, job_slow_2, job_A, job_B]))
-        self.assertEqual(jobs_completed, ["slow_1:w1", "A:w1", "B:w1", "slow_2:w2"])
-        self.testconn.delete(key)
-        # This time job "A" depends on two slow jobs, while job "B" depends only on the faster of
-        # the two. Job "B" should be completed before job "A".
-        # There is no clear requirement on which worker should take job "A", so we stay silent on that.
-        job_slow_1 = queue.enqueue(fixtures.rpush, args=[key, "slow_1", True, 0.5], job_id='slow_1')
-        job_slow_2 = queue.enqueue(fixtures.rpush, args=[key, "slow_2", True, 0.75], job_id='slow_2')
-        job_A = queue.enqueue(fixtures.rpush, args=[key, "A", False], depends_on=['slow_1', 'slow_2'])
-        job_B = queue.enqueue(fixtures.rpush, args=[key, "B", True], depends_on=['slow_1'])
-        fixtures.burst_two_workers(queue)
-        time.sleep(1)
-        jobs_completed = [v.decode() for v in self.testconn.lrange(key, 0, 3)]
-        self.assertEqual(queue.count, 0)
-        self.assertTrue(all(job.is_finished for job in [job_slow_1, job_slow_2, job_A, job_B]))
-        self.assertEqual(jobs_completed, ["slow_1:w1", "B:w1", "slow_2:w2", "A"])
diff --git a/tests/ b/tests/
deleted file mode 100644
index 8cef010..0000000
--- a/tests/
+++ /dev/null
@@ -1,36 +0,0 @@
-import unittest
-from unittest.mock import patch
-from redis import Redis
-from rq.job import JobStatus
-from rq.maintenance import clean_intermediate_queue
-from rq.queue import Queue
-from rq.utils import get_version
-from rq.worker import Worker
-from tests import RQTestCase
-from tests.fixtures import say_hello
-class MaintenanceTestCase(RQTestCase):
-    @unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
-    def test_cleanup_intermediate_queue(self):
-        """Ensure jobs stuck in the intermediate queue are cleaned up."""
-        queue = Queue('foo', connection=self.testconn)
-        job = queue.enqueue(say_hello)
-        # If job execution fails after it's dequeued, job should be in the intermediate queue
-        # # and it's status is still QUEUED
-        with patch.object(Worker, 'execute_job'):
-            # mocked.execute_job.side_effect = Exception()
-            worker = Worker(queue, connection=self.testconn)
-            self.assertEqual(job.get_status(), JobStatus.QUEUED)
-            self.assertFalse( in queue.get_job_ids())
-            self.assertIsNotNone(self.testconn.lpos(queue.intermediate_queue_key,
-            # After cleaning up the intermediate queue, job status should be `FAILED`
-            # and job is also removed from the intermediate queue
-            clean_intermediate_queue(worker, queue)
-            self.assertEqual(job.get_status(), JobStatus.FAILED)
-            self.assertIsNone(self.testconn.lpos(queue.intermediate_queue_key,
diff --git a/tests/ b/tests/
deleted file mode 100644
index 4ae4e26..0000000
--- a/tests/
+++ /dev/null
@@ -1,802 +0,0 @@
-import json
-import unittest
-from datetime import datetime, timedelta, timezone
-from unittest.mock import patch
-from redis import Redis
-from rq import Queue, Retry
-from rq.job import Job, JobStatus
-from rq.registry import (
-    CanceledJobRegistry,
-    DeferredJobRegistry,
-    FailedJobRegistry,
-    FinishedJobRegistry,
-    ScheduledJobRegistry,
-    StartedJobRegistry,
-)
-from rq.serializers import JSONSerializer
-from rq.utils import get_version
-from rq.worker import Worker
-from tests import RQTestCase
-from tests.fixtures import echo, say_hello
-class MultipleDependencyJob(Job):
-    """
-    Allows for the patching of `_dependency_ids` to simulate multi-dependency
-    support without modifying the public interface of `Job`
-    """
-    create_job = Job.create
-    @classmethod
-    def create(cls, *args, **kwargs):
-        dependency_ids = kwargs.pop('kwargs').pop('_dependency_ids')
-        _job = cls.create_job(*args, **kwargs)
-        _job._dependency_ids = dependency_ids
-        return _job
-class TestQueue(RQTestCase):
-    def test_create_queue(self):
-        """Creating queues."""
-        q = Queue('my-queue')
-        self.assertEqual(q.name, 'my-queue')
-        self.assertEqual(str(q), '<Queue my-queue>')
-    def test_create_queue_with_serializer(self):
-        """Creating queues with serializer."""
-        # Test using json serializer
-        q = Queue('queue-with-serializer', serializer=json)
-        self.assertEqual(q.name, 'queue-with-serializer')
-        self.assertEqual(str(q), '<Queue queue-with-serializer>')
-        self.assertIsNotNone(q.serializer)
-    def test_create_default_queue(self):
-        """Instantiating the default queue."""
-        q = Queue()
-        self.assertEqual(q.name, 'default')
-    def test_equality(self):
-        """Mathematical equality of queues."""
-        q1 = Queue('foo')
-        q2 = Queue('foo')
-        q3 = Queue('bar')
-        self.assertEqual(q1, q2)
-        self.assertEqual(q2, q1)
-        self.assertNotEqual(q1, q3)
-        self.assertNotEqual(q2, q3)
-        self.assertGreater(q1, q3)
-        self.assertRaises(TypeError, lambda: q1 == 'some string')
-        self.assertRaises(TypeError, lambda: q1 < 'some string')
-    def test_empty_queue(self):
-        """Emptying queues."""
-        q = Queue('example')
-        self.testconn.rpush('rq:queue:example', 'foo')
-        self.testconn.rpush('rq:queue:example', 'bar')
-        self.assertEqual(q.is_empty(), False)
-        q.empty()
-        self.assertEqual(q.is_empty(), True)
-        self.assertIsNone(self.testconn.lpop('rq:queue:example'))
-    def test_empty_removes_jobs(self):
-        """Emptying a queue deletes the associated job objects"""
-        q = Queue('example')
-        job = q.enqueue(say_hello)
-        self.assertTrue(Job.exists(job.id))
-        q.empty()
-        self.assertFalse(Job.exists(job.id))
-    def test_queue_is_empty(self):
-        """Detecting empty queues."""
-        q = Queue('example')
-        self.assertEqual(q.is_empty(), True)
-        self.testconn.rpush('rq:queue:example', 'sentinel message')
-        self.assertEqual(q.is_empty(), False)
-    def test_queue_delete(self):
-        """Test queue.delete properly removes queue"""
-        q = Queue('example')
-        job = q.enqueue(say_hello)
-        job2 = q.enqueue(say_hello)
-        self.assertEqual(2, len(q.get_job_ids()))
-        q.delete()
-        self.assertEqual(0, len(q.get_job_ids()))
-        self.assertEqual(False, self.testconn.exists(job.key))
-        self.assertEqual(False, self.testconn.exists(job2.key))
-        self.assertEqual(0, len(self.testconn.smembers(Queue.redis_queues_keys)))
-        self.assertEqual(False, self.testconn.exists(q.key))
-    def test_queue_delete_but_keep_jobs(self):
-        """Test queue.delete properly removes queue but keeps the job keys in the redis store"""
-        q = Queue('example')
-        job = q.enqueue(say_hello)
-        job2 = q.enqueue(say_hello)
-        self.assertEqual(2, len(q.get_job_ids()))
-        q.delete(delete_jobs=False)
-        self.assertEqual(0, len(q.get_job_ids()))
-        self.assertEqual(True, self.testconn.exists(job.key))
-        self.assertEqual(True, self.testconn.exists(job2.key))
-        self.assertEqual(0, len(self.testconn.smembers(Queue.redis_queues_keys)))
-        self.assertEqual(False, self.testconn.exists(q.key))
-    def test_position(self):
-        """Test queue.delete properly removes queue but keeps the job keys in the redis store"""
-        q = Queue('example')
-        job = q.enqueue(say_hello)
-        job2 = q.enqueue(say_hello)
-        job3 = q.enqueue(say_hello)
-        self.assertEqual(0, q.get_job_position(job.id))
-        self.assertEqual(1, q.get_job_position(job2.id))
-        self.assertEqual(2, q.get_job_position(job3))
-        self.assertEqual(None, q.get_job_position("no_real_job"))
-    def test_remove(self):
-        """Ensure queue.remove properly removes Job from queue."""
-        q = Queue('example', serializer=JSONSerializer)
-        job = q.enqueue(say_hello)
-        self.assertIn(job.id, q.job_ids)
-        q.remove(job)
-        self.assertNotIn(job.id, q.job_ids)
-        job = q.enqueue(say_hello)
-        self.assertIn(job.id, q.job_ids)
-        q.remove(job.id)
-        self.assertNotIn(job.id, q.job_ids)
-    def test_jobs(self):
-        """Getting jobs out of a queue."""
-        q = Queue('example')
-        self.assertEqual(q.jobs, [])
-        job = q.enqueue(say_hello)
-        self.assertEqual(q.jobs, [job])
-        # Deleting job removes it from queue
-        job.delete()
-        self.assertEqual(q.job_ids, [])
-    def test_compact(self):
-        """Queue.compact() removes non-existing jobs."""
-        q = Queue()
-        q.enqueue(say_hello, 'Alice')
-        q.enqueue(say_hello, 'Charlie')
-        self.testconn.lpush(q.key, '1', '2')
-        self.assertEqual(q.count, 4)
-        self.assertEqual(len(q), 4)
-        q.compact()
-        self.assertEqual(q.count, 2)
-        self.assertEqual(len(q), 2)
-    def test_enqueue(self):
-        """Enqueueing job onto queues."""
-        q = Queue()
-        self.assertEqual(q.is_empty(), True)
-        # say_hello spec holds which queue this is sent to
-        job = q.enqueue(say_hello, 'Nick', foo='bar')
-        job_id = job.id
-        self.assertEqual(job.origin, q.name)
-        # Inspect data inside Redis
-        q_key = 'rq:queue:default'
-        self.assertEqual(self.testconn.llen(q_key), 1)
-        self.assertEqual(self.testconn.lrange(q_key, 0, -1)[0].decode('ascii'), job_id)
-    def test_enqueue_sets_metadata(self):
-        """Enqueueing job onto queues modifies meta data."""
-        q = Queue()
-        job = Job.create(func=say_hello, args=('Nick',), kwargs=dict(foo='bar'))
-        # Preconditions
-        self.assertIsNone(job.enqueued_at)
-        # Action
-        q.enqueue_job(job)
-        # Postconditions
-        self.assertIsNotNone(job.enqueued_at)
-    def test_pop_job_id(self):
-        """Popping job IDs from queues."""
-        # Set up
-        q = Queue()
-        uuid = '112188ae-4e9d-4a5b-a5b3-f26f2cb054da'
-        q.push_job_id(uuid)
-        # Pop it off the queue...
-        self.assertEqual(q.count, 1)
-        self.assertEqual(q.pop_job_id(), uuid)
-        # ...and assert the queue count when down
-        self.assertEqual(q.count, 0)
-    def test_dequeue_any(self):
-        """Fetching work from any given queue."""
-        fooq = Queue('foo', connection=self.testconn)
-        barq = Queue('bar', connection=self.testconn)
-        self.assertRaises(ValueError, Queue.dequeue_any, [fooq, barq], timeout=0, connection=self.testconn)
-        self.assertEqual(Queue.dequeue_any([fooq, barq], None), None)
-        # Enqueue a single item
-        barq.enqueue(say_hello)
-        job, queue = Queue.dequeue_any([fooq, barq], None)
-        self.assertEqual(job.func, say_hello)
-        self.assertEqual(queue, barq)
-        # Enqueue items on both queues
-        barq.enqueue(say_hello, 'for Bar')
-        fooq.enqueue(say_hello, 'for Foo')
-        job, queue = Queue.dequeue_any([fooq, barq], None)
-        self.assertEqual(queue, fooq)
-        self.assertEqual(job.func, say_hello)
-        self.assertEqual(job.origin, fooq.name)
-        self.assertEqual(job.args[0], 'for Foo', 'Foo should be dequeued first.')
-        job, queue = Queue.dequeue_any([fooq, barq], None)
-        self.assertEqual(queue, barq)
-        self.assertEqual(job.func, say_hello)
-        self.assertEqual(job.origin, barq.name)
-        self.assertEqual(job.args[0], 'for Bar', 'Bar should be dequeued second.')
-    @unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
-    def test_dequeue_any_reliable(self):
-        """Dequeueing job from a single queue moves job to intermediate queue."""
-        foo_queue = Queue('foo', connection=self.testconn)
-        job_1 = foo_queue.enqueue(say_hello)
-        self.assertRaises(ValueError, Queue.dequeue_any, [foo_queue], timeout=0, connection=self.testconn)
-        # Job ID is not in intermediate queue
-        self.assertIsNone(self.testconn.lpos(foo_queue.intermediate_queue_key, job_1.id))
-        job, queue = Queue.dequeue_any([foo_queue], timeout=None, connection=self.testconn)
-        self.assertEqual(queue, foo_queue)
-        self.assertEqual(job.func, say_hello)
-        # After job is dequeued, the job ID is in the intermediate queue
-        self.assertEqual(self.testconn.lpos(foo_queue.intermediate_queue_key, job.id), 0)
-        # Test the blocking version
-        foo_queue.enqueue(say_hello)
-        job, queue = Queue.dequeue_any([foo_queue], timeout=1, connection=self.testconn)
-        self.assertEqual(queue, foo_queue)
-        self.assertEqual(job.func, say_hello)
-        # After job is dequeued, the job ID is in the intermediate queue
-        self.assertEqual(self.testconn.lpos(foo_queue.intermediate_queue_key, job.id), 1)
-    @unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
-    def test_intermediate_queue(self):
-        """Job should be stuck in intermediate queue if execution fails after dequeued."""
-        queue = Queue('foo', connection=self.testconn)
-        job = queue.enqueue(say_hello)
-        # If job execution fails after it's dequeued, job should be in the intermediate queue
-        # # and it's status is still QUEUED
-        with patch.object(Worker, 'execute_job'):
-            # mocked.execute_job.side_effect = Exception()
-            worker = Worker(queue, connection=self.testconn)
-            worker.work(burst=True)
-            # Job status is still QUEUED even though it's already dequeued
-            self.assertEqual(job.get_status(refresh=True), JobStatus.QUEUED)
-            self.assertFalse(job.id in queue.get_job_ids())
-            self.assertIsNotNone(self.testconn.lpos(queue.intermediate_queue_key, job.id))
-    def test_dequeue_any_ignores_nonexisting_jobs(self):
-        """Dequeuing (from any queue) silently ignores non-existing jobs."""
-        q = Queue('low')
-        uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
-        q.push_job_id(uuid)
-        # Dequeue simply ignores the missing job and returns None
-        self.assertEqual(q.count, 1)
-        self.assertEqual(Queue.dequeue_any([Queue(), Queue('low')], None), None)  # noqa
-        self.assertEqual(q.count, 0)
-    def test_enqueue_with_ttl(self):
-        """Negative TTL value is not allowed"""
-        queue = Queue()
-        self.assertRaises(ValueError, queue.enqueue, echo, 1, ttl=0)
-        self.assertRaises(ValueError, queue.enqueue, echo, 1, ttl=-1)
-    def test_enqueue_sets_status(self):
-        """Enqueueing a job sets its status to "queued"."""
-        q = Queue()
-        job = q.enqueue(say_hello)
-        self.assertEqual(job.get_status(), JobStatus.QUEUED)
-    def test_enqueue_meta_arg(self):
-        """enqueue() can set the job.meta contents."""
-        q = Queue()
-        job = q.enqueue(say_hello, meta={'foo': 'bar', 'baz': 42})
-        self.assertEqual(job.meta['foo'], 'bar')
-        self.assertEqual(job.meta['baz'], 42)
-    def test_enqueue_with_failure_ttl(self):
-        """enqueue() properly sets job.failure_ttl"""
-        q = Queue()
-        job = q.enqueue(say_hello, failure_ttl=10)
-        job.refresh()
-        self.assertEqual(job.failure_ttl, 10)
-    def test_job_timeout(self):
-        """Timeout can be passed via job_timeout argument"""
-        queue = Queue()
-        job = queue.enqueue(echo, 1, job_timeout=15)
-        self.assertEqual(job.timeout, 15)
-        # Not passing job_timeout will use queue._default_timeout
-        job = queue.enqueue(echo, 1)
-        self.assertEqual(job.timeout, queue._default_timeout)
-        # job_timeout = 0 is not allowed
-        self.assertRaises(ValueError, queue.enqueue, echo, 1, job_timeout=0)
-    def test_default_timeout(self):
-        """Timeout can be passed via job_timeout argument"""
-        queue = Queue()
-        job = queue.enqueue(echo, 1)
-        self.assertEqual(job.timeout, queue.DEFAULT_TIMEOUT)
-        job = Job.create(func=echo)
-        job = queue.enqueue_job(job)
-        self.assertEqual(job.timeout, queue.DEFAULT_TIMEOUT)
-        queue = Queue(default_timeout=15)
-        job = queue.enqueue(echo, 1)
-        self.assertEqual(job.timeout, 15)
-        job = Job.create(func=echo)
-        job = queue.enqueue_job(job)
-        self.assertEqual(job.timeout, 15)
-    def test_synchronous_timeout(self):
-        queue = Queue(is_async=False)
-        self.assertFalse(queue.is_async)
-        no_expire_job = queue.enqueue(echo, result_ttl=-1)
-        self.assertEqual(queue.connection.ttl(no_expire_job.key), -1)
-        delete_job = queue.enqueue(echo, result_ttl=0)
-        self.assertEqual(queue.connection.ttl(delete_job.key), -2)
-        keep_job = queue.enqueue(echo, result_ttl=100)
-        self.assertLessEqual(queue.connection.ttl(keep_job.key), 100)
-    def test_enqueue_explicit_args(self):
-        """enqueue() works for both implicit/explicit args."""
-        q = Queue()
-        # Implicit args/kwargs mode
-        job = q.enqueue(echo, 1, job_timeout=1, result_ttl=1, bar='baz')
-        self.assertEqual(job.timeout, 1)
-        self.assertEqual(job.result_ttl, 1)
-        self.assertEqual(job.perform(), ((1,), {'bar': 'baz'}))
-        # Explicit kwargs mode
-        kwargs = {
-            'timeout': 1,
-            'result_ttl': 1,
-        }
-        job = q.enqueue(echo, job_timeout=2, result_ttl=2, args=[1], kwargs=kwargs)
-        self.assertEqual(job.timeout, 2)
-        self.assertEqual(job.result_ttl, 2)
-        self.assertEqual(job.perform(), ((1,), {'timeout': 1, 'result_ttl': 1}))
-        # Explicit args and kwargs should also work with enqueue_at
-        time = datetime.now(timezone.utc) + timedelta(seconds=10)
-        job = q.enqueue_at(time, echo, job_timeout=2, result_ttl=2, args=[1], kwargs=kwargs)
-        self.assertEqual(job.timeout, 2)
-        self.assertEqual(job.result_ttl, 2)
-        self.assertEqual(job.perform(), ((1,), {'timeout': 1, 'result_ttl': 1}))
-        # Positional arguments is not allowed if explicit args and kwargs are used
-        self.assertRaises(Exception, q.enqueue, echo, 1, kwargs=kwargs)
-    def test_all_queues(self):
-        """All queues"""
-        q1 = Queue('first-queue')
-        q2 = Queue('second-queue')
-        q3 = Queue('third-queue')
-        # Ensure a queue is added only once a job is enqueued
-        self.assertEqual(len(Queue.all()), 0)
-        q1.enqueue(say_hello)
-        self.assertEqual(len(Queue.all()), 1)
-        # Ensure this holds true for multiple queues
-        q2.enqueue(say_hello)
-        q3.enqueue(say_hello)
-        names = [q.name for q in Queue.all()]
-        self.assertEqual(len(Queue.all()), 3)
-        # Verify names
-        self.assertTrue('first-queue' in names)
-        self.assertTrue('second-queue' in names)
-        self.assertTrue('third-queue' in names)
-        # Now empty two queues
-        w = Worker([q2, q3])
-        w.work(burst=True)
-        # Queue.all() should still report the empty queues
-        self.assertEqual(len(Queue.all()), 3)
-    def test_all_custom_job(self):
-        class CustomJob(Job):
-            pass
-        q = Queue('all-queue')
-        q.enqueue(say_hello)
-        queues = Queue.all(job_class=CustomJob)
-        self.assertEqual(len(queues), 1)
-        self.assertIs(queues[0].job_class, CustomJob)
-    def test_from_queue_key(self):
-        """Ensure being able to get a Queue instance manually from Redis"""
-        q = Queue()
-        key = Queue.redis_queue_namespace_prefix + 'default'
-        reverse_q = Queue.from_queue_key(key)
-        self.assertEqual(q, reverse_q)
-    def test_from_queue_key_error(self):
-        """Ensure that an exception is raised if the queue prefix is wrong"""
-        key = 'some:weird:prefix:' + 'default'
-        self.assertRaises(ValueError, Queue.from_queue_key, key)
-    def test_enqueue_dependents(self):
-        """Enqueueing dependent jobs pushes all jobs in the depends set to the queue
-        and removes them from DeferredJobQueue."""
-        q = Queue()
-        parent_job = Job.create(func=say_hello)
-        job_1 = q.enqueue(say_hello, depends_on=parent_job)
-        job_2 = q.enqueue(say_hello, depends_on=parent_job)
-        registry = DeferredJobRegistry(q.name, connection=self.testconn)
-        parent_job.set_status(JobStatus.FINISHED)
-        self.assertEqual(set(registry.get_job_ids()), set([job_1.id, job_2.id]))
-        # After dependents is enqueued, job_1 and job_2 should be in queue
-        self.assertEqual(q.job_ids, [])
-        q.enqueue_dependents(parent_job)
-        self.assertEqual(set(q.job_ids), set([job_2.id, job_1.id]))
-        self.assertFalse(self.testconn.exists(parent_job.dependents_key))
-        # DeferredJobRegistry should also be empty
-        self.assertEqual(registry.get_job_ids(), [])
-    def test_enqueue_dependents_on_multiple_queues(self):
-        """Enqueueing dependent jobs on multiple queues pushes jobs in the queues
-        and removes them from DeferredJobRegistry for each different queue."""
-        q_1 = Queue("queue_1")
-        q_2 = Queue("queue_2")
-        parent_job = Job.create(func=say_hello)
-        job_1 = q_1.enqueue(say_hello, depends_on=parent_job)
-        job_2 = q_2.enqueue(say_hello, depends_on=parent_job)
-        # Each queue has its own DeferredJobRegistry
-        registry_1 = DeferredJobRegistry(q_1.name, connection=self.testconn)
-        self.assertEqual(set(registry_1.get_job_ids()), set([job_1.id]))
-        registry_2 = DeferredJobRegistry(q_2.name, connection=self.testconn)
-        parent_job.set_status(JobStatus.FINISHED)
-        self.assertEqual(set(registry_2.get_job_ids()), set([job_2.id]))
-        # After dependents is enqueued, job_1 on queue_1 and
-        # job_2 should be in queue_2
-        self.assertEqual(q_1.job_ids, [])
-        self.assertEqual(q_2.job_ids, [])
-        q_1.enqueue_dependents(parent_job)
-        q_2.enqueue_dependents(parent_job)
-        self.assertEqual(set(q_1.job_ids), set([job_1.id]))
-        self.assertEqual(set(q_2.job_ids), set([job_2.id]))
-        self.assertFalse(self.testconn.exists(parent_job.dependents_key))
-        # DeferredJobRegistry should also be empty
-        self.assertEqual(registry_1.get_job_ids(), [])
-        self.assertEqual(registry_2.get_job_ids(), [])
-    def test_enqueue_job_with_dependency(self):
-        """Jobs are enqueued only when their dependencies are finished."""
-        # Job with unfinished dependency is not immediately enqueued
-        parent_job = Job.create(func=say_hello)
-        q = Queue()
-        job = q.enqueue_call(say_hello, depends_on=parent_job)
-        self.assertEqual(q.job_ids, [])
-        self.assertEqual(job.get_status(), JobStatus.DEFERRED)
-        # Jobs dependent on finished jobs are immediately enqueued
-        parent_job.set_status(JobStatus.FINISHED)
-        job = q.enqueue_call(say_hello, depends_on=parent_job)
-        self.assertEqual(q.job_ids, [job.id])
-        self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
-        self.assertEqual(job.get_status(), JobStatus.QUEUED)
-    def test_enqueue_job_with_dependency_and_pipeline(self):
-        """Jobs are enqueued only when their dependencies are finished, and by the caller when passing a pipeline."""
-        # Job with unfinished dependency is not immediately enqueued
-        parent_job = Job.create(func=say_hello)
-        q = Queue()
-        with q.connection.pipeline() as pipe:
-            job = q.enqueue_call(say_hello, depends_on=parent_job, pipeline=pipe)
-            self.assertEqual(q.job_ids, [])
-            self.assertEqual(job.get_status(refresh=False), JobStatus.DEFERRED)
-            # Not in registry before execute, since passed in pipeline
-            self.assertEqual(len(q.deferred_job_registry), 0)
-            pipe.execute()
-            # Only in registry after execute, since passed in pipeline
-        self.assertEqual(len(q.deferred_job_registry), 1)
-        # Jobs dependent on finished jobs are immediately enqueued
-        parent_job.set_status(JobStatus.FINISHED)
-        with q.connection.pipeline() as pipe:
-            job = q.enqueue_call(say_hello, depends_on=parent_job, pipeline=pipe)
-            # Pre execute conditions
-            self.assertEqual(q.job_ids, [])
-            self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
-            self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
-            pipe.execute()
-        # Post execute conditions
-        self.assertEqual(q.job_ids, [job.id])
-        self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
-        self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
-    def test_enqueue_job_with_no_dependency_prior_watch_and_pipeline(self):
-        """Jobs are enqueued only when their dependencies are finished, and by the caller when passing a pipeline."""
-        q = Queue()
-        with q.connection.pipeline() as pipe:
-            pipe.watch(b'fake_key')  # Test watch then enqueue
-            job = q.enqueue_call(say_hello, pipeline=pipe)
-            self.assertEqual(q.job_ids, [])
-            self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
-            # Not in queue before execute, since passed in pipeline
-            self.assertEqual(len(q), 0)
-            # Make sure modifying key doesn't cause issues, if in multi mode won't fail
-            pipe.set(b'fake_key', b'fake_value')
-            pipe.execute()
-            # Only in registry after execute, since passed in pipeline
-        self.assertEqual(len(q), 1)
-    def test_enqueue_many_internal_pipeline(self):
-        """Jobs should be enqueued in bulk with an internal pipeline, enqueued in order provided
-        (but at_front still applies)"""
-        # Job with unfinished dependency is not immediately enqueued
-        q = Queue()
-        job_1_data = Queue.prepare_data(say_hello, job_id='fake_job_id_1', at_front=False)
-        job_2_data = Queue.prepare_data(say_hello, job_id='fake_job_id_2', at_front=False)
-        job_3_data = Queue.prepare_data(say_hello, job_id='fake_job_id_3', at_front=True)
-        jobs = q.enqueue_many(
-            [job_1_data, job_2_data, job_3_data],
-        )
-        for job in jobs:
-            self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
-        # Only in registry after execute, since passed in pipeline
-        self.assertEqual(len(q), 3)
-        self.assertEqual(q.job_ids, ['fake_job_id_3', 'fake_job_id_1', 'fake_job_id_2'])
-    def test_enqueue_many_with_passed_pipeline(self):
-        """Jobs should be enqueued in bulk with a passed pipeline, enqueued in order provided
-        (but at_front still applies)"""
-        # Job with unfinished dependency is not immediately enqueued
-        q = Queue()
-        with q.connection.pipeline() as pipe:
-            job_1_data = Queue.prepare_data(say_hello, job_id='fake_job_id_1', at_front=False)
-            job_2_data = Queue.prepare_data(say_hello, job_id='fake_job_id_2', at_front=False)
-            job_3_data = Queue.prepare_data(say_hello, job_id='fake_job_id_3', at_front=True)
-            jobs = q.enqueue_many([job_1_data, job_2_data, job_3_data], pipeline=pipe)
-            self.assertEqual(q.job_ids, [])
-            for job in jobs:
-                self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
-            pipe.execute()
-            # Only in registry after execute, since passed in pipeline
-            self.assertEqual(len(q), 3)
-            self.assertEqual(q.job_ids, ['fake_job_id_3', 'fake_job_id_1', 'fake_job_id_2'])
-    def test_enqueue_job_with_dependency_by_id(self):
-        """Can specify job dependency with job object or job id."""
-        parent_job = Job.create(func=say_hello)
-        q = Queue()
-        q.enqueue_call(say_hello, depends_on=parent_job.id)
-        self.assertEqual(q.job_ids, [])
-        # Jobs dependent on finished jobs are immediately enqueued
-        parent_job.set_status(JobStatus.FINISHED)
-        job = q.enqueue_call(say_hello, depends_on=parent_job.id)
-        self.assertEqual(q.job_ids, [job.id])
-        self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
-    def test_enqueue_job_with_dependency_and_timeout(self):
-        """Jobs remember their timeout when enqueued as a dependency."""
-        # Job with unfinished dependency is not immediately enqueued
-        parent_job = Job.create(func=say_hello)
-        q = Queue()
-        job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123)
-        self.assertEqual(q.job_ids, [])
-        self.assertEqual(job.timeout, 123)
-        # Jobs dependent on finished jobs are immediately enqueued
-        parent_job.set_status(JobStatus.FINISHED)
-        job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123)
-        self.assertEqual(q.job_ids, [job.id])
-        self.assertEqual(job.timeout, 123)
-    def test_enqueue_job_with_multiple_queued_dependencies(self):
-        parent_jobs = [Job.create(func=say_hello) for _ in range(2)]
-        for job in parent_jobs:
-            job._status = JobStatus.QUEUED
-        q = Queue()
-        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
-            job = q.enqueue(say_hello, depends_on=parent_jobs[0], _dependency_ids=[job.id for job in parent_jobs])
-            self.assertEqual(job.get_status(), JobStatus.DEFERRED)
-            self.assertEqual(q.job_ids, [])
-            self.assertEqual(job.fetch_dependencies(), parent_jobs)
-    def test_enqueue_job_with_multiple_finished_dependencies(self):
-        parent_jobs = [Job.create(func=say_hello) for _ in range(2)]
-        for job in parent_jobs:
-            job._status = JobStatus.FINISHED
-        q = Queue()
-        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
-            job = q.enqueue(say_hello, depends_on=parent_jobs[0], _dependency_ids=[job.id for job in parent_jobs])
-            self.assertEqual(job.get_status(), JobStatus.QUEUED)
-            self.assertEqual(q.job_ids, [job.id])
-            self.assertEqual(job.fetch_dependencies(), parent_jobs)
-    def test_enqueues_dependent_if_other_dependencies_finished(self):
-        parent_jobs = [Job.create(func=say_hello) for _ in range(3)]
-        parent_jobs[0]._status = JobStatus.STARTED
-        parent_jobs[0].save()
-        parent_jobs[1]._status = JobStatus.FINISHED
-        parent_jobs[1].save()
-        parent_jobs[2]._status = JobStatus.FINISHED
-        parent_jobs[2].save()
-        q = Queue()
-        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
-            # dependent job deferred, b/c parent_job 0 is still 'started'
-            dependent_job = q.enqueue(
-                say_hello, depends_on=parent_jobs[0], _dependency_ids=[job.id for job in parent_jobs]
-            )
-            self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
-        # now set parent job 0 to 'finished'
-        parent_jobs[0].set_status(JobStatus.FINISHED)
-        q.enqueue_dependents(parent_jobs[0])
-        self.assertEqual(dependent_job.get_status(), JobStatus.QUEUED)
-        self.assertEqual(q.job_ids, [dependent_job.id])
-    def test_does_not_enqueue_dependent_if_other_dependencies_not_finished(self):
-        started_dependency = Job.create(func=say_hello, status=JobStatus.STARTED)
-        queued_dependency = Job.create(func=say_hello, status=JobStatus.QUEUED)
-        q = Queue()
-        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
-            dependent_job = q.enqueue(
-                say_hello,
-                depends_on=[started_dependency],
-                _dependency_ids=[started_dependency.id, queued_dependency.id],
-            )
-            self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
-        q.enqueue_dependents(started_dependency)
-        self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
-        self.assertEqual(q.job_ids, [])
-    def test_fetch_job_successful(self):
-        """Fetch a job from a queue."""
-        q = Queue('example')
-        job_orig = q.enqueue(say_hello)
-        job_fetch: Job = q.fetch_job(job_orig.id)  # type: ignore
-        self.assertIsNotNone(job_fetch)
-        self.assertEqual(job_orig.id, job_fetch.id)
-        self.assertEqual(job_orig.description, job_fetch.description)
-    def test_fetch_job_missing(self):
-        """Fetch a job from a queue which doesn't exist."""
-        q = Queue('example')
-        job = q.fetch_job('123')
-        self.assertIsNone(job)
-    def test_fetch_job_different_queue(self):
-        """Fetch a job from a queue which is in a different queue."""
-        q1 = Queue('example1')
-        q2 = Queue('example2')
-        job_orig = q1.enqueue(say_hello)
-        job_fetch = q2.fetch_job(job_orig.id)
-        self.assertIsNone(job_fetch)
-        job_fetch = q1.fetch_job(job_orig.id)
-        self.assertIsNotNone(job_fetch)
-    def test_getting_registries(self):
-        """Getting job registries from queue object"""
-        queue = Queue('example')
-        self.assertEqual(queue.scheduled_job_registry, ScheduledJobRegistry(queue=queue))
-        self.assertEqual(queue.started_job_registry, StartedJobRegistry(queue=queue))
-        self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue))
-        self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue))
-        self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue))
-        self.assertEqual(queue.canceled_job_registry, CanceledJobRegistry(queue=queue))
-    def test_getting_registries_with_serializer(self):
-        """Getting job registries from queue object (with custom serializer)"""
-        queue = Queue('example', serializer=JSONSerializer)
-        self.assertEqual(queue.scheduled_job_registry, ScheduledJobRegistry(queue=queue))
-        self.assertEqual(queue.started_job_registry, StartedJobRegistry(queue=queue))
-        self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue))
-        self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue))
-        self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue))
-        self.assertEqual(queue.canceled_job_registry, CanceledJobRegistry(queue=queue))
-        # Make sure we don't use default when queue has custom
-        self.assertEqual(queue.scheduled_job_registry.serializer, JSONSerializer)
-        self.assertEqual(queue.started_job_registry.serializer, JSONSerializer)
-        self.assertEqual(queue.failed_job_registry.serializer, JSONSerializer)
-        self.assertEqual(queue.deferred_job_registry.serializer, JSONSerializer)
-        self.assertEqual(queue.finished_job_registry.serializer, JSONSerializer)
-        self.assertEqual(queue.canceled_job_registry.serializer, JSONSerializer)
-    def test_enqueue_with_retry(self):
-        """Enqueueing with retry_strategy works"""
-        queue = Queue('example', connection=self.testconn)
-        job = queue.enqueue(say_hello, retry=Retry(max=3, interval=5))
-        job = Job.fetch(job.id, connection=self.testconn)
-        self.assertEqual(job.retries_left, 3)
-        self.assertEqual(job.retry_intervals, [5])
-class TestJobScheduling(RQTestCase):
-    def test_enqueue_at(self):
-        """enqueue_at() creates a job in ScheduledJobRegistry"""
-        queue = Queue(connection=self.testconn)
-        scheduled_time = datetime.now(timezone.utc) + timedelta(seconds=10)
-        job = queue.enqueue_at(scheduled_time, say_hello)
-        registry = ScheduledJobRegistry(queue=queue)
-        self.assertIn(job, registry)
-        self.assertTrue(registry.get_expiration_time(job), scheduled_time)
diff --git a/tests/test_registry.py b/tests/test_registry.py
deleted file mode 100644
index 5dd0be6..0000000
--- a/tests/test_registry.py
+++ /dev/null
@@ -1,513 +0,0 @@
-from datetime import datetime, timedelta
-from unittest import mock
-from unittest.mock import ANY
-from rq.defaults import DEFAULT_FAILURE_TTL
-from rq.exceptions import AbandonedJobError, InvalidJobOperation
-from rq.job import Job, JobStatus, requeue_job
-from rq.queue import Queue
-from rq.registry import (
-    CanceledJobRegistry,
-    DeferredJobRegistry,
-    FailedJobRegistry,
-    FinishedJobRegistry,
-    StartedJobRegistry,
-    clean_registries,
-from rq.serializers import JSONSerializer
-from rq.utils import as_text, current_timestamp
-from rq.worker import Worker
-from tests import RQTestCase
-from tests.fixtures import div_by_zero, say_hello
-class CustomJob(Job):
-    """A custom job class just to test it"""
-class TestRegistry(RQTestCase):
-    def setUp(self):
-        super().setUp()
-        self.registry = StartedJobRegistry(connection=self.testconn)
-    def test_init(self):
-        """Registry can be instantiated with queue or name/Redis connection"""
-        queue = Queue('foo', connection=self.testconn)
-        registry = StartedJobRegistry(queue=queue)
-        self.assertEqual(,
-        self.assertEqual(registry.connection, queue.connection)
-        self.assertEqual(registry.serializer, queue.serializer)
-        registry = StartedJobRegistry('bar', self.testconn, serializer=JSONSerializer)
-        self.assertEqual(, 'bar')
-        self.assertEqual(registry.connection, self.testconn)
-        self.assertEqual(registry.serializer, JSONSerializer)
-    def test_key(self):
-        self.assertEqual(self.registry.key, 'rq:wip:default')
-    def test_custom_job_class(self):
-        registry = StartedJobRegistry(job_class=CustomJob)
-        self.assertFalse(registry.job_class == self.registry.job_class)
-    def test_contains(self):
-        registry = StartedJobRegistry(connection=self.testconn)
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(say_hello)
-        self.assertFalse(job in registry)
-        self.assertFalse( in registry)
-        registry.add(job, 5)
-        self.assertTrue(job in registry)
-        self.assertTrue( in registry)
-    def test_get_expiration_time(self):
-        """registry.get_expiration_time() returns correct datetime objects"""
-        registry = StartedJobRegistry(connection=self.testconn)
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(say_hello)
-        registry.add(job, 5)
-        time = registry.get_expiration_time(job)
-        expected_time = (datetime.utcnow() + timedelta(seconds=5)).replace(microsecond=0)
-        self.assertGreaterEqual(time, expected_time - timedelta(seconds=2))
-        self.assertLessEqual(time, expected_time + timedelta(seconds=2))
-    def test_add_and_remove(self):
-        """Adding and removing job to StartedJobRegistry."""
-        timestamp = current_timestamp()
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(say_hello)
-        # Test that job is added with the right score
-        self.registry.add(job, 1000)
-        self.assertLess(self.testconn.zscore(self.registry.key,, timestamp + 1002)
-        # Ensure that a timeout of -1 results in a score of inf
-        self.registry.add(job, -1)
-        self.assertEqual(self.testconn.zscore(self.registry.key,, float('inf'))
-        # Ensure that job is removed from sorted set, but job key is not deleted
-        self.registry.remove(job)
-        self.assertIsNone(self.testconn.zscore(self.registry.key,
-        self.assertTrue(self.testconn.exists(job.key))
-        self.registry.add(job, -1)
-        # registry.remove() also accepts
-        self.registry.remove(
-        self.assertIsNone(self.testconn.zscore(self.registry.key,
-        self.registry.add(job, -1)
-        # delete_job = True deletes job key
-        self.registry.remove(job, delete_job=True)
-        self.assertIsNone(self.testconn.zscore(self.registry.key,
-        self.assertFalse(self.testconn.exists(job.key))
-        job = queue.enqueue(say_hello)
-        self.registry.add(job, -1)
-        # delete_job = True also works with
-        self.registry.remove(, delete_job=True)
-        self.assertIsNone(self.testconn.zscore(self.registry.key,
-        self.assertFalse(self.testconn.exists(job.key))
-    def test_add_and_remove_with_serializer(self):
-        """Adding and removing job to StartedJobRegistry (with serializer)."""
-        # delete_job = True also works with and custom serializer
-        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
-        registry = StartedJobRegistry(connection=self.testconn, serializer=JSONSerializer)
-        job = queue.enqueue(say_hello)
-        registry.add(job, -1)
-        registry.remove(, delete_job=True)
-        self.assertIsNone(self.testconn.zscore(registry.key,
-        self.assertFalse(self.testconn.exists(job.key))
-    def test_get_job_ids(self):
-        """Getting job ids from StartedJobRegistry."""
-        timestamp = current_timestamp()
-        self.testconn.zadd(self.registry.key, {'foo': timestamp + 10})
-        self.testconn.zadd(self.registry.key, {'bar': timestamp + 20})
-        self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar'])
-    def test_get_expired_job_ids(self):
-        """Getting expired job ids form StartedJobRegistry."""
-        timestamp = current_timestamp()
-        self.testconn.zadd(self.registry.key, {'foo': 1})
-        self.testconn.zadd(self.registry.key, {'bar': timestamp + 10})
-        self.testconn.zadd(self.registry.key, {'baz': timestamp + 30})
-        self.assertEqual(self.registry.get_expired_job_ids(), ['foo'])
-        self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20), ['foo', 'bar'])
-        # CanceledJobRegistry does not implement get_expired_job_ids()
-        registry = CanceledJobRegistry(connection=self.testconn)
-        self.assertRaises(NotImplementedError, registry.get_expired_job_ids)
-    def test_cleanup_moves_jobs_to_failed_job_registry(self):
-        """Moving expired jobs to FailedJobRegistry."""
-        queue = Queue(connection=self.testconn)
-        failed_job_registry = FailedJobRegistry(connection=self.testconn)
-        job = queue.enqueue(say_hello)
-        self.testconn.zadd(self.registry.key, { 2})
-        # Job has not been moved to FailedJobRegistry
-        self.registry.cleanup(1)
-        self.assertNotIn(job, failed_job_registry)
-        self.assertIn(job, self.registry)
-        with mock.patch.object(Job, 'execute_failure_callback') as mocked:
-            self.registry.cleanup()
-            mocked.assert_called_once_with(queue.death_penalty_class, AbandonedJobError, ANY, ANY)
-        self.assertIn(, failed_job_registry)
-        self.assertNotIn(job, self.registry)
-        job.refresh()
-        self.assertEqual(job.get_status(), JobStatus.FAILED)
-        self.assertTrue(job.exc_info)  # explanation is written to exc_info
-    def test_job_execution(self):
-        """Job is removed from StartedJobRegistry after execution."""
-        registry = StartedJobRegistry(connection=self.testconn)
-        queue = Queue(connection=self.testconn)
-        worker = Worker([queue])
-        job = queue.enqueue(say_hello)
-        self.assertTrue(job.is_queued)
-        worker.prepare_job_execution(job)
-        self.assertIn(, registry.get_job_ids())
-        self.assertTrue(job.is_started)
-        worker.perform_job(job, queue)
-        self.assertNotIn(, registry.get_job_ids())
-        self.assertTrue(job.is_finished)
-        # Job that fails
-        job = queue.enqueue(div_by_zero)
-        worker.prepare_job_execution(job)
-        self.assertIn(, registry.get_job_ids())
-        worker.perform_job(job, queue)
-        self.assertNotIn(, registry.get_job_ids())
-    def test_job_deletion(self):
-        """Ensure job is removed from StartedJobRegistry when deleted."""
-        registry = StartedJobRegistry(connection=self.testconn)
-        queue = Queue(connection=self.testconn)
-        worker = Worker([queue])
-        job = queue.enqueue(say_hello)
-        self.assertTrue(job.is_queued)
-        worker.prepare_job_execution(job)
-        self.assertIn(, registry.get_job_ids())
-        job.delete()
-        self.assertNotIn(, registry.get_job_ids())
-    def test_get_job_count(self):
-        """StartedJobRegistry returns the right number of job count."""
-        timestamp = current_timestamp() + 10
-        self.testconn.zadd(self.registry.key, {'foo': timestamp})
-        self.testconn.zadd(self.registry.key, {'bar': timestamp})
-        self.assertEqual(self.registry.count, 2)
-        self.assertEqual(len(self.registry), 2)
-        # Make sure
-    def test_clean_registries(self):
-        """clean_registries() cleans Started and Finished job registries."""
-        queue = Queue(connection=self.testconn)
-        finished_job_registry = FinishedJobRegistry(connection=self.testconn)
-        self.testconn.zadd(finished_job_registry.key, {'foo': 1})
-        started_job_registry = StartedJobRegistry(connection=self.testconn)
-        self.testconn.zadd(started_job_registry.key, {'foo': 1})
-        failed_job_registry = FailedJobRegistry(connection=self.testconn)
-        self.testconn.zadd(failed_job_registry.key, {'foo': 1})
-        clean_registries(queue)
-        self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0)
-        self.assertEqual(self.testconn.zcard(started_job_registry.key), 0)
-        self.assertEqual(self.testconn.zcard(failed_job_registry.key), 0)
-    def test_clean_registries_with_serializer(self):
-        """clean_registries() cleans Started and Finished job registries (with serializer)."""
-        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
-        finished_job_registry = FinishedJobRegistry(connection=self.testconn, serializer=JSONSerializer)
-        self.testconn.zadd(finished_job_registry.key, {'foo': 1})
-        started_job_registry = StartedJobRegistry(connection=self.testconn, serializer=JSONSerializer)
-        self.testconn.zadd(started_job_registry.key, {'foo': 1})
-        failed_job_registry = FailedJobRegistry(connection=self.testconn, serializer=JSONSerializer)
-        self.testconn.zadd(failed_job_registry.key, {'foo': 1})
-        clean_registries(queue)
-        self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0)
-        self.assertEqual(self.testconn.zcard(started_job_registry.key), 0)
-        self.assertEqual(self.testconn.zcard(failed_job_registry.key), 0)
-    def test_get_queue(self):
-        """registry.get_queue() returns the right Queue object."""
-        registry = StartedJobRegistry(connection=self.testconn)
-        self.assertEqual(registry.get_queue(), Queue(connection=self.testconn))
-        registry = StartedJobRegistry('foo', connection=self.testconn, serializer=JSONSerializer)
-        self.assertEqual(registry.get_queue(), Queue('foo', connection=self.testconn, serializer=JSONSerializer))
-class TestFinishedJobRegistry(RQTestCase):
-    def setUp(self):
-        super().setUp()
-        self.registry = FinishedJobRegistry(connection=self.testconn)
-    def test_key(self):
-        self.assertEqual(self.registry.key, 'rq:finished:default')
-    def test_cleanup(self):
-        """Finished job registry removes expired jobs."""
-        timestamp = current_timestamp()
-        self.testconn.zadd(self.registry.key, {'foo': 1})
-        self.testconn.zadd(self.registry.key, {'bar': timestamp + 10})
-        self.testconn.zadd(self.registry.key, {'baz': timestamp + 30})
-        self.registry.cleanup()
-        self.assertEqual(self.registry.get_job_ids(), ['bar', 'baz'])
-        self.registry.cleanup(timestamp + 20)
-        self.assertEqual(self.registry.get_job_ids(), ['baz'])
-        # CanceledJobRegistry now implements noop cleanup, should not raise exception
-        registry = CanceledJobRegistry(connection=self.testconn)
-        registry.cleanup()
-    def test_jobs_are_put_in_registry(self):
-        """Completed jobs are added to FinishedJobRegistry."""
-        self.assertEqual(self.registry.get_job_ids(), [])
-        queue = Queue(connection=self.testconn)
-        worker = Worker([queue])
-        # Completed jobs are put in FinishedJobRegistry
-        job = queue.enqueue(say_hello)
-        worker.perform_job(job, queue)
-        self.assertEqual(self.registry.get_job_ids(), [])
-        # When job is deleted, it should be removed from FinishedJobRegistry
-        self.assertEqual(job.get_status(), JobStatus.FINISHED)
-        job.delete()
-        self.assertEqual(self.registry.get_job_ids(), [])
-        # Failed jobs are not put in FinishedJobRegistry
-        failed_job = queue.enqueue(div_by_zero)
-        worker.perform_job(failed_job, queue)
-        self.assertEqual(self.registry.get_job_ids(), [])
-class TestDeferredRegistry(RQTestCase):
-    def setUp(self):
-        super().setUp()
-        self.registry = DeferredJobRegistry(connection=self.testconn)
-    def test_key(self):
-        self.assertEqual(self.registry.key, 'rq:deferred:default')
-    def test_add(self):
-        """Adding a job to DeferredJobsRegistry."""
-        job = Job()
-        self.registry.add(job)
-        job_ids = [as_text(job_id) for job_id in self.testconn.zrange(self.registry.key, 0, -1)]
-        self.assertEqual(job_ids, [])
-    def test_register_dependency(self):
-        """Ensure job creation and deletion works with DeferredJobRegistry."""
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(say_hello)
-        job2 = queue.enqueue(say_hello, depends_on=job)
-        registry = DeferredJobRegistry(connection=self.testconn)
-        self.assertEqual(registry.get_job_ids(), [])
-        # When deleted, job removes itself from DeferredJobRegistry
-        job2.delete()
-        self.assertEqual(registry.get_job_ids(), [])
-class TestFailedJobRegistry(RQTestCase):
-    def test_default_failure_ttl(self):
-        """Job TTL defaults to DEFAULT_FAILURE_TTL"""
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(say_hello)
-        registry = FailedJobRegistry(connection=self.testconn)
-        key = registry.key
-        timestamp = current_timestamp()
-        registry.add(job)
-        score = self.testconn.zscore(key,
-        self.assertLess(score, timestamp + DEFAULT_FAILURE_TTL + 2)
-        self.assertGreater(score, timestamp + DEFAULT_FAILURE_TTL - 2)
-        # Job key will also expire
-        job_ttl = self.testconn.ttl(job.key)
-        self.assertLess(job_ttl, DEFAULT_FAILURE_TTL + 2)
-        self.assertGreater(job_ttl, DEFAULT_FAILURE_TTL - 2)
-        timestamp = current_timestamp()
-        ttl = 5
-        registry.add(job, ttl=ttl)
-        score = self.testconn.zscore(key,
-        self.assertLess(score, timestamp + ttl + 2)
-        self.assertGreater(score, timestamp + ttl - 2)
-        job_ttl = self.testconn.ttl(job.key)
-        self.assertLess(job_ttl, ttl + 2)
-        self.assertGreater(job_ttl, ttl - 2)
-    def test_requeue(self):
-        """FailedJobRegistry.requeue works properly"""
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(div_by_zero, failure_ttl=5)
-        worker = Worker([queue])
-        registry = FailedJobRegistry(connection=worker.connection)
-        self.assertTrue(job in registry)
-        registry.requeue(
-        self.assertFalse(job in registry)
-        self.assertIn(, queue.get_job_ids())
-        job.refresh()
-        self.assertEqual(job.get_status(), JobStatus.QUEUED)
-        self.assertEqual(job.started_at, None)
-        self.assertEqual(job.ended_at, None)
-        self.assertTrue(job in registry)
-        # Should also work with job instance
-        registry.requeue(job)
-        self.assertFalse(job in registry)
-        self.assertIn(, queue.get_job_ids())
-        job.refresh()
-        self.assertEqual(job.get_status(), JobStatus.QUEUED)
-        self.assertTrue(job in registry)
-        # requeue_job should work the same way
-        requeue_job(, connection=self.testconn)
-        self.assertFalse(job in registry)
-        self.assertIn(, queue.get_job_ids())
-        job.refresh()
-        self.assertEqual(job.get_status(), JobStatus.QUEUED)
-        self.assertTrue(job in registry)
-        # And so does job.requeue()
-        job.requeue()
-        self.assertFalse(job in registry)
-        self.assertIn(, queue.get_job_ids())
-        job.refresh()
-        self.assertEqual(job.get_status(), JobStatus.QUEUED)
-    def test_requeue_with_serializer(self):
-        """FailedJobRegistry.requeue works properly (with serializer)"""
-        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
-        job = queue.enqueue(div_by_zero, failure_ttl=5)
-        worker = Worker([queue], serializer=JSONSerializer)
-        registry = FailedJobRegistry(connection=worker.connection, serializer=JSONSerializer)
-        self.assertTrue(job in registry)
-        registry.requeue(
-        self.assertFalse(job in registry)
-        self.assertIn(, queue.get_job_ids())
-        job.refresh()
-        self.assertEqual(job.get_status(), JobStatus.QUEUED)
-        self.assertEqual(job.started_at, None)
-        self.assertEqual(job.ended_at, None)
-        self.assertTrue(job in registry)
-        # Should also work with job instance
-        registry.requeue(job)
-        self.assertFalse(job in registry)
-        self.assertIn(, queue.get_job_ids())
-        job.refresh()
-        self.assertEqual(job.get_status(), JobStatus.QUEUED)
-        self.assertTrue(job in registry)
-        # requeue_job should work the same way
-        requeue_job(, connection=self.testconn, serializer=JSONSerializer)
-        self.assertFalse(job in registry)
-        self.assertIn(, queue.get_job_ids())
-        job.refresh()
-        self.assertEqual(job.get_status(), JobStatus.QUEUED)
-        self.assertTrue(job in registry)
-        # And so does job.requeue()
-        job.requeue()
-        self.assertFalse(job in registry)
-        self.assertIn(, queue.get_job_ids())
-        job.refresh()
-        self.assertEqual(job.get_status(), JobStatus.QUEUED)
-    def test_invalid_job(self):
-        """Requeuing a job that's not in FailedJobRegistry raises an error."""
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(say_hello)
-        registry = FailedJobRegistry(connection=self.testconn)
-        with self.assertRaises(InvalidJobOperation):
-            registry.requeue(job)
-    def test_worker_handle_job_failure(self):
-        """Failed jobs are added to FailedJobRegistry"""
-        q = Queue(connection=self.testconn)
-        w = Worker([q])
-        registry = FailedJobRegistry(connection=w.connection)
-        timestamp = current_timestamp()
-        job = q.enqueue(div_by_zero, failure_ttl=5)
-        w.handle_job_failure(job, q)
-        # job is added to FailedJobRegistry with default failure ttl
-        self.assertIn(, registry.get_job_ids())
-        self.assertLess(self.testconn.zscore(registry.key,, timestamp + DEFAULT_FAILURE_TTL + 5)
-        # job is added to FailedJobRegistry with specified ttl
-        job = q.enqueue(div_by_zero, failure_ttl=5)
-        w.handle_job_failure(job, q)
-        self.assertLess(self.testconn.zscore(registry.key,, timestamp + 7)
diff --git a/tests/ b/tests/
deleted file mode 100644
index e27e872..0000000
--- a/tests/
+++ /dev/null
@@ -1,251 +0,0 @@
-import tempfile
-import unittest
-from datetime import timedelta
-from unittest.mock import PropertyMock, patch
-from redis import Redis
-from rq.job import Job
-from rq.queue import Queue
-from rq.registry import StartedJobRegistry
-from rq.results import Result, get_key
-from rq.utils import get_version, utcnow
-from rq.worker import Worker
-from tests import RQTestCase
-from .fixtures import div_by_zero, say_hello
-@unittest.skipIf(get_version(Redis()) < (5, 0, 0), 'Skip if Redis server < 5.0')
-class TestScheduledJobRegistry(RQTestCase):
-    def test_save_and_get_result(self):
-        """Ensure data is saved properly"""
-        queue = Queue(connection=self.connection)
-        job = queue.enqueue(say_hello)
-        result = Result.fetch_latest(job)
-        self.assertIsNone(result)
-        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
-        result = Result.fetch_latest(job)
-        self.assertEqual(result.return_value, 1)
-        self.assertEqual(job.latest_result().return_value, 1)
-        # Check that ttl is properly set
-        key = get_key(
-        ttl = self.connection.pttl(key)
-        self.assertTrue(5000 < ttl <= 10000)
-        # Check job with None return value
-        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=None)
-        result = Result.fetch_latest(job)
-        self.assertIsNone(result.return_value)
-        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=2)
-        result = Result.fetch_latest(job)
-        self.assertEqual(result.return_value, 2)
-    def test_create_failure(self):
-        """Ensure data is saved properly"""
-        queue = Queue(connection=self.connection)
-        job = queue.enqueue(say_hello)
-        Result.create_failure(job, ttl=10, exc_string='exception')
-        result = Result.fetch_latest(job)
-        self.assertEqual(result.exc_string, 'exception')
-        # Check that ttl is properly set
-        key = get_key(
-        ttl = self.connection.pttl(key)
-        self.assertTrue(5000 < ttl <= 10000)
-    def test_getting_results(self):
-        """Check getting all execution results"""
-        queue = Queue(connection=self.connection)
-        job = queue.enqueue(say_hello)
-        # latest_result() returns None when there's no result
-        self.assertIsNone(job.latest_result())
-        result_1 = Result.create_failure(job, ttl=10, exc_string='exception')
-        result_2 = Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
-        result_3 = Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
-        # Result.fetch_latest() returns the latest result
-        result = Result.fetch_latest(job)
-        self.assertEqual(result, result_3)
-        self.assertEqual(job.latest_result(), result_3)
-        # Result.all() and job.results() returns all results, newest first
-        results = Result.all(job)
-        self.assertEqual(results, [result_3, result_2, result_1])
-        self.assertEqual(job.results(), [result_3, result_2, result_1])
-    def test_count(self):
-        """Result.count(job) returns number of results"""
-        queue = Queue(connection=self.connection)
-        job = queue.enqueue(say_hello)
-        self.assertEqual(Result.count(job), 0)
-        Result.create_failure(job, ttl=10, exc_string='exception')
-        self.assertEqual(Result.count(job), 1)
-        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
-        self.assertEqual(Result.count(job), 2)
-    def test_delete_all(self):
-        """Result.delete_all(job) deletes all results from Redis"""
-        queue = Queue(connection=self.connection)
-        job = queue.enqueue(say_hello)
-        Result.create_failure(job, ttl=10, exc_string='exception')
-        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
-        Result.delete_all(job)
-        self.assertEqual(Result.count(job), 0)
-    def test_job_successful_result_fallback(self):
-        """Changes to job.result handling should be backwards compatible."""
-        queue = Queue(connection=self.connection)
-        job = queue.enqueue(say_hello)
-        worker = Worker([queue])
-        worker.register_birth()
-        self.assertEqual(worker.failed_job_count, 0)
-        self.assertEqual(worker.successful_job_count, 0)
-        self.assertEqual(worker.total_working_time, 0)
-        # These should only run on workers that supports Redis streams
-        registry = StartedJobRegistry(connection=self.connection)
-        job.started_at = utcnow()
-        job.ended_at = job.started_at + timedelta(seconds=0.75)
-        job._result = 'Success'
-        worker.handle_job_success(job, queue, registry)
-        payload = self.connection.hgetall(job.key)
-        self.assertFalse(b'result' in payload.keys())
-        self.assertEqual(job.result, 'Success')
-        with patch('rq.worker.Worker.supports_redis_streams', new_callable=PropertyMock) as mock:
-            with patch('rq.job.Job.supports_redis_streams', new_callable=PropertyMock) as job_mock:
-                job_mock.return_value = False
-                mock.return_value = False
-                worker = Worker([queue])
-                worker.register_birth()
-                job = queue.enqueue(say_hello)
-                job._result = 'Success'
-                job.started_at = utcnow()
-                job.ended_at = job.started_at + timedelta(seconds=0.75)
-                # If `save_result_to_job` = True, result will be saved to job
-                # hash, simulating older versions of RQ
-                worker.handle_job_success(job, queue, registry)
-                payload = self.connection.hgetall(job.key)
-                self.assertTrue(b'result' in payload.keys())
-                # Delete all new result objects so we only have result stored in job hash,
-                # this should simulate a job that was executed in an earlier RQ version
-                self.assertEqual(job.result, 'Success')
-    def test_job_failed_result_fallback(self):
-        """Changes to job.result failure handling should be backwards compatible."""
-        queue = Queue(connection=self.connection)
-        job = queue.enqueue(say_hello)
-        worker = Worker([queue])
-        worker.register_birth()
-        self.assertEqual(worker.failed_job_count, 0)
-        self.assertEqual(worker.successful_job_count, 0)
-        self.assertEqual(worker.total_working_time, 0)
-        registry = StartedJobRegistry(connection=self.connection)
-        job.started_at = utcnow()
-        job.ended_at = job.started_at + timedelta(seconds=0.75)
-        worker.handle_job_failure(job, exc_string='Error', queue=queue, started_job_registry=registry)
-        job = Job.fetch(, connection=self.connection)
-        payload = self.connection.hgetall(job.key)
-        self.assertFalse(b'exc_info' in payload.keys())
-        self.assertEqual(job.exc_info, 'Error')
-        with patch('rq.worker.Worker.supports_redis_streams', new_callable=PropertyMock) as mock:
-            with patch('rq.job.Job.supports_redis_streams', new_callable=PropertyMock) as job_mock:
-                job_mock.return_value = False
-                mock.return_value = False
-                worker = Worker([queue])
-                worker.register_birth()
-                job = queue.enqueue(say_hello)
-                job.started_at = utcnow()
-                job.ended_at = job.started_at + timedelta(seconds=0.75)
-                # If `save_result_to_job` = True, result will be saved to job
-                # hash, simulating older versions of RQ
-                worker.handle_job_failure(job, exc_string='Error', queue=queue, started_job_registry=registry)
-                payload = self.connection.hgetall(job.key)
-                self.assertTrue(b'exc_info' in payload.keys())
-                # Delete all new result objects so we only have result stored in job hash,
-                # this should simulate a job that was executed in an earlier RQ version
-                Result.delete_all(job)
-                job = Job.fetch(, connection=self.connection)
-                self.assertEqual(job.exc_info, 'Error')
-    def test_job_return_value(self):
-        """Test job.return_value"""
-        queue = Queue(connection=self.connection)
[The following lists of changes regard files as different if they have different names, permissions or owners.]

Files in second set of .debs but not in first

-rw-r--r--  root/root   /usr/lib/python3/dist-packages/rq-1.15.1.egg-info/PKG-INFO
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/rq-1.15.1.egg-info/dependency_links.txt
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/rq-1.15.1.egg-info/entry_points.txt
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/rq-1.15.1.egg-info/not-zip-safe
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/rq-1.15.1.egg-info/requires.txt
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/rq-1.15.1.egg-info/top_level.txt
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/rq/

Files in first set of .debs but not in second

-rw-r--r--  root/root   /usr/lib/python3/dist-packages/rq-1.15.0.egg-info/PKG-INFO
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/rq-1.15.0.egg-info/dependency_links.txt
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/rq-1.15.0.egg-info/entry_points.txt
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/rq-1.15.0.egg-info/not-zip-safe
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/rq-1.15.0.egg-info/requires.txt
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/rq-1.15.0.egg-info/top_level.txt
-rw-r--r--  root/root   /usr/share/doc/python3-rq/changelog.gz

No differences were encountered in the control files

More details

Full run details