PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Scheduler timeout and no new scheduled jobs with big flow_run_state/task_run_state tables #15045

Open kgorszczyk opened 3 weeks ago

kgorszczyk commented 3 weeks ago

Bug summary

About two months ago, I successfully completed the migration to Prefect 2. Initially, all flow runs were logged very verbosely to allow for thorough error analysis. Afterward, all flows ran daily with mostly normal logging.

Two days ago, I noticed that after a new deployment, scheduled jobs started disappearing one after the other and were not being rescheduled despite an active schedule.

In the logs of the Prefect server container, I found the following traceback:

```
08:06:22.202 | ERROR | prefect.server.services.scheduler - Unexpected error in: TimeoutError()
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/server/services/loop_service.py", line 79, in start
    await self.run_once()
  File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 125, in async_wrapper
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/server/services/scheduler.py", line 103, in run_once
    inserted_runs = await self._insert_scheduled_flow_runs(
  File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 125, in async_wrapper
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/server/services/scheduler.py", line 290, in _insert_scheduled_flow_runs
    return await models.deployments._insert_scheduled_flow_runs(
  File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 168, in async_wrapper
    return await func(db, *args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.10/site-packages/prefect/server/models/deployments.py", line 687, in _insert_scheduled_flow_runs
    await session.execute(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute
    result = await greenlet_spawn(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 203, in greenlet_spawn
    result = context.switch(value)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2306, in execute
    return self._execute_internal(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2191, in _execute_internal
    result: Result[Any] = compile_state_cls.orm_execute_statement(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/bulk_persistence.py", line 1262, in orm_execute_statement
    result = _bulk_insert(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/bulk_persistence.py", line 197, in _bulk_insert
    result = persistence._emit_insert_statements(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1048, in _emit_insert_statements
    result = connection.execute(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1422, in execute
    return meth(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 514, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1644, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1850, in _execute_context
    return self._exec_single_context(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1990, in _exec_single_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2360, in _handle_dbapi_exception
    raise exc_info[1].with_traceback(exc_info[2])
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1940, in _exec_single_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 916, in do_executemany
    cursor.executemany(statement, parameters)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 577, in executemany
    return self._adapt_connection.await_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 131, in await_only
    return current.driver.switch(awaitable)  # type: ignore[no-any-return]
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 569, in _executemany
    self._handle_exception(error)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 501, in _handle_exception
    self._adapt_connection._handle_exception(error)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 791, in _handle_exception
    raise error
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 565, in _executemany
    return await self._connection.executemany(
  File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 391, in executemany
    return await self._executemany(command, args, timeout)
  File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 1911, in _executemany
    result, _ = await self._do_execute(query, executor, timeout)
  File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 1948, in _do_execute
    result = await executor(stmt, timeout)
  File "asyncpg/protocol/protocol.pyx", line 267, in bind_execute_many
asyncio.exceptions.TimeoutError
```

After reviewing the Postgres database, I discovered that the "task_run_state" and "flow_run_state" tables had grown to over 40GB in size.
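
For anyone hitting the same symptom, a quick way to confirm whether these tables have ballooned is to query their on-disk size directly. Below is a minimal sketch using asyncpg (the driver the Prefect server's Postgres backend already uses); the DSN is a placeholder and should point at the same database as PREFECT_API_DATABASE_CONNECTION_URL:

```python
import asyncio

import asyncpg


async def report_state_table_sizes(dsn: str) -> None:
    # Connect directly to the Prefect database and report the total on-disk
    # size (including indexes and TOAST) of the two state tables.
    conn = await asyncpg.connect(dsn)
    try:
        rows = await conn.fetch(
            """
            SELECT relname AS table_name,
                   pg_size_pretty(pg_total_relation_size(oid)) AS total_size
            FROM pg_class
            WHERE relname IN ('flow_run_state', 'task_run_state')
            """
        )
        for row in rows:
            print(f"{row['table_name']}: {row['total_size']}")
    finally:
        await conn.close()


if __name__ == "__main__":
    # Placeholder connection string -- replace with your own.
    asyncio.run(report_state_table_sizes("postgresql://prefect:prefect@localhost:5432/prefect"))
```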

As a test, I truncated both tables; afterwards the scheduler was able to schedule new runs again and ran without any timeouts.
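
Truncating the tables worked here, but it also discards all run history. A gentler alternative (an untested sketch, not an official Prefect maintenance routine) is to delete old flow runs through the client API, which should also remove the associated state rows; the 90-day cutoff and batch size below are arbitrary values for illustration:

```python
import asyncio
from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterStartTime


async def prune_old_flow_runs(older_than_days: int = 90, batch_size: int = 200) -> None:
    cutoff = datetime.now(timezone.utc) - timedelta(days=older_than_days)
    async with get_client() as client:
        while True:
            # Fetch a batch of flow runs that started before the cutoff.
            old_runs = await client.read_flow_runs(
                flow_run_filter=FlowRunFilter(
                    start_time=FlowRunFilterStartTime(before_=cutoff)
                ),
                limit=batch_size,
            )
            if not old_runs:
                break
            for run in old_runs:
                # Deleting a flow run should also clean up its state history
                # (and that of its task runs) on the server.
                await client.delete_flow_run(run.id)


if __name__ == "__main__":
    asyncio.run(prune_old_flow_runs())
```

Running something like this periodically might keep the state tables from growing without bound, though I have not verified how much space it reclaims compared to a truncate plus vacuum.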

I suspect that, given the scheduler's fixed loop interval (5 seconds), Postgres on slower or heavily loaded systems may not be able to return results in time, so the query runs into the asyncio timeout and, as a result, no new jobs are scheduled.
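
If the immediate culprit is the server-side statement timeout rather than the loop interval itself, the relevant knobs (as I understand them) are PREFECT_API_DATABASE_TIMEOUT, which bounds individual database statements, and PREFECT_API_SERVICES_SCHEDULER_LOOP_SECONDS, which controls how often the scheduler loop runs. A small sketch to inspect the effective values on the server; raising them via environment variables is at best a stopgap while the tables stay this large:

```python
from prefect.settings import (
    PREFECT_API_DATABASE_CONNECTION_TIMEOUT,
    PREFECT_API_DATABASE_TIMEOUT,
    PREFECT_API_SERVICES_SCHEDULER_LOOP_SECONDS,
)

# Print the effective values on the machine running the Prefect server.
# Each setting can be overridden with an environment variable of the same name
# (e.g. PREFECT_API_DATABASE_TIMEOUT=30 on the server container).
for setting in (
    PREFECT_API_DATABASE_TIMEOUT,
    PREFECT_API_DATABASE_CONNECTION_TIMEOUT,
    PREFECT_API_SERVICES_SCHEDULER_LOOP_SECONDS,
):
    print(f"{setting.name} = {setting.value()}")
```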

Version info (prefect version output)

Version:             2.20.2
API version:         0.8.4
Python version:      3.10.14
Git commit:          51c3f290
Built:               Wed, Aug 14, 2024 11:27 AM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         server

Additional context

Interestingly, the runtime of my MAIN_EXECUTION flow was reduced from nearly 3 hours to 1 hour after truncating the tables.

cicdw commented 3 weeks ago

Interestingly, the runtime of my MAIN_EXECUTION flow was reduced from nearly 3 hours to 1 hour after truncating the tables.

The good news is that 3.0 avoids a lot of this database traffic during execution, so flow run execution will be less impacted by DB performance on versions 3.0+.

That being said, this is something we should surface better from within the server somehow. We plan to focus on performance tuning in the next few months, and I will be sure to include this issue in scope for that effort.