PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Error when running prefect under heavy load: SAVEPOINT can only be used in transaction blocks #14728

Open obendidi opened 2 months ago

obendidi commented 2 months ago

Bug summary

Hello everyone,

I'm reporting a bug in our Prefect server that only occurs under relatively high load (even though the container peaks at only 40% CPU and 30% RAM).

Under high load the UI is empty, and in the browser's network tab the API calls return {"exception_message":"Service Unavailable"}. When I check the container logs, I see errors like the ones below:

(sqlalchemy.dialects.postgresql.asyncpg.Error) <class 'asyncpg.exceptions.NoActiveSQLTransactionError'>: SAVEPOINT can only be used in transaction blocks
[SQL: WITH queued_notifications_ids AS 
(SELECT flow_run_notification_queue.id AS id 
FROM flow_run_notification_queue ORDER BY flow_run_notification_queue.updated 
 LIMIT $1::INTEGER FOR UPDATE SKIP LOCKED), 
queued_notifications AS 
(DELETE FROM flow_run_notification_queue WHERE flow_run_notification_queue.id IN (SELECT queued_notifications_ids.id 
FROM queued_notifications_ids) RETURNING flow_run_notification_queue.id, flow_run_notification_queue.flow_run_notification_policy_id, flow_run_notification_queue.flow_run_state_id)
 SELECT queued_notifications.id AS queue_id, flow_run_notification_policy.id AS flow_run_notification_policy_id, flow_run_notification_policy.message_template AS flow_run_notification_policy_message_template, flow_run_notification_policy.block_document_id, flow.id AS flow_id, flow.name AS flow_name, flow_run.id AS flow_run_id, flow_run.name AS flow_run_name, flow_run.parameters AS flow_run_parameters, flow_run_state.type AS flow_run_state_type, flow_run_state.name AS flow_run_state_name, flow_run_state.timestamp AS flow_run_state_timestamp, flow_run_state.message AS flow_run_state_message 
FROM queued_notifications JOIN flow_run_notification_policy ON queued_notifications.flow_run_notification_policy_id = flow_run_notification_policy.id JOIN flow_run_state ON queued_notifications.flow_run_state_id = flow_run_state.id JOIN flow_run ON flow_run_state.flow_run_id = flow_run.id JOIN flow ON flow_run.flow_id = flow.id]
[parameters: (1,)]
(Background on this error at: https://sqlalche.me/e/20/dbapi)

  File "/usr/local/lib/python3.11/site-packages/prefect/server/services/loop_service.py", line 79, in start
    await self.run_once()
  File "/usr/local/lib/python3.11/site-packages/prefect/server/database/dependencies.py", line 125, in async_wrapper
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/server/services/flow_run_notifications.py", line 38, in run_once
    notifications = await db.get_flow_run_notifications_from_queue(
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/server/database/interface.py", line 405, in get_flow_run_notifications_from_queue
    return await self.queries.get_flow_run_notifications_from_queue(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/server/database/query_components.py", line 763, in get_flow_run_notifications_from_queue
    result = await session.execute(notification_details_stmt)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute
    result = await greenlet_spawn(
             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 201, in greenlet_spawn
    result = context.throw(*sys.exc_info())
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2306, in execute
    return self._execute_internal(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2191, in _execute_internal
    result: Result[Any] = compile_state_cls.orm_execute_statement(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement
    result = conn.execute(
             ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1422, in execute
    return meth(
           ^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 514, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1644, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1850, in _execute_context
    return self._exec_single_context(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1990, in _exec_single_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2357, in _handle_dbapi_exception
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1971, in _exec_single_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 919, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 572, in execute
    self._adapt_connection.await_(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 131, in await_only
    return current.driver.switch(awaitable)  # type: ignore[no-any-return]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
            ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 508, in _prepare_and_execute
    await adapt_connection._start_transaction()
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 842, in _start_transaction
    self._handle_exception(error)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 789, in _handle_exception
    raise translated_error from error

or

(sqlalchemy.dialects.postgresql.asyncpg.Error) <class 'asyncpg.exceptions.NoActiveSQLTransactionError'>: SAVEPOINT can only be used in transaction blocks
[SQL: SELECT flow_run.id, flow_run.next_scheduled_start_time 
FROM flow_run 
WHERE flow_run.next_scheduled_start_time <= $1::TIMESTAMP WITH TIME ZONE AND flow_run.state_type = $2::state_type AND flow_run.state_name = $3::VARCHAR 
 LIMIT $4::INTEGER]
[parameters: (DateTime(2024, 7, 24, 9, 55, 40, 279971, tzinfo=Timezone('UTC')), 'SCHEDULED', 'Scheduled', 400)]
(Background on this error at: https://sqlalche.me/e/20/dbapi)

  File "/usr/local/lib/python3.11/site-packages/prefect/server/services/loop_service.py", line 79, in start
    await self.run_once()
  File "/usr/local/lib/python3.11/site-packages/prefect/server/database/dependencies.py", line 125, in async_wrapper
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/server/services/late_runs.py", line 68, in run_once
    result = await session.execute(query)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute
    result = await greenlet_spawn(
             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 201, in greenlet_spawn
    result = context.throw(*sys.exc_info())
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2306, in execute
    return self._execute_internal(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2191, in _execute_internal
    result: Result[Any] = compile_state_cls.orm_execute_statement(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement
    result = conn.execute(
             ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1422, in execute
    return meth(
           ^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 514, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1644, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1850, in _execute_context
    return self._exec_single_context(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1990, in _exec_single_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2357, in _handle_dbapi_exception
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1971, in _exec_single_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 919, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 572, in execute
    self._adapt_connection.await_(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 131, in await_only
    return current.driver.switch(awaitable)  # type: ignore[no-any-return]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
            ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 508, in _prepare_and_execute
    await adapt_connection._start_transaction()
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 842, in _start_transaction
    self._handle_exception(error)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 789, in _handle_exception
    raise translated_error from error

It probably fails on other queries too.

Any idea what I might be doing wrong here, or how I can mitigate these errors?

Version info (prefect version output)

Version:             2.18.3
API version:         0.8.4
Python version:      3.11.2
Git commit:          c449aee8
Built:               Thu, May 2, 2024 5:47 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         server

Additional context

(Three screenshots were attached here.)

tjgalvin commented 2 months ago

Are you using a Postgres database backend? If so, how have you configured it, and do you see any errors in its logs?

And how are you starting your prefect server?

I have had strange errors like this when my VM hosting both services is under heavy load from my flows. I have started

When starting my Postgres server in an Apptainer container, I used:

APPTAINERENV_POSTGRES_PASSWORD="$POSTGRES_PASS" APPTAINERENV_POSTGRES_DB="$POSTGRES_DB" APPTAINERENV_PGDATA="$POSTGRES_SCRATCH/pgdata" \
        apptainer run --cleanenv --bind "$POSTGRES_SCRATCH":/var postgres_latest.sif -c max_connections=2096 -c shared_buffers=8000MB -c min_wal_size=8096 -c max_wal_size=32384 -c synchronous_commit=off -c wal_buffers=16MB

Setting -c synchronous_commit=off was by far the biggest improvement to my Prefect server's stability.

I also found that setting these variables in the environment that runs the Prefect server helped:

export WEB_CONCURRENCY=32
export PREFECT_SQLALCHEMY_POOL_SIZE=30
export PREFECT_SQLALCHEMY_MAX_OVERFLOW=40
export PREFECT_API_DATABASE_TIMEOUT=60
export PREFECT_API_DATABASE_CONNECTION_TIMEOUT=60
#export PREFECT_SERVER_API_KEEPALIVE_TIMEOUT=15
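A rough way to sanity-check these numbers against the max_connections=2096 passed to Postgres above. This is a sketch under two assumptions: that WEB_CONCURRENCY turns into that many server worker processes, and that each process keeps its own SQLAlchemy pool able to open up to pool_size + max_overflow connections. The helper names (peak_connections, fits_budget) are hypothetical, and the `reserved` default approximates Postgres's superuser_reserved_connections (default 3).

```python
def peak_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Worst-case number of Postgres connections the server workers could open.

    Assumes one SQLAlchemy pool per worker process, each capped at
    pool_size + max_overflow connections.
    """
    return workers * (pool_size + max_overflow)


def fits_budget(
    workers: int,
    pool_size: int,
    max_overflow: int,
    max_connections: int,
    reserved: int = 3,
) -> bool:
    """True if the worst case stays within max_connections minus a reserve.

    `reserved` stands in for superuser_reserved_connections; raise it if
    other clients (migrations, dashboards, workers) share the database.
    """
    return peak_connections(workers, pool_size, max_overflow) <= max_connections - reserved


if __name__ == "__main__":
    # Values from the comment above: WEB_CONCURRENCY=32, pool 30, overflow 40.
    print(peak_connections(workers=32, pool_size=30, max_overflow=40))
    print(fits_budget(32, 30, 40, max_connections=2096))
```

Under those assumptions, 32 × (30 + 40) gives a worst case of 2240 connections, slightly above 2096, so the pool settings and max_connections may be worth checking against each other; if the server actually runs fewer processes, the real ceiling is lower.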

obendidi commented 2 months ago

Thanks for the help,

I'm using a Postgres database (Amazon RDS with Aurora Serverless on AWS, PostgreSQL 14 to be exact) with the default PG 14 configuration.

Setting -c synchronous_commit=off was by far the biggest improvement to my Prefect server's stability.

From what I've read, AWS doesn't recommend disabling it (here), as that could lead to lost transactions. Have you noticed any such losses when running with synchronous_commit=off?

Thanks for the env vars; I'm doing more or less the same:

ENV PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER=json \
    PREFECT_LOGGING_EXTRA_LOGGERS=fcv \
    PREFECT_SQLALCHEMY_POOL_SIZE=100 \
    PREFECT_SQLALCHEMY_MAX_OVERFLOW=100 \
    PREFECT_API_DATABASE_TIMEOUT=60 \
    PREFECT_API_DATABASE_CONNECTION_TIMEOUT=60 \
    PREFECT_API_REQUEST_TIMEOUT=120
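To make it clearer what the database-related variables above control, here is an illustrative sketch that collects them into keyword arguments of the shape SQLAlchemy's create_async_engine accepts with the asyncpg driver. It is not Prefect's source code: the mapping of the statement timeout to asyncpg's command_timeout and the connection timeout to asyncpg's timeout argument is an assumption, and engine_kwargs_from_env is a hypothetical helper. PREFECT_API_REQUEST_TIMEOUT appears to be a client-side setting, so it is left out.

```python
import os
from typing import Mapping, Optional


def engine_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> dict:
    """Collect pool and timeout settings from Prefect-style env vars.

    Illustrative only (assumed mapping, not Prefect's implementation):
    the result could be passed as create_async_engine(url, **kwargs).
    """
    env = os.environ if env is None else env
    kwargs: dict = {"connect_args": {}}

    # SQLAlchemy pool sizing: each engine can hold up to
    # pool_size + max_overflow open connections.
    if "PREFECT_SQLALCHEMY_POOL_SIZE" in env:
        kwargs["pool_size"] = int(env["PREFECT_SQLALCHEMY_POOL_SIZE"])
    if "PREFECT_SQLALCHEMY_MAX_OVERFLOW" in env:
        kwargs["max_overflow"] = int(env["PREFECT_SQLALCHEMY_MAX_OVERFLOW"])

    # asyncpg.connect(): `command_timeout` bounds each statement,
    # `timeout` bounds establishing the connection (both in seconds).
    if "PREFECT_API_DATABASE_TIMEOUT" in env:
        kwargs["connect_args"]["command_timeout"] = float(env["PREFECT_API_DATABASE_TIMEOUT"])
    if "PREFECT_API_DATABASE_CONNECTION_TIMEOUT" in env:
        kwargs["connect_args"]["timeout"] = float(env["PREFECT_API_DATABASE_CONNECTION_TIMEOUT"])

    return kwargs


if __name__ == "__main__":
    # Values from the Dockerfile ENV above.
    print(engine_kwargs_from_env({
        "PREFECT_SQLALCHEMY_POOL_SIZE": "100",
        "PREFECT_SQLALCHEMY_MAX_OVERFLOW": "100",
        "PREFECT_API_DATABASE_TIMEOUT": "60",
        "PREFECT_API_DATABASE_CONNECTION_TIMEOUT": "60",
        "PREFECT_API_REQUEST_TIMEOUT": "120",
    }))
```

With these values each engine could open up to 100 + 100 = 200 connections, which is worth comparing against the max_connections limit of the Aurora Serverless instance.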