PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

asyncpg.exceptions._base.InterfaceError: connection is closed #13755

Open Ishankoradia opened 3 months ago

Ishankoradia commented 3 months ago


Bug summary

Prefect keeps losing its connection to its own database. The error trace is given below. This usually happens after a deployment is run. I am using Prefect version 2.13.5 with a remote Postgres database.

Reproduction

I have no idea how to reproduce this. It works on my local machine, but on our production setup, with a remote database and multiple pipelines running, it keeps throwing this error.
We have 3 workers serving 2 queues in 1 work pool.

Error

Traceback (most recent call last):
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 709, in _start_transaction
11|prefect |     self._transaction = self._connection.transaction(
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/asyncpg/connection.py", line 274, in transaction
11|prefect |     self._check_open()
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/asyncpg/connection.py", line 1399, in _check_open
11|prefect |     raise exceptions.InterfaceError('connection is closed')
11|prefect | asyncpg.exceptions._base.InterfaceError: connection is closed
11|prefect | 
11|prefect | The above exception was the direct cause of the following exception:
11|prefect | 
11|prefect | Traceback (most recent call last):
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
11|prefect |     self.dialect.do_execute(
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
11|prefect |     cursor.execute(statement, parameters)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
11|prefect |     self._adapt_connection.await_(
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
11|prefect |     return current.driver.switch(awaitable)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
11|prefect |     value = await result
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 408, in _prepare_and_execute
11|prefect |     await adapt_connection._start_transaction()
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 716, in _start_transaction
11|prefect |     self._handle_exception(error)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 682, in _handle_exception
11|prefect |     raise translated_error from error
11|prefect | sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.InterfaceError: <class 'asyncpg.exceptions._base.InterfaceError'>: connection is closed
11|prefect | 
11|prefect | The above exception was the direct cause of the following exception:
11|prefect | 
11|prefect | Traceback (most recent call last):
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
11|prefect |     await self.app(scope, receive, _send)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 24, in __call__
11|prefect |     await responder(scope, receive, send)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 44, in __call__
11|prefect |     await self.app(scope, receive, self.send_with_gzip)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
11|prefect |     raise exc
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
11|prefect |     await self.app(scope, receive, sender)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/prefect/_vendor/fastapi/middleware/asyncexitstack.py", line 20, in __call__
11|prefect |     raise e
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/prefect/_vendor/fastapi/middleware/asyncexitstack.py", line 17, in __call__
11|prefect |     await self.app(scope, receive, send)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/starlette/routing.py", line 718, in __call__
11|prefect |     await route.handle(scope, receive, send)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/starlette/routing.py", line 276, in handle
11|prefect |     await self.app(scope, receive, send)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/starlette/routing.py", line 66, in app
11|prefect |     response = await func(request)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/prefect/server/utilities/server.py", line 101, in handle_response_scoped_depends
11|prefect |     response = await default_handler(request)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/prefect/_vendor/fastapi/routing.py", line 251, in app
11|prefect |     raw_response = await run_endpoint_function(
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/prefect/_vendor/fastapi/routing.py", line 177, in run_endpoint_function
11|prefect |     return await dependant.call(**values)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/prefect/server/api/logs.py", line 27, in create_logs
11|prefect |     await models.logs.create_logs(session=session, logs=batch)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/prefect/server/database/dependencies.py", line 119, in async_wrapper
11|prefect |     return await fn(*args, **kwargs)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/prefect/server/models/logs.py", line 45, in create_logs
11|prefect |     await session.execute(log_insert.values([log.dict() for log in logs]))
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 214, in execute
11|prefect |     result = await greenlet_spawn(
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
11|prefect |     result = context.throw(*sys.exc_info())
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1714, in execute
11|prefect |     result = conn._execute_20(statement, params or {}, execution_options)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
11|prefect |     return meth(self, args_10style, kwargs_10style, execution_options)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
11|prefect |     return connection._execute_clauseelement(
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
11|prefect |     ret = self._execute_context(
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
11|prefect |     self._handle_dbapi_exception(
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
11|prefect |     util.raise_(
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
11|prefect |     raise exception
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
11|prefect |     self.dialect.do_execute(
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
11|prefect |     cursor.execute(statement, parameters)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
11|prefect |     self._adapt_connection.await_(
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
11|prefect |     return current.driver.switch(awaitable)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
11|prefect |     value = await result
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 408, in _prepare_and_execute
11|prefect |     await adapt_connection._start_transaction()
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 716, in _start_transaction
11|prefect |     self._handle_exception(error)
11|prefect |   File "/home/ddp/prefect-proxy/venv/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 682, in _handle_exception
11|prefect |     raise translated_error from error
11|prefect | sqlalchemy.exc.InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: connection is closed
11|prefect | [SQL: INSERT INTO log (id, created, updated, name, level, flow_run_id, task_run_id, message, timestamp) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)]
11|prefect | [parameters: (UUID('18ec0dce-76aa-4abb-98f3-e00e1d3ae76b'), DateTime(2024, 6, 3, 16, 55, 42, 869041, tzinfo=Timezone('UTC')), DateTime(2024, 6, 3, 16, 55, 42, 869063, tzinfo=Timezone('UTC')), 'prefect.flow_runs.worker', 20, UUID('00351d7e-5bb7-468c-adce-4017c90ace31'), None, 'Process 606166 exited cleanly.', DateTime(2024, 6, 3, 16, 55, 41, 993709, tzinfo=Timezone('+00:00')))]
11|prefect | (Background on this error at: https://sqlalche.me/e/14/rvf5)

Versions

2.13.5

Additional context

Any guidance on how to approach this would be appreciated.

tojimahammatov commented 2 weeks ago

I am not familiar with Prefect, but I believe this issue does not originate in it. I have seen this error once, when there were no resources such as CPU and RAM left for a process to use. Otherwise the connection is never closed. So I suggest monitoring your CPU and RAM usage. They are probably fully used, leaving nothing for the Postgres connection to run.
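A minimal sketch of the kind of check suggested here, assuming a Linux host where `/proc/meminfo` and `os.getloadavg()` are available. The function names and the thresholds (`load_limit`, `mem_limit`) are hypothetical choices for illustration. They are not part of Prefect, and the sketch only reports pressure on the machine; it does not prove that resource exhaustion closed the connection.

```python
import os


def parse_meminfo(text: str) -> dict[str, int]:
    """Parse /proc/meminfo-style text into {field: number}.

    Most fields are reported in kB; a few (e.g. HugePages_Total) are counts.
    """
    values = {}
    for line in text.splitlines():
        name, sep, rest = line.partition(":")
        parts = rest.split()
        if sep and parts and parts[0].isdigit():
            values[name.strip()] = int(parts[0])
    return values


def memory_used_fraction(meminfo: dict[str, int]) -> float:
    """Fraction of RAM that is not available to new work (0.0 to 1.0)."""
    return 1.0 - meminfo["MemAvailable"] / meminfo["MemTotal"]


def cpu_load_per_core(load_1min: float, cores: int) -> float:
    """1-minute load average divided by the number of cores."""
    return load_1min / max(cores, 1)


def resource_warnings(load_per_core: float, mem_fraction: float,
                      load_limit: float = 1.0, mem_limit: float = 0.9) -> list[str]:
    """Return a message for each resource above its (assumed) limit."""
    warnings = []
    if load_per_core > load_limit:
        warnings.append(f"CPU load per core is {load_per_core:.2f} (limit {load_limit})")
    if mem_fraction > mem_limit:
        warnings.append(f"memory use is {mem_fraction:.0%} (limit {mem_limit:.0%})")
    return warnings


def main() -> None:
    try:
        with open("/proc/meminfo") as fh:
            meminfo = parse_meminfo(fh.read())
        load_1min, _, _ = os.getloadavg()
    except (OSError, AttributeError):
        print("resource statistics are not available on this platform")
        return
    found = resource_warnings(
        cpu_load_per_core(load_1min, os.cpu_count() or 1),
        memory_used_fraction(meminfo),
    )
    for message in found:
        print("WARNING:", message)
    if not found:
        print("CPU and memory look fine right now")


if __name__ == "__main__":
    main()
```

Running this periodically on the server host (for example from cron) while deployments are executing would show whether the errors line up with spikes in load or memory use.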

Ishankoradia commented 2 weeks ago

Thanks @tojimahammatov, it was indeed the cleanup services that were taking so much time, and our CPU usage was also very high.