PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Schedules no longer working #15919

Open rmnvncnt opened 1 week ago

rmnvncnt commented 1 week ago

Bug summary

Since I upgraded Prefect from v2 to v3, schedules on existing deployments no longer seem to work. I can create or activate a new schedule, either from the UI or from my prefect.yaml file, but no upcoming flow runs get scheduled.

In the following example, I created a new hourly schedule for this deployment, yet no upcoming runs are scheduled:

[Screenshot, 2024-11-04 at 16:38:11]

I plan on wiping my internal database and upgrading to Prefect 3.1, but is there something I can do before that?

Version info

Version:             3.0.1
API version:         0.8.4
Python version:      3.11.7
Git commit:          c6b2ffe1
Built:               Fri, Sep 6, 2024 10:05 AM
OS/Arch:             darwin/arm64
Server type:         server
Pydantic version:    2.9.1
Integrations:
  prefect-slack:     0.3.0
  prefect-aws:       0.5.0
  prefect-docker:    0.6.1
  prefect-dask:      0.3.1

Additional context

No response

desertaxle commented 1 week ago

Thanks for the bug report @rmnvncnt! Seems like there could be an issue with the scheduler. Do you see any error logs on your server?

rmnvncnt commented 1 week ago

Lots of them, actually. Here is a sample:

| 2024-11-04 16:15:42.835 | 16:15:42.831 | ERROR   | prefect.server.services.scheduler - Unexpected error in: TimeoutError() |
| 2024-11-04 16:15:42.835 | Traceback (most recent call last): |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/prefect/server/services/loop_service.py", line 83, in start |
| 2024-11-04 16:15:42.835 | await self.run_once() |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 125, in async_wrapper |
| 2024-11-04 16:15:42.835 | return await fn(*args, **kwargs) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/prefect/server/services/scheduler.py", line 104, in run_once |
| 2024-11-04 16:15:42.835 | inserted_runs = await self._insert_scheduled_flow_runs( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 125, in async_wrapper |
| 2024-11-04 16:15:42.835 | return await fn(*args, **kwargs) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/prefect/server/services/scheduler.py", line 291, in _insert_scheduled_flow_runs |
| 2024-11-04 16:15:42.835 | return await models.deployments._insert_scheduled_flow_runs( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 168, in async_wrapper |
| 2024-11-04 16:15:42.835 | return await func(db, *args, **kwargs)  # type: ignore |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/prefect/server/models/deployments.py", line 693, in _insert_scheduled_flow_runs |
| 2024-11-04 16:15:42.835 | inserted_flow_run_ids = (await session.execute(inserted_rows)).scalars().all() |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute |
| 2024-11-04 16:15:42.835 | result = await greenlet_spawn( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 203, in greenlet_spawn |
| 2024-11-04 16:15:42.835 | result = context.switch(value) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2362, in execute |
| 2024-11-04 16:15:42.835 | return self._execute_internal( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2247, in _execute_internal |
| 2024-11-04 16:15:42.835 | result: Result[Any] = compile_state_cls.orm_execute_statement( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement |
| 2024-11-04 16:15:42.835 | result = conn.execute( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1418, in execute |
| 2024-11-04 16:15:42.835 | return meth( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection |
| 2024-11-04 16:15:42.835 | return connection._execute_clauseelement( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement |
| 2024-11-04 16:15:42.835 | ret = self._execute_context( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context |
| 2024-11-04 16:15:42.835 | return self._exec_single_context( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context |
| 2024-11-04 16:15:42.835 | self._handle_dbapi_exception( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2358, in _handle_dbapi_exception |
| 2024-11-04 16:15:42.835 | raise exc_info[1].with_traceback(exc_info[2]) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context |
| 2024-11-04 16:15:42.835 | self.dialect.do_execute( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute |
| 2024-11-04 16:15:42.835 | cursor.execute(statement, parameters) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 572, in execute |
| 2024-11-04 16:15:42.835 | self._adapt_connection.await_( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only |
| 2024-11-04 16:15:42.835 | return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501 |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn |
| 2024-11-04 16:15:42.835 | value = await result |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 550, in _prepare_and_execute |
| 2024-11-04 16:15:42.835 | self._handle_exception(error) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 501, in _handle_exception |
| 2024-11-04 16:15:42.835 | self._adapt_connection._handle_exception(error) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 786, in _handle_exception |
| 2024-11-04 16:15:42.835 | raise error |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 538, in _prepare_and_execute |
| 2024-11-04 16:15:42.835 | self._rows = deque(await prepared_stmt.fetch(*parameters)) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch |
| 2024-11-04 16:15:42.835 | data = await self.__bind_execute(args, 0, timeout) |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute |
| 2024-11-04 16:15:42.835 | data, status, _ = await self.__do_execute( |
| 2024-11-04 16:15:42.835 | File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute |
| 2024-11-04 16:15:42.835 | return await executor(protocol) |
| 2024-11-04 16:15:42.835 | File "asyncpg/protocol/protocol.pyx", line 207, in bind_execute |
| 2024-11-04 16:15:42.835 | asyncio.exceptions.TimeoutError |
| 2024-11-04 16:15:12.799 | 16:15:12.795 | ERROR   | prefect.server.services.recentdeploymentsscheduler - Unexpected error in: TimeoutError() |
| 2024-11-04 16:15:12.799 | Traceback (most recent call last): |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/prefect/server/services/loop_service.py", line 83, in start |
| 2024-11-04 16:15:12.799 | await self.run_once() |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 125, in async_wrapper |
| 2024-11-04 16:15:12.799 | return await fn(*args, **kwargs) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/prefect/server/services/scheduler.py", line 104, in run_once |
| 2024-11-04 16:15:12.799 | inserted_runs = await self._insert_scheduled_flow_runs( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 125, in async_wrapper |
| 2024-11-04 16:15:12.799 | return await fn(*args, **kwargs) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/prefect/server/services/scheduler.py", line 291, in _insert_scheduled_flow_runs |
| 2024-11-04 16:15:12.799 | return await models.deployments._insert_scheduled_flow_runs( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 168, in async_wrapper |
| 2024-11-04 16:15:12.799 | return await func(db, *args, **kwargs)  # type: ignore |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/prefect/server/models/deployments.py", line 693, in _insert_scheduled_flow_runs |
| 2024-11-04 16:15:12.799 | inserted_flow_run_ids = (await session.execute(inserted_rows)).scalars().all() |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute |
| 2024-11-04 16:15:12.799 | result = await greenlet_spawn( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 203, in greenlet_spawn |
| 2024-11-04 16:15:12.799 | result = context.switch(value) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2362, in execute |
| 2024-11-04 16:15:12.799 | return self._execute_internal( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2247, in _execute_internal |
| 2024-11-04 16:15:12.799 | result: Result[Any] = compile_state_cls.orm_execute_statement( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement |
| 2024-11-04 16:15:12.799 | result = conn.execute( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1418, in execute |
| 2024-11-04 16:15:12.799 | return meth( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection |
| 2024-11-04 16:15:12.799 | return connection._execute_clauseelement( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement |
| 2024-11-04 16:15:12.799 | ret = self._execute_context( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context |
| 2024-11-04 16:15:12.799 | return self._exec_single_context( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context |
| 2024-11-04 16:15:12.799 | self._handle_dbapi_exception( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2358, in _handle_dbapi_exception |
| 2024-11-04 16:15:12.799 | raise exc_info[1].with_traceback(exc_info[2]) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context |
| 2024-11-04 16:15:12.799 | self.dialect.do_execute( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute |
| 2024-11-04 16:15:12.799 | cursor.execute(statement, parameters) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 572, in execute |
| 2024-11-04 16:15:12.799 | self._adapt_connection.await_( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only |
| 2024-11-04 16:15:12.799 | return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501 |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn |
| 2024-11-04 16:15:12.799 | value = await result |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 550, in _prepare_and_execute |
| 2024-11-04 16:15:12.799 | self._handle_exception(error) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 501, in _handle_exception |
| 2024-11-04 16:15:12.799 | self._adapt_connection._handle_exception(error) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 786, in _handle_exception |
| 2024-11-04 16:15:12.799 | raise error |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 538, in _prepare_and_execute |
| 2024-11-04 16:15:12.799 | self._rows = deque(await prepared_stmt.fetch(*parameters)) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch |
| 2024-11-04 16:15:12.799 | data = await self.__bind_execute(args, 0, timeout) |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute |
| 2024-11-04 16:15:12.799 | data, status, _ = await self.__do_execute( |
| 2024-11-04 16:15:12.799 | File "/usr/local/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute |
| 2024-11-04 16:15:12.799 | return await executor(protocol) |
| 2024-11-04 16:15:12.799 | File "asyncpg/protocol/protocol.pyx", line 207, in bind_execute |
| 2024-11-04 16:15:12.799 | asyncio.exceptions.TimeoutError |
| 2024-11-04 16:15:12.799 | 16:15:12.798 | WARNING | prefect.server.services.recentdeploymentsscheduler - RecentDeploymentsScheduler took 60.063469 seconds to run, which is longer than its loop interval of 5 seconds. |

Arthurhussey commented 1 week ago

I'm also seeing this in version 2.20.2

11:15:37.423 | WARNING | prefect.server.services.recentdeploymentsscheduler - RecentDeploymentsScheduler took 19.288165 seconds to run, which is longer than its loop interval of 5 seconds.

Arthurhussey commented 6 days ago

@rmnvncnt FYI, this was happening to me because my PostgreSQL database had grown to around 200 GB. It seems to keep the state information of every run forever. I wrote this script to clear out the database, which solved this (and other) issues:

"""Flow to delete old entries from the database."""

import os

from prefect import flow, task, get_run_logger
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta

@task
def dbconnect():
    """Connect to the database."""
    log = get_run_logger()
    log.info("Connecting to database")
    connection_url = os.getenv("PREFECT_API_DATABASE_CONNECTION_URL")
    if not connection_url:
        raise ValueError(
            "Variable PREFECT_API_DATABASE_CONNECTION_URL is not set."
        )
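    # create_engine() builds a synchronous engine, so the URL is assumed to use
    # a sync driver (e.g. postgresql+psycopg2://) rather than postgresql+asyncpg://.
    engine = create_engine(connection_url)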
    return engine

def delete_old_entries(engine, table_name, date_column):
    """Delete old entries from the database."""
    log = get_run_logger()
    log.info(f"Deleting entries older than 7 days from {table_name}")
    seven_days_ago = datetime.now() - timedelta(days=7)
    query = text(
        f"""
        DELETE FROM {table_name}
        WHERE {date_column} < :seven_days_ago
        """
    )
    with engine.connect() as connection:
        with connection.begin():  # Begin a transaction
            result = connection.execute(
                query, {"seven_days_ago": seven_days_ago.isoformat()}
            )
            deleted_count = result.rowcount
            log.info(f"Deleted {deleted_count} entries from {table_name}")

@task
def delete_old_flow_entries(engine):
    """Delete old flow entries from the database."""
    delete_old_entries(engine, "flow_run_state", "timestamp")
    delete_old_entries(engine, "flow_run", "created")

@task
def delete_old_task_entries(engine):
    """Delete old task entries from the database."""
    delete_old_entries(engine, "task_run_state", "timestamp")
    delete_old_entries(engine, "task_run", "created")

@flow(name="DBFlow")
def run():
    """Run the flow to delete old entries from the database."""
    log = get_run_logger()

    engine = dbconnect()  # Retrieve the engine from the dbconnect task
    log.info("Deleting old task entries")
    delete_old_task_entries(engine)  # Pass the engine to the task-run cleanup task
    log.info("Deleting old flow entries")
    delete_old_flow_entries(engine)  # Pass the engine to the flow-run cleanup task

if __name__ == "__main__":
    run()
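
One possible refinement of the script above, offered as a sketch rather than a tested fix: on tables holding several gigabytes of rows, a single DELETE can run (and hold locks) for a long time, so deleting in small batches keeps each transaction short. The example uses Python's built-in sqlite3 with an in-memory stand-in table so that it runs anywhere. The function name `delete_in_batches`, the batch size, and the use of `rowid` are assumptions made for illustration; on PostgreSQL the inner query would select by primary key instead.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def delete_in_batches(conn, table, date_column, cutoff, batch_size=1000):
    """Delete rows older than `cutoff` in batches and return the total deleted.

    `table` and `date_column` are interpolated into the SQL text, so they must
    come from trusted code, never from user input.
    """
    total = 0
    while True:
        cur = conn.execute(
            f"DELETE FROM {table} WHERE rowid IN ("
            f"SELECT rowid FROM {table} WHERE {date_column} < ? LIMIT ?)",
            (cutoff, batch_size),
        )
        conn.commit()  # one short transaction per batch
        if cur.rowcount == 0:
            return total
        total += cur.rowcount


if __name__ == "__main__":
    # Hypothetical stand-in for flow_run_state: one row per day, 0 to 19 days old.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE flow_run_state (id INTEGER PRIMARY KEY, timestamp TEXT)"
    )
    now = datetime(2024, 11, 11, tzinfo=timezone.utc)
    conn.executemany(
        "INSERT INTO flow_run_state (timestamp) VALUES (?)",
        [((now - timedelta(days=i)).isoformat(),) for i in range(20)],
    )
    conn.commit()

    cutoff = (now - timedelta(days=7)).isoformat()
    deleted = delete_in_batches(
        conn, "flow_run_state", "timestamp", cutoff, batch_size=5
    )
    print(f"Deleted {deleted} rows")
```

The cutoff is computed in UTC here so that it does not depend on the machine's local timezone; whether that matters for a 7-day retention window is a judgment call.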

rmnvncnt commented 5 days ago

Indeed, these tables are getting huge:

prefect=> \d+
                                                 List of relations
 Schema |              Name              | Type  |  Owner  | Persistence | Access method |    Size    | Description 
--------+--------------------------------+-------+---------+-------------+---------------+------------+-------------
 public | agent                          | table | prefect | permanent   | heap          | 8192 bytes | 
 public | alembic_version                | table | prefect | permanent   | heap          | 40 kB      | 
 public | artifact                       | table | prefect | permanent   | heap          | 71 MB      | 
 public | artifact_collection            | table | prefect | permanent   | heap          | 8192 bytes | 
 public | automation                     | table | prefect | permanent   | heap          | 8192 bytes | 
 public | automation_bucket              | table | prefect | permanent   | heap          | 8192 bytes | 
 public | automation_event_follower      | table | prefect | permanent   | heap          | 24 kB      | 
 public | automation_related_resource    | table | prefect | permanent   | heap          | 8192 bytes | 
 public | block_document                 | table | prefect | permanent   | heap          | 304 kB     | 
 public | block_document_reference       | table | prefect | permanent   | heap          | 16 kB      | 
 public | block_schema                   | table | prefect | permanent   | heap          | 576 kB     | 
 public | block_schema_reference         | table | prefect | permanent   | heap          | 16 kB      | 
 public | block_type                     | table | prefect | permanent   | heap          | 192 kB     | 
 public | composite_trigger_child_firing | table | prefect | permanent   | heap          | 8192 bytes | 
 public | concurrency_limit              | table | prefect | permanent   | heap          | 56 kB      | 
 public | concurrency_limit_v2           | table | prefect | permanent   | heap          | 48 kB      | 
 public | configuration                  | table | prefect | permanent   | heap          | 16 kB      | 
 public | csrf_token                     | table | prefect | permanent   | heap          | 48 kB      | 
 public | deployment                     | table | prefect | permanent   | heap          | 31 MB      | 
 public | deployment_schedule            | table | prefect | permanent   | heap          | 48 kB      | 
 public | event_resources                | table | prefect | permanent   | heap          | 3046 MB    | 
 public | events                         | table | prefect | permanent   | heap          | 807 MB     | 
 public | flow                           | table | prefect | permanent   | heap          | 48 kB      | 
 public | flow_run                       | table | prefect | permanent   | heap          | 22 MB      | 
 public | flow_run_input                 | table | prefect | permanent   | heap          | 8192 bytes | 
 public | flow_run_notification_policy   | table | prefect | permanent   | heap          | 8192 bytes | 
 public | flow_run_notification_queue    | table | prefect | permanent   | heap          | 0 bytes    | 
 public | flow_run_state                 | table | prefect | permanent   | heap          | 5348 MB    | 
 public | log                            | table | prefect | permanent   | heap          | 651 MB     | 
 public | saved_search                   | table | prefect | permanent   | heap          | 8192 bytes | 
 public | task_run                       | table | prefect | permanent   | heap          | 373 MB     | 
 public | task_run_state                 | table | prefect | permanent   | heap          | 6308 MB    | 
 public | task_run_state_cache           | table | prefect | permanent   | heap          | 16 kB      | 
 public | variable                       | table | prefect | permanent   | heap          | 8192 bytes | 
 public | work_pool                      | table | prefect | permanent   | heap          | 264 kB     | 
 public | work_queue                     | table | prefect | permanent   | heap          | 752 kB     | 
 public | worker                         | table | prefect | permanent   | heap          | 1456 kB    | 
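
To rank a listing like the one above by size, the Size column can be converted to bytes and sorted. The following is a minimal sketch that parses pasted `\d+` output; the helper names `size_to_bytes` and `largest_tables` are made up for this example, and it assumes sizes are printed in the `pg_size_pretty` style with 1024-based units.

```python
# Multipliers for the units psql prints in the Size column (1024-based).
UNITS = {"bytes": 1, "kB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4}


def size_to_bytes(size):
    """Convert a string such as '5348 MB' or '8192 bytes' to a byte count."""
    value, unit = size.split()
    return int(value) * UNITS[unit]


def largest_tables(listing, top=5):
    """Return the `top` largest tables as (name, bytes) pairs, largest first."""
    rows = []
    for line in listing.splitlines():
        cells = [cell.strip() for cell in line.split("|")]
        # Skip the title, header, and separator lines; keep only table rows.
        if len(cells) < 7 or cells[2] != "table":
            continue
        rows.append((cells[1], size_to_bytes(cells[6])))
    return sorted(rows, key=lambda row: row[1], reverse=True)[:top]


if __name__ == "__main__":
    sample = """
 Schema |      Name       | Type  |  Owner  | Persistence | Access method |    Size    | Description
--------+-----------------+-------+---------+-------------+---------------+------------+-------------
 public | event_resources | table | prefect | permanent   | heap          | 3046 MB    |
 public | flow            | table | prefect | permanent   | heap          | 48 kB      |
 public | flow_run_state  | table | prefect | permanent   | heap          | 5348 MB    |
 public | task_run_state  | table | prefect | permanent   | heap          | 6308 MB    |
"""
    for name, size in largest_tables(sample, top=3):
        print(f"{name}: {size / 1024**3:.2f} GiB")
```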

@desertaxle Is there a way to keep events, event_resources, task_run_state and flow_run_state from growing this large over time?

@Arthurhussey Thanks for your solution, it seems that it worked and scheduled runs now appear in the queues!