Bogdanp / django_dramatiq

A Django app that integrates with Dramatiq.
https://dramatiq.io

Using dramatiq, apscheduler with RabbitMQ results in missing heartbeats on RabbitMQ #44

Open th0th opened 5 years ago

th0th commented 5 years ago

I needed a scheduler to run some actors periodically, so I added a Django management command that runs it. It goes like this:

import signal
import sys

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from django.core.management.base import BaseCommand

class Command(BaseCommand):
    help = 'Run task scheduler'

    def handle(self, *args, **options):
        scheduler = BlockingScheduler()

        scheduler.add_job(
            trigger=IntervalTrigger(
                seconds=30,
            ),
            name='task1',
            # APScheduler textual references use the 'module:attribute' form.
            func='project.apps.app.tasks:task1.send',  # Edit: added '.send'
        )

        def shutdown(*args):
            self.stdout.write("Exiting...")
            sys.exit(0)

        signal.signal(signal.SIGINT, shutdown)
        signal.signal(signal.SIGTERM, shutdown)

        self.stdout.write("Discovered tasks:")

        for s in scheduler.get_jobs():
            self.stdout.write(f"* {s.name} - {s.trigger}")

        self.stdout.write("\nStarting scheduler...")

        scheduler.start()

        return 0

It works, but the connection gets reset from time to time, with this error:

[2019-07-05 21:40:49,396] [ERROR] pika.adapters.utils.io_services_utils: _AsyncBaseTransport._produce() failed, aborting connection: error=ConnectionResetError(104, 'Connection reset by peer'); sock=<socket.socket fd=36, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.42.6.131', 55836)>; Caller's stack:
Traceback (most recent call last):
  File "/usr/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 1097, in _on_socket_writable
    self._produce()
  File "/usr/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 820, in _produce
    self._tx_buffers[0])
  File "/usr/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 79, in retry_sigint_wrap
    return func(*args, **kwargs)
  File "/usr/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 861, in _sigint_safe_send
    return sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
Traceback (most recent call last):
  File "/usr/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 1097, in _on_socket_writable
    self._produce()
  File "/usr/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 820, in _produce
    self._tx_buffers[0])
  File "/usr/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 79, in retry_sigint_wrap
    return func(*args, **kwargs)
  File "/usr/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 861, in _sigint_safe_send
    return sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
[2019-07-05 21:40:49,488] [ERROR] pika.adapters.base_connection: connection_lost: StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",)
[2019-07-05 21:40:49,555] [ERROR] pika.adapters.blocking_connection: Unexpected connection close detected: StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",)

and here is the RabbitMQ log:

2019-07-05 16:24:40.426 [error] <0.31273.0> closing AMQP connection <0.31273.0> (10.42.6.131:36524 -> 10.42.6.93:5672): missed heartbeats from client, timeout: 60s

I tried to do something similar to the one in @Bogdanp's post. Am I missing something?

Bogdanp commented 5 years ago

The issue here is that the broker doesn't immediately close connections after messages get enqueued, the reasoning being that you're usually going to enqueue more than one message off of a connection. That, combined with the fact that BlockingScheduler blocks the whole thread, means heartbeats are never sent to RMQ, so the connection eventually gets closed.

The fix here would be to change Dramatiq so that all enqueues are serialized off of a single thread that can wake up every once in a while and send heartbeats, but until I can get around to doing that, you can just append

get_broker().connection.close()

to the function you're scheduling. That should get rid of these errors.
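For readers wondering what the longer-term fix described above (one thread that serializes all enqueues and wakes up periodically to send heartbeats) could look like, here is a minimal sketch. It is not Dramatiq's implementation: `EnqueueThread`, `publish`, `send_heartbeat`, and `idle_interval` are all hypothetical names, and the two callables stand in for whatever actually talks to RabbitMQ (with pika's `BlockingConnection`, a call such as `connection.process_data_events()` would be a plausible candidate for the heartbeat side, but that is an assumption).

```python
import queue
import threading


class EnqueueThread(threading.Thread):
    """Hypothetical sketch: a single thread owns the broker connection,
    publishes queued messages, and sends a heartbeat whenever it is idle."""

    def __init__(self, publish, send_heartbeat, idle_interval=1.0):
        super().__init__(daemon=True)
        self._publish = publish                # assumed: callable(message)
        self._send_heartbeat = send_heartbeat  # assumed: callable()
        self._idle_interval = idle_interval    # seconds to wait before a heartbeat
        self._inbox = queue.Queue()
        self._stop_requested = threading.Event()

    def enqueue(self, message):
        # Safe to call from any thread; only run() touches the connection.
        self._inbox.put(message)

    def stop(self):
        self._stop_requested.set()

    def run(self):
        while not self._stop_requested.is_set():
            try:
                message = self._inbox.get(timeout=self._idle_interval)
            except queue.Empty:
                # Nothing to publish: wake up anyway and keep the connection alive.
                self._send_heartbeat()
            else:
                self._publish(message)
```

Because the loop wakes up at least every `idle_interval` seconds, a scheduler that blocks its own thread would no longer prevent heartbeats from being sent. Messages still waiting in the inbox when `stop()` is called are dropped in this sketch.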

th0th commented 5 years ago

Thanks for the quick response @Bogdanp! 💐

I have multiple actors scheduled, and I'd rather wait for the update than add the connection-closing part to every scheduled actor :)

Bogdanp commented 5 years ago

@th0th in the meantime, you could also increase your heartbeat timeouts.
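One way to raise the heartbeat timeout, sketched under assumptions: the broker is configured through django_dramatiq's `DRAMATIQ_BROKER` setting with a `url` option, and the underlying pika URL parser honours a `heartbeat` query parameter (in seconds). The helper `with_heartbeat`, the host, and the value 600 are placeholders for illustration only.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_heartbeat(amqp_url, seconds):
    """Return amqp_url with its `heartbeat` query parameter set to `seconds`."""
    parts = urlsplit(amqp_url)
    query = dict(parse_qsl(parts.query))
    query["heartbeat"] = str(seconds)  # overrides any existing heartbeat value
    return urlunsplit(parts._replace(query=urlencode(query)))


# Hypothetical settings.py fragment; other keys (e.g. middleware) are omitted.
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.rabbitmq.RabbitmqBroker",
    "OPTIONS": {"url": with_heartbeat("amqp://localhost:5672", 600)},
}
```

A longer timeout only hides the problem if it exceeds the longest gap between enqueues on a given connection, so it is a stopgap rather than a fix.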

th0th commented 5 years ago

@Bogdanp not to pressure you or anything, but do you have a timeframe in mind for the update? I'm just asking so I can schedule my own release :)

Bogdanp commented 5 years ago

I don't intend to make that change I mentioned for a long time, sorry.

th0th commented 5 years ago

Oh, when you said that, I thought it was a trivial, planned change, sorry. I'd better implement the workaround you suggested then. Thank you @Bogdanp.

th0th commented 5 years ago

@Bogdanp I took your advice and added dramatiq.get_broker().connection.close() to the end of each scheduled task, and now I constantly get warnings like this:

[2019-07-14 08:06:47,943] [WARNING] dramatiq.worker.WorkerThread: Failed to process message task() with unhandled exception.
Traceback (most recent call last):
  File "/usr/lib/python3.7/site-packages/dramatiq/worker.py", line 470, in process_message
    res = actor(*message.args, **message.kwargs)
  File "/usr/lib/python3.7/site-packages/dramatiq/actor.py", line 145, in __call__
    return self.fn(*args, **kwargs)
  File "./project/apps/app/tasks.py", line 173, in task
    dramatiq.get_broker().connection.close()
  File "/usr/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 787, in close
    raise exceptions.ConnectionWrongStateError(msg)
pika.exceptions.ConnectionWrongStateError: BlockingConnection.close(200, 'Normal shutdown') called on closed connection.

Am I doing something wrong?
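The traceback shows `close()` being called from inside the actor, on a worker thread, against a connection that was already closed. If the close is kept, one defensive option is to check the connection's state first. The sketch below assumes the connection object exposes an `is_open` attribute, as pika's `BlockingConnection` does; `close_if_open` is a hypothetical helper, not part of Dramatiq or pika.

```python
def close_if_open(connection):
    """Close `connection` only if it reports itself open.

    Returns True if this call closed it, False if there was nothing to do.
    """
    if getattr(connection, "is_open", False):
        connection.close()
        return True
    return False
```

It would be called as `close_if_open(dramatiq.get_broker().connection)` from the function that enqueues the message, rather than from the actor body.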

th0th commented 5 years ago

I think I came up with a proper workaround, and I wanted to share it in case someone else stumbles upon the same issue:

import importlib
import signal
import sys

import dramatiq
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from django.core.management.base import BaseCommand

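def send(job_path):
    # job_path has the form 'package.module:actor_name'.
    module_path, func_name = job_path.split(':')

    task_module = importlib.import_module(module_path)
    task = getattr(task_module, func_name)

    # Enqueue the actor's message.
    task.send()

    # Close the connection so it does not sit idle and miss heartbeats.
    dramatiq.get_broker().connection.close()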

class Command(BaseCommand):
    help = 'Run task scheduler'

    def handle(self, *args, **options):
        scheduler = BlockingScheduler()

        scheduler.add_job(
            trigger=IntervalTrigger(
                seconds=30,
            ),
            name='task1',
            func=send,
            kwargs={
                # 'module:actor' form, as expected by send() above.
                'job_path': 'project.apps.app.tasks:task1'
            },
        )

        def shutdown(*args):
            self.stdout.write("Exiting...")
            sys.exit(0)

        signal.signal(signal.SIGINT, shutdown)
        signal.signal(signal.SIGTERM, shutdown)

        self.stdout.write("Discovered tasks:")

        for s in scheduler.get_jobs():
            self.stdout.write(f"* {s.name} - {s.trigger}")

        self.stdout.write("\nStarting scheduler...")

        scheduler.start()

        return 0

xunto commented 1 year ago

Sorry for necroposting, but I want to express my interest in an actual fix for this (not just a workaround). I assume it is not being worked on at the moment?