Bogdanp / django_dramatiq

A Django app that integrates with Dramatiq.
https://dramatiq.io

StreamLostError while sending tasks #85

Closed dnmellen closed 3 years ago

dnmellen commented 3 years ago

Hi! I have a Django app (served with Gunicorn using a sync worker, not gevent) that sends tasks to Dramatiq. The workers use gevent, since my background tasks are basically API calls to third-party services.

Sometimes I see this kind of error when I try to perform a .send() from the backend (not the gevent worker):

Unexpected connection close detected: "StreamLostError: (\"Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')\",)"

I searched for this error, and the suggested fixes were to increase the RabbitMQ heartbeat (I set it to 600 seconds) or to close the connection after every .send() (https://github.com/Bogdanp/dramatiq/issues/217).

The only option that worked for me is to close the connection after every .send(), but I wonder whether this penalizes performance when I have to send a lot of messages, and, if so, how I can send multiple messages to Dramatiq efficiently.
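For context, my close-after-send workaround currently looks roughly like this (just a sketch: send_report is a made-up actor name, and I'm assuming broker.close() is enough to drop the underlying pika connection):

import dramatiq

@dramatiq.actor
def send_report(report_id):
    # Runs on the gevent worker and calls a third-party API.
    ...

def enqueue_report(report_id):
    send_report.send(report_id)
    # Workaround: close the broker's RabbitMQ connection after every send
    # so the next send opens a fresh connection instead of reusing one
    # that the server may already have reset.
    dramatiq.get_broker().close()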

Thanks in advance!

Bogdanp commented 3 years ago

Were the messages lost or was that just a warning that got logged? Dramatiq closes the connections and retries sending (up to six times) when this happens.

dnmellen commented 3 years ago

I tracked those errors with Sentry, and the logging level is ERROR. I don't know for sure whether the message is lost, because at that point I don't have a message id to search for in the Dramatiq worker logs. The error occurs before the message is sent to RabbitMQ.

Bogdanp commented 3 years ago

Looks like you might need to filter these logs out of Sentry:

https://github.com/pika/pika/blob/32d450db7e96592be4afcc45e82e7596345344cd/pika/adapters/base_connection.py#L428-L429

Pika ends up logging these messages at the error level, but Dramatiq does re-create the connection and retry when this happens. Unfortunately, pika is kind of loud and obnoxious with some of its logging, so much so that the _IgnoreScaryLogs filter exists. I suppose _IgnoreScaryLogs could be changed to ignore these as well. If you're feeling up for it, feel free to make a PR.
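Until that lands, one workaround on your side could be to attach a similar filter to pika's own loggers. A rough sketch (the substrings and logger names are just the ones from your traceback; adjust as needed):

import logging

class IgnoreScaryPikaLogs(logging.Filter):
    # Suppress pika's connection-reset noise; Dramatiq re-creates the
    # connection and retries, so these records are not actionable.
    SCARY_SUBSTRINGS = ("Stream connection lost", "Connection reset by peer")

    def filter(self, record):
        # Returning False drops the record.
        return not any(s in record.getMessage() for s in self.SCARY_SUBSTRINGS)

for name in ("pika.adapters.base_connection", "pika.adapters.blocking_connection"):
    logging.getLogger(name).addFilter(IgnoreScaryPikaLogs())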

dnmellen commented 3 years ago

That might be it; I also saw a bunch of BrokenPipeError entries. I suppose I only need to worry about this kind of error:

Consumer encountered a connection error: ConnectionClosed(StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",))

Looking at the code, this one seems to be raised after the retries, but the occurrence I saw is an old one.

Thanks!

Bogdanp commented 3 years ago

Yes, though even those will just cause the consumer to restart and shouldn't really cause any problems. Increasing the heartbeat interval would help there. I'm going to close this for now, but feel free to re-open if you think it's unresolved.
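For reference, with django_dramatiq the heartbeat can be raised through the broker URL in settings.py, something along these lines (a sketch with placeholder credentials; 600 is just the value mentioned above, and the heartbeat query parameter is read by pika from the URL):

# settings.py
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.rabbitmq.RabbitmqBroker",
    "OPTIONS": {
        # Longer heartbeat so an idle web-process connection isn't reset
        # between sends.
        "url": "amqp://guest:guest@localhost:5672/?heartbeat=600",
    },
    # Other keys (e.g. MIDDLEWARE) omitted for brevity.
}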

philippkeller commented 11 months ago

I was bitten by the same issue: everything was working, but Sentry kept alerting (actually pushing me out of the free tier 😅).

I was able to filter out the errors with this:

import sentry_sdk

# Loggers used by pika's connection internals; events from these are the
# "Stream connection lost" noise described above.
PIKA_LOGGERS = [
    'pika.adapters.blocking_connection',
    'pika.adapters.base_connection',
    'pika.adapters.utils.io_services_utils',
]

def filter_sentry(event, hint):
    # Drop any event that originated from one of pika's loggers.
    if event.get('logger') in PIKA_LOGGERS:
        return None
    return event

sentry_sdk.init(
    dsn=…,
    traces_sample_rate=1.0,
    profiles_sample_rate=1.0,
    before_send=filter_sentry,
)