heatherKoo07 opened this issue 1 year ago
This is probably some sort of resource leak. The underlying RabbitMQ client we use, pika, creates a socket pair per connection in order to handle interruptions, so this error is probably a red herring and the real issue is you have too many open connections.
If you're using gevent, this issue may be relevant.
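The socket-pair mechanism mentioned above can be seen in miniature with the stdlib. This is an illustrative sketch, not pika's actual internals: each pika connection allocates an internal socket pair to interrupt its I/O loop, and pairs that are never closed keep their file descriptors open.

```python
import socket

# Illustrative only: mimic how unclosed socket pairs accumulate.
# pika creates one such pair per connection (the count here is arbitrary).
pairs = [socket.socketpair() for _ in range(3)]

# Every socket in every pair holds a distinct open file descriptor.
fds = [s.fileno() for pair in pairs for s in pair]
assert len(fds) == len(set(fds)) == 6

# Closing both ends releases the descriptors again.
for a, b in pairs:
    a.close()
    b.close()
```

If connections (and therefore their socket pairs) are created per message and never closed, the process eventually hits its file-descriptor limit.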
Thank you! Yes, I do use gevent from gunicorn and have two custom broker middlewares. I added the middlewares to the broker defined by flask_melodramatiq as shown below.
import dramatiq
import flask_melodramatiq
from dramatiq import middleware

broker = flask_melodramatiq.Broker()
dramatiq.set_broker(broker)
broker.add_middleware(
    _progress := Progress(),
    before=middleware.Retries,
)
broker.add_middleware(
    DatabaseMiddleware(),
    after=flask_melodramatiq.lazy_broker.AppContextMiddleware,
)
According to the known issue link, I need to create a Middleware class and delete the broker connection in it, like this. I'm not sure how to do that with flask_melodramatiq, though.
del dramatiq.get_broker().connection
I found that the resource leak was from database connections. I open a transaction for each task with a middleware, and MySQL keeps the connections open for 8 hours by default because of SQLAlchemy pooling, even though I close the sessions. I think I can fix it by decreasing the wait_timeout setting in MySQL to kill idle connections earlier than 8 hours.
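An alternative to lowering the server-side wait_timeout is to cap pooled connection lifetime on the client. A sketch using SQLAlchemy's create_engine pooling parameters; the DSN is a placeholder, not from this thread:

```python
from sqlalchemy import create_engine

# Placeholder DSN; substitute your own driver and credentials.
engine = create_engine(
    "mysql+pymysql://user:password@localhost/app_db",
    pool_recycle=3600,   # replace pooled connections older than 1 hour,
                         # staying well under MySQL's 8-hour wait_timeout
    pool_pre_ping=True,  # verify a connection is alive before checkout
)
```

This keeps the fix in application configuration instead of changing the database server's timeout for every client.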
I am encountering the same issue. I used an outdated version of Dramatiq before, but after reading #381 I updated it to version 1.14.2. I am also using Gevent version 23.9.1.
However, unlike the OP I am not using MySQL and flask_melodramatiq or DB session middleware. Instead, I am using Postgres 13.1 with SQLAlchemy 1.4.48.
I open a DB session using SQLAlchemy's sessionmaker context manager in each individual actor function, so I believe sessions are guaranteed to be closed when the actor exits.
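The guarantee relied on here is the context-manager protocol itself: `__exit__` runs even when the body raises. A stdlib stand-in (a sqlite3 cursor in place of a SQLAlchemy Session) demonstrating that the resource is closed despite an exception:

```python
import sqlite3
from contextlib import closing

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# The with-body raises, yet __exit__ still closes the cursor.
try:
    with closing(cur):
        assert cur.execute("SELECT 1").fetchone() == (1,)
        raise RuntimeError("simulated task failure")
except RuntimeError:
    pass

# Operating on the cursor now fails because it was closed on exit.
try:
    cur.execute("SELECT 1")
    closed = False
except sqlite3.ProgrammingError:
    closed = True

assert closed
conn.close()
```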
I only encountered this error under high load. It is the exact same exception produced by Pika when calling actor.send().
My version of Pika is 1.3.2.
I have spent a lot of time debugging this issue and still could not find the cause.
I had a hypothesis: SQLAlchemy was creating a new thread-local connection pool for each message and then failing to properly close the connections when message handling was done, which could lead to a connection leak.
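The suspected pattern can be sketched with stdlib threading.local. The names here are hypothetical stand-ins, not SQLAlchemy internals: if each worker thread lazily builds its own "pool" and nothing disposes it when the thread is done, one pool per thread accumulates.

```python
import threading

registry = threading.local()
created = []  # tracks every "pool" ever built; nothing ever disposes them

def get_pool():
    # Hypothetical per-thread pool, standing in for an engine's pool.
    if not hasattr(registry, "pool"):
        registry.pool = object()
        created.append(registry.pool)
    return registry.pool

threads = [threading.Thread(target=get_pool) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# One pool per worker thread was created and none was cleaned up.
assert len(created) == 4
```

The experiment below was designed to check whether anything like this was actually happening.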
I set up a testing environment:
Monitoring active and idle Postgres connections in one tab with this query:
SELECT
datid,
state,
pid,
application_name
FROM pg_stat_activity
WHERE true
AND state IN ('active', 'idle')
AND datname = 'my_db_name'
ORDER BY backend_start DESC;
Created a test actor to imitate tasks that use the DB:
import time

import dramatiq
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

engine = create_engine(config.DB_DSN, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=True, autoflush=False, expire_on_commit=True, bind=engine)

@dramatiq.actor(queue_name="test_db", priority=0)
def mock_db_actor(n: int) -> None:
    with SessionLocal() as db_session:
        logger.info(f"Session {n} opened")
        logger.info(db_session.execute(text("SELECT 1;")).scalar_one())
        time.sleep(5)
        logger.info(f"Session {n} closed")
if __name__ == "__main__":
    x = 100
    for n in range(x):
        mock_db_actor.send(n)
    print(f"Spawned {x} tasks")
Then I started testing. I was launching the worker with this code:
dramatiq.cli.main(
    broker=__file__,
    processes=1,
    threads=10,
    worker_shutdown_timeout=60_000,
)
I started spawning 100 messages at a time and monitoring the SQL query.
All I could see were 5 open idle connections; the connection count never grew past that number. Then I shut down the container running the worker, and all connections were successfully closed and no longer appeared in the query output.
I then repeated the experiment with updated DB engine settings, this time using the following code to initialize the engine:
null_pool_engine = create_engine(config.DB_DSN, poolclass=NullPool)
SessionLocal = sessionmaker(autocommit=True, autoflush=False, expire_on_commit=True, bind=null_pool_engine)
This disables connection pooling entirely.
This time I was not able to see any idle connections in the SQL query's output at all, even though I had set it to auto-refresh every second.
Conclusion: I could not reproduce a connection leak on the SQLAlchemy side. The pool held at most 5 idle connections, all of which were released cleanly on shutdown, and with NullPool no idle connections appeared at all.
Any help is appreciated. Hope to hear back from @Bogdanp about this case soon.
This error came back to us again even though I fixed a possible resource leak in my app. I now think the problem is port conflicts between RabbitMQ and MySQL when they open connections. I hope to hear back too @Bogdanp
The underlying RabbitMQ client we use, pika, creates a socket pair per connection in order to handle interruptions
After some investigation, I found that the socket pairs returned from pika.compat._nonblocking_socketpair() are not closed. I now think the database connections have nothing to do with this issue. Is this a dramatiq bug that doesn't close the sockets? @Bogdanp
I reproduced the error and found that RabbitMQ connections are leaking in dramatiq. When messages are enqueued to RabbitMQ, the pika.BlockingConnection objects are not closed after the messages are delivered. Under high load this exhausts the local ephemeral ports and raises an "Address already in use" error. I created a PR for the fix in https://github.com/Bogdanp/dramatiq/pull/589 @Bogdanp Please review it.
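The port-exhaustion mechanism can be seen in miniature with stdlib sockets: each unclosed client connection pins a distinct local ephemeral port, so opening one connection per message without closing it eventually exhausts the range. A small illustrative count here, not the real limit:

```python
import socket

# A local listener standing in for RabbitMQ.
server = socket.socket()
server.bind(("127.0.0.1", 0))
server.listen()

# One connection per "message", never closed: each holds its own
# local ephemeral port for as long as it stays open.
clients = [socket.create_connection(server.getsockname()) for _ in range(5)]
ports = {c.getsockname()[1] for c in clients}
assert len(ports) == 5  # five distinct ephemeral ports in use

for c in clients:
    c.close()
server.close()
```

At tens of thousands of unclosed connections, connect() starts failing because no local port is available, which matches the error seen under high load.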
@heatherKoo07 thank you for your effort. Can you please describe a way to reliably reproduce this issue? I would like to test the solution suggested by @Bogdanp here.
Pasting it here for people who will be searching for the same issue and will find this thread:
from dramatiq.middleware import Middleware

class CloseMiddleware(Middleware):
    def after_enqueue(self, broker, message, delay):
        del broker.connection
Checklist
What OS are you using?
Debian 12
What version of Dramatiq are you using?
v1.12.0
What did you do?
I'm using dramatiq in a Flask app with flask-melodramatiq. I got an OSError from a POST request in which dramatiq workers are supposed to run an async job.
What did you expect would happen?
The task is supposed to be processed successfully
What happened?
The error below was raised, and tasks in the queue were not processed but remained there for hours. I had to restart Flask to resolve the issue.