Bogdanp / django_dramatiq

A Django app that integrates with Dramatiq.
https://dramatiq.io
Other
331 stars 77 forks source link

Failed tasks are hanging the test #53

Open husio opened 4 years ago

husio commented 4 years ago

When running a test that executes a background task that fails, the test hangs forever.

I expect that, when broker.join is called with fail_fast=True, the exception raised in the task is propagated to the test through the broker.join call. Instead, the test never finishes.

Here is some example code; I can share the whole example Django project if needed.

# myapp/tests.py
@pytest.mark.django_db(transaction=True)
def test_register_user_task_using_invalid_input(broker, worker):
    """Reproduce the hang: run ``register_user`` with an empty email.

    ``broker`` and ``worker`` are supplied by the test setup (presumably
    pytest fixtures; the traceback shows a dramatiq ``StubBroker`` and
    ``Worker``).
    """
    # An empty email makes the actor raise instead of creating a user.
    tasks.register_user.send("")
    # Reported to block forever here rather than re-raising the task's
    # exception, even though fail_fast=True is passed.
    broker.join(tasks.register_user.queue_name, fail_fast=True)
    worker.join()
    # NOTE(review): the actor raises before creating a user for "", so a
    # count of 1 looks inconsistent with this scenario -- confirm intent.
    assert User.objects.count() == 1

# myapp/tasks.py
@dramatiq.actor
def register_user(email):
    """Create a ``User`` for *email* and return the new object's id.

    Raises:
      Exception: With the message "empty" when *email* is falsy.
    """
    if email:
        # Valid input: persist the user and hand back its id.
        return User.objects.create(email=email).id
    raise Exception("empty")
platform linux -- Python 3.7.3, pytest-5.1.2, py-1.8.0, pluggy-0.13.0
Django settings: testapp.settings (from environment variable)
rootdir: /home/piotr/testapp, inifile: setup.cfg
plugins: django-3.5.1
.venv/lib/python3.7/site-packages/_pytest/python.py:170: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

broker = <dramatiq.brokers.stub.StubBroker object at 0x7efd94f00320>, worker = <dramatiq.worker.Worker object at 0x7efd8efc3c50>

    @pytest.mark.django_db(transaction=True)
    def test_register_user_task_using_invalid_input(broker, worker):
        tasks.register_user.send("")
>       broker.join(tasks.register_user.queue_name, fail_fast=True)

myapp/tests.py:27: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dramatiq.brokers.stub.StubBroker object at 0x7efd94f00320>, queue_name = 'default'

    def join(self, queue_name, *, fail_fast=False, timeout=None):
        """Wait for all the messages on the given queue to be
        processed.  This method is only meant to be used in tests
        to wait for all the messages in a queue to be processed.

        Raises:
          QueueJoinTimeout: When the timeout elapses.
          QueueNotFound: If the given queue was never declared.

        Parameters:
          queue_name(str): The queue to wait on.
          fail_fast(bool): When this is True and any message gets
            dead-lettered during the join, then an exception will be
            raised.  This will be True by default starting with
            version 2.0.
          timeout(Optional[int]): The max amount of time, in
            milliseconds, to wait on this queue.
        """
        try:
            queues = [
                self.queues[queue_name],
                self.queues[dq_name(queue_name)],
            ]
        except KeyError:
            raise QueueNotFound(queue_name)

        deadline = timeout and time.monotonic() + timeout / 1000
        while True:
            for queue in queues:
                timeout = deadline and deadline - time.monotonic()
>               join_queue(queue, timeout=timeout)

.venv/lib/python3.7/site-packages/dramatiq/brokers/stub.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

queue = <queue.Queue object at 0x7efd94ca4ac8>, timeout = None

    def join_queue(queue, timeout=None):
        """The join() method of standard queues in Python doesn't support
        timeouts.  This implements the same functionality as that method,
        with optional timeout support, by depending the internals of
        Queue.

        Raises:
          QueueJoinTimeout: When the timeout is reached.

        Parameters:
          timeout(Optional[float])
        """
        with queue.all_tasks_done:
            while queue.unfinished_tasks:
>               finished_in_time = queue.all_tasks_done.wait(timeout=timeout)