spotify / luigi

Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, etc. It also comes with Hadoop support built in.
Apache License 2.0

Broken or too many connections to the database when workers > 1 #3313

Open: simonpicard opened this issue 1 month ago

simonpicard commented 1 month ago

Hi team, many thanks for the great work.

I am using Luigi to schedule a workflow in which some tasks can run in parallel, so I would like to use multiple workers to parallelise the computation and finish faster. However, I run into problems with the database connection as soon as I enable multiple workers, whereas everything works with a single worker. For reference, I am using SQLAlchemy 2.0.25 with a PostgreSQL database. The typical error is (psycopg2.OperationalError) SSL SYSCALL error: EOF detected, which usually means the connection to the database was closed unexpectedly, according to ref.

My understanding is that enabling multiple workers makes Luigi create/fork child processes, and the documentation says the engine's connection pool must then be disposed of in each child. So I used a Luigi event callback to dispose of the engine at the start of each task:

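    import luigi

    # `engine` is the application's SQLAlchemy Engine, defined elsewhere.
    @luigi.Task.event_handler(luigi.Event.START)  # type: ignore
    def on_start(self) -> None:  # pragma: no cover
        engine.dispose(close=False)  # replace the pool inherited from the parent process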

Unfortunately, this approach leads to a (psycopg2.OperationalError) FATAL: sorry, too many clients already error.
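A rough back-of-the-envelope for the "too many clients" error, under stated assumptions: with workers > 1, luigi runs each task in its own forked process, so disposing at Event.START already happens once per process; what matters is how many processes hold a pool at the same time. After dispose(), each process builds a fresh pool, and SQLAlchemy's default QueuePool allows pool_size=5 plus max_overflow=10 connections, while PostgreSQL ships with max_connections = 100. The sketch below computes the resulting worst-case ceiling. worst_case_connections is a hypothetical helper, and counting exactly one parent scheduling process is an assumption; real usage is usually lower, since a task rarely checks out more than one connection at once.

```python
def worst_case_connections(
    luigi_workers: int,
    pool_size: int = 5,              # SQLAlchemy QueuePool default
    max_overflow: int = 10,          # SQLAlchemy QueuePool default
    scheduling_processes: int = 1,   # assumption: one parent process also uses the engine
) -> int:
    """Upper bound on server connections opened through one SQLAlchemy engine.

    Assumes every concurrently running task process has disposed of its
    inherited pool and may grow a new one up to pool_size + max_overflow.
    """
    per_process = pool_size + max_overflow
    return (luigi_workers + scheduling_processes) * per_process


if __name__ == "__main__":
    PG_DEFAULT_MAX_CONNECTIONS = 100  # PostgreSQL's shipped default
    for workers in (2, 4, 8):
        ceiling = worst_case_connections(workers)
        # Print the worker count, the ceiling, and whether it exceeds the default limit
        print(workers, ceiling, ceiling > PG_DEFAULT_MAX_CONNECTIONS)
```

Lowering pool_size/max_overflow in create_engine(), or raising max_connections on the server (as done later in this thread), both move the ceiling below the limit.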

As I understand it, I should dispose of the engine only once, when each worker is created, rather than at the start of every task.
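One way to run the disposal exactly once per forked process, without relying on luigi task events, is the standard-library hook os.register_at_fork (Python 3.7+). This is a sketch, not a luigi API: install_fork_hook is a hypothetical helper name, and it assumes the default "fork" start method on Linux, where multiprocessing (and therefore luigi's task processes) calls os.fork(). Because the hook fires immediately after the fork, it also runs before the child evaluates requires(). That matters here: in the luigi version shown in the traceback later in this thread, TaskProcess.run calls self.task.deps() (worker.py line 185) before it triggers Event.START.

```python
import os
from typing import Protocol


class Disposable(Protocol):
    """Anything with SQLAlchemy's Engine.dispose(close=...) signature."""

    def dispose(self, close: bool = True) -> None: ...


def install_fork_hook(engine: Disposable) -> None:
    """Hypothetical helper: reset the engine's pool once in every forked child.

    Call it once, right after create_engine(), in the module that defines the engine.
    """

    def _reset_pool_in_child() -> None:
        # close=False drops the child's references to connections inherited
        # from the parent without closing them, so the parent's sockets stay usable.
        engine.dispose(close=False)

    # Runs in the child right after os.fork(), as used by multiprocessing's
    # "fork" start method; POSIX only, and not triggered by spawn/forkserver.
    os.register_at_fork(after_in_child=_reset_pool_in_child)
```

Usage would be engine = create_engine(...) followed by install_fork_hook(engine) at import time. Note that this only replaces the pool; it does not detach a connection already held by a Session object that was created in the parent.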

Given the above context, my questions are:

Many thanks for the help.

A similar question, but for Django: https://github.com/spotify/luigi/issues/2782

simonpicard commented 4 weeks ago

Hi team, following up on my investigation: I increased the maximum number of connections on my PostgreSQL database. The error about too many connections no longer occurs, but the following one arises instead: sqlalchemy.exc.DatabaseError: (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no message from the libpq

The full traceback is the following:

    2024-09-17 14:44:58 Traceback (most recent call last):
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    2024-09-17 14:44:58     self.dialect.do_execute(
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute
    2024-09-17 14:44:58     cursor.execute(statement, parameters)
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/newrelic/hooks/database_psycopg2.py", line 61, in execute
    2024-09-17 14:44:58     return super(CursorWrapper, self).execute(sql, parameters, *args, **kwargs)
    2024-09-17 14:44:58            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/newrelic/hooks/database_dbapi2.py", line 42, in execute
    2024-09-17 14:44:58     return self.__wrapped__.execute(sql, parameters, *args, **kwargs)
    2024-09-17 14:44:58            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    2024-09-17 14:44:58 psycopg2.DatabaseError: error with status PGRES_TUPLES_OK and no message from the libpq
    2024-09-17 14:44:58 
    2024-09-17 14:44:58 The above exception was the direct cause of the following exception:
    2024-09-17 14:44:58 
    2024-09-17 14:44:58 Traceback (most recent call last):
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/luigi/worker.py", line 185, in run
    2024-09-17 14:44:58     missing = [dep.task_id for dep in self.task.deps() if not self.check_complete(dep)]
    2024-09-17 14:44:58                                       ^^^^^^^^^^^^^^^^
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/luigi/task.py", line 676, in deps
    2024-09-17 14:44:58     return flatten(self._requires())
    2024-09-17 14:44:58                    ^^^^^^^^^^^^^^^^
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/luigi/task.py", line 648, in _requires
    2024-09-17 14:44:58     return flatten(self.requires())  # base impl
    2024-09-17 14:44:58            ^^^^^^^^^^^^^^^^^^^^^^^^
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/luigi/task.py", line 987, in flatten
    2024-09-17 14:44:58     for result in iterator:
    2024-09-17 14:44:58   File "/code/src/tasks/utils/custom_tasks.py", line 149, in requires
    2024-09-17 14:44:58     image_task = ImageTask.get_by_uuid(self.image_task_uuid)
    2024-09-17 14:44:58                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    2024-09-17 14:44:58   File "/code/src/server/models/base_model.py", line 57, in get_by_uuid
    2024-09-17 14:44:58     result = db.execute(select(cls).filter(cls.uuid == record_uuid))
    2024-09-17 14:44:58              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2362, in execute
    2024-09-17 14:44:58     return self._execute_internal(
    2024-09-17 14:44:58            ^^^^^^^^^^^^^^^^^^^^^^^
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2247, in _execute_internal
    2024-09-17 14:44:58     result: Result[Any] = compile_state_cls.orm_execute_statement(
    2024-09-17 14:44:58                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement
    2024-09-17 14:44:58     result = conn.execute(
    2024-09-17 14:44:58              ^^^^^^^^^^^^^
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1418, in execute
    2024-09-17 14:44:58     return meth(
    2024-09-17 14:44:58            ^^^^^
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection
    2024-09-17 14:44:58     return connection._execute_clauseelement(
    2024-09-17 14:44:58            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement
    2024-09-17 14:44:58     ret = self._execute_context(
    2024-09-17 14:44:58           ^^^^^^^^^^^^^^^^^^^^^^
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
    2024-09-17 14:44:58     return self._exec_single_context(
    2024-09-17 14:44:58            ^^^^^^^^^^^^^^^^^^^^^^^^^^
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
    2024-09-17 14:44:58     self._handle_dbapi_exception(
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2355, in _handle_dbapi_exception
    2024-09-17 14:44:58     raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    2024-09-17 14:44:58     self.dialect.do_execute(
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute
    2024-09-17 14:44:58     cursor.execute(statement, parameters)
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/newrelic/hooks/database_psycopg2.py", line 61, in execute
    2024-09-17 14:44:58     return super(CursorWrapper, self).execute(sql, parameters, *args, **kwargs)
    2024-09-17 14:44:58            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    2024-09-17 14:44:58   File "/usr/local/lib/python3.11/site-packages/newrelic/hooks/database_dbapi2.py", line 42, in execute
    2024-09-17 14:44:58     return self.__wrapped__.execute(sql, parameters, *args, **kwargs)
    2024-09-17 14:44:58            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    2024-09-17 14:44:58 sqlalchemy.exc.DatabaseError: (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no message from the libpq

As you can see, the error happens inside a custom requires() method that queries the database. So I also tried disposing of the engine on the dependency-discovery event:

    @luigi.Task.event_handler(luigi.Event.DEPENDENCY_DISCOVERED)  # type: ignore
    def prepare_db_conn(self, *args: Any, **kwargs: Any) -> None:  # pragma: no cover
        engine.dispose(close=False)

It did not work: the same error occurred at the same place. Note that I also kept the engine disposal on task start.
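The PGRES_TUPLES_OK error is a typical symptom of two processes talking over the same psycopg2 connection. Instead of trying to call dispose() at exactly the right moment, SQLAlchemy's documentation on using connection pools with multiprocessing describes a defensive recipe: tag each pooled connection with the process ID that opened it, and refuse to hand it out in any other process. The sketch below wraps that recipe in a helper; add_pid_guard is a hypothetical name. Raising DisconnectionError from a "checkout" listener makes the pool discard that record and retry with a fresh connection. The guard only acts at pool checkout, so a long-lived Session (for example, a module-level db session) that already holds a connection checked out in the parent, say from a requires() call made while luigi built the task graph, would presumably keep using it; closing that session in the child is likely needed as well.

```python
import os

from sqlalchemy import event, exc
from sqlalchemy.engine import Engine


def add_pid_guard(engine: Engine) -> None:
    """Hypothetical helper adapted from SQLAlchemy's multiprocessing pooling recipe.

    Records the PID that opened each DBAPI connection and refuses to check
    that connection out in any other process.
    """

    @event.listens_for(engine, "connect")
    def _record_pid(dbapi_connection, connection_record):
        connection_record.info["pid"] = os.getpid()

    @event.listens_for(engine, "checkout")
    def _check_pid(dbapi_connection, connection_record, connection_proxy):
        pid = os.getpid()
        if connection_record.info["pid"] != pid:
            # Detach the inherited connection without closing it: the parent
            # process still owns that socket.
            connection_record.dbapi_connection = connection_proxy.dbapi_connection = None
            # The pool treats this as a stale connection and retries with a new one.
            raise exc.DisconnectionError(
                f"Connection record belongs to pid {connection_record.info['pid']}, "
                f"attempting to check out in pid {pid}"
            )
```

Call add_pid_guard(engine) once, right after create_engine(), so the listeners are in place before any process forks.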

So it would be really great to have a proper way to dispose of the engine at worker startup rather than relying on task events. Is there any way to achieve this?

Many thanks.

921kiyo commented 3 weeks ago

Hi, I am facing the same issue. Could anyone from Luigi/Spotify have a look at it?