PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

RayTaskRunner gives sqlite3.OperationalError: database is locked #7277

Closed by robbert-harms 1 year ago

robbert-harms commented 1 year ago

Bug summary

With the latest Prefect v2 library, if I run a flow with RayTaskRunner, I get the following error:

sqlite3.OperationalError: database is locked

after that the error:

sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
(begin_task_run pid=122393) [SQL: INSERT INTO task_run_state (id, created, updated, type, timestamp, name, message, state_details, data, task_run_id) VALUES (:id, :created, :updated, :type, :timestamp, :name, :message, :state_details, :data, :task_run_id)]
(begin_task_run pid=122393) [parameters: {'id': '62625eab-0657-49d1-8e11-1db2fb37dc7a', 'created': '2022-10-21 09:07:22.594887', 'updated': '2022-10-21 09:07:22.594895', 'type': 'RUNNING', 'timestamp': '2022-10-21 09:07:22.567893', 'name': 'Running', 'message': None, 'state_details': '{"flow_run_id": "57a7a48d-95d4-4fdd-ba3d-e48fe0a897b1", "task_run_id": "512989fc-6e87-49e0-9c86-84bfdf4328d2", "child_flow_run_id": null, "scheduled_time": null, "cache_key": null, "cache_expiration": null, "untrackable_result": false}', 'data': 'null', 'task_run_id': '512989fc-6e87-49e0-9c86-84bfdf4328d2'}]

followed by:

sqlalchemy.exc.InvalidRequestError: Can't operate on closed transaction inside context manager. Please complete the context manager before emitting further commands.

ending with:

sqlalchemy.exc.InvalidRequestError: Can't operate on closed transaction inside context manager. Please complete the context manager before emitting further commands.

It doesn't happen with the DaskTaskRunner or SequentialTaskRunner. I don't know whether I should post this bug here or in the prefect-ray GitHub repository.

Reproduction

I am sorry, I didn't have time to make a minimal reproduction of this bug. If needed, I can try to make one. The bug occurred in a relatively simple pipeline with two functions.

Error

[2022-10-21 11:07:15,343] [INFO] [prefect.engine] - Created flow run 'glorious-wolverine' for flow 'feature-extraction'
[2022-10-21 11:07:15,648] [INFO] [prefect.flow_runs] - Created task run 'load_ground_truth_data_931784-ae9c4093-0' for task 'load_ground_truth_data_931784'
[2022-10-21 11:07:15,648] [INFO] [prefect.flow_runs] - Executing 'load_ground_truth_data_931784-ae9c4093-0' immediately...
[2022-10-21 11:07:15,692] [INFO] [prefect.task_runs] - Returning cached value.
[2022-10-21 11:07:15,742] [INFO] [prefect.task_runs] - Finished in state Completed()
[2022-10-21 11:07:15,785] [INFO] [prefect.flow_runs] - Created task run 'extract_features_83998-b9a504b6-0' for task 'extract_features_83998'
[2022-10-21 11:07:15,785] [INFO] [prefect.flow_runs] - Executing 'extract_features_83998-b9a504b6-0' immediately...
[2022-10-21 11:07:15,830] [INFO] [prefect.task_runs] - Returning cached value.
[2022-10-21 11:07:16,058] [INFO] [prefect.task_runs] - Finished in state Completed()
[2022-10-21 11:07:16,091] [INFO] [prefect.flow_runs] - Finished in state Completed()
[2022-10-21 11:07:17,059] [INFO] [prefect.engine] - Created flow run 'impetuous-boobook' for flow 'train-classifiers'
[2022-10-21 11:07:17,059] [INFO] [prefect.task_runner.ray] - Creating a local Ray instance
[2022-10-21 11:07:19,790] [INFO] [prefect.task_runner.ray] - Using Ray cluster with 1 nodes.
[2022-10-21 11:07:19,790] [INFO] [prefect.task_runner.ray] - The Ray UI is available at 127.0.0.1:8265
[2022-10-21 11:07:19,995] [INFO] [prefect.flow_runs] - Created task run 'train_classification_model_43718-1bf81be2-0' for task 'train_classification_model_43718'
[2022-10-21 11:07:20,013] [INFO] [prefect.flow_runs] - Submitted task run 'train_classification_model_43718-1bf81be2-0' for execution.
[2022-10-21 11:07:20,031] [INFO] [prefect.flow_runs] - Created task run 'train_classification_model_864410-1bf81be2-1' for task 'train_classification_model_864410'
[2022-10-21 11:07:20,044] [INFO] [prefect.flow_runs] - Submitted task run 'train_classification_model_864410-1bf81be2-1' for execution.
[2022-10-21 11:07:20,062] [INFO] [prefect.flow_runs] - Created task run 'write_classification_output_456697-272977eb-0' for task 'write_classification_output_456697'
(begin_task_run pid=122393) [2022-10-21 11:08:22,633] [ERROR] [prefect.orion] - Encountered exception in request:
(begin_task_run pid=122393) Traceback (most recent call last):
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
(begin_task_run pid=122393)     self.dialect.do_execute(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
(begin_task_run pid=122393)     cursor.execute(statement, parameters)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 108, in execute
(begin_task_run pid=122393)     self._adapt_connection._handle_exception(error)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 236, in _handle_exception
(begin_task_run pid=122393)     raise error
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 90, in execute
(begin_task_run pid=122393)     self.await_(_cursor.execute(operation, parameters))
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
(begin_task_run pid=122393)     return current.driver.switch(awaitable)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
(begin_task_run pid=122393)     value = await result
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/aiosqlite/cursor.py", line 37, in execute
(begin_task_run pid=122393)     await self._execute(self._cursor.execute, sql, parameters)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/aiosqlite/cursor.py", line 31, in _execute
(begin_task_run pid=122393)     return await self._conn._execute(fn, *args, **kwargs)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/aiosqlite/core.py", line 129, in _execute
(begin_task_run pid=122393)     return await future
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/aiosqlite/core.py", line 102, in run
(begin_task_run pid=122393)     result = function()
(begin_task_run pid=122393) sqlite3.OperationalError: database is locked
(begin_task_run pid=122393) 
(begin_task_run pid=122393) The above exception was the direct cause of the following exception:
(begin_task_run pid=122393) 
(begin_task_run pid=122393) Traceback (most recent call last):
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/prefect/orion/orchestration/rules.py", line 371, in validate_proposed_state
(begin_task_run pid=122393)     await self._validate_proposed_state()
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
(begin_task_run pid=122393)     return await fn(*args, **kwargs)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/prefect/orion/orchestration/rules.py", line 412, in _validate_proposed_state
(begin_task_run pid=122393)     await self.session.flush()
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/ext/asyncio/session.py", line 405, in flush
(begin_task_run pid=122393)     await greenlet_spawn(self.sync_session.flush, objects=objects)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 128, in greenlet_spawn
(begin_task_run pid=122393)     result = context.switch(value)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3429, in flush
(begin_task_run pid=122393)     self._flush(objects)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3569, in _flush
(begin_task_run pid=122393)     transaction.rollback(_capture_exception=True)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
(begin_task_run pid=122393)     compat.raise_(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
(begin_task_run pid=122393)     raise exception
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3529, in _flush
(begin_task_run pid=122393)     flush_context.execute()
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 453, in execute
(begin_task_run pid=122393)     n.execute_aggregate(self, set_)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 747, in execute_aggregate
(begin_task_run pid=122393)     persistence.save_obj(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
(begin_task_run pid=122393)     _emit_insert_statements(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements
(begin_task_run pid=122393)     c = connection._execute_20(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
(begin_task_run pid=122393)     return meth(self, args_10style, kwargs_10style, execution_options)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 333, in _execute_on_connection
(begin_task_run pid=122393)     return connection._execute_clauseelement(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
(begin_task_run pid=122393)     ret = self._execute_context(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
(begin_task_run pid=122393)     self._handle_dbapi_exception(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
(begin_task_run pid=122393)     util.raise_(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
(begin_task_run pid=122393)     raise exception
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
(begin_task_run pid=122393)     self.dialect.do_execute(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
(begin_task_run pid=122393)     cursor.execute(statement, parameters)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 108, in execute
(begin_task_run pid=122393)     self._adapt_connection._handle_exception(error)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 236, in _handle_exception
(begin_task_run pid=122393)     raise error
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 90, in execute
(begin_task_run pid=122393)     self.await_(_cursor.execute(operation, parameters))
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
(begin_task_run pid=122393)     return current.driver.switch(awaitable)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
(begin_task_run pid=122393)     value = await result
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/aiosqlite/cursor.py", line 37, in execute
(begin_task_run pid=122393)     await self._execute(self._cursor.execute, sql, parameters)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/aiosqlite/cursor.py", line 31, in _execute
(begin_task_run pid=122393)     return await self._conn._execute(fn, *args, **kwargs)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/aiosqlite/core.py", line 129, in _execute
(begin_task_run pid=122393)     return await future
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/aiosqlite/core.py", line 102, in run
(begin_task_run pid=122393)     result = function()
(begin_task_run pid=122393) sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
(begin_task_run pid=122393) [SQL: INSERT INTO task_run_state (id, created, updated, type, timestamp, name, message, state_details, data, task_run_id) VALUES (:id, :created, :updated, :type, :timestamp, :name, :message, :state_details, :data, :task_run_id)]
(begin_task_run pid=122393) [parameters: {'id': '62625eab-0657-49d1-8e11-1db2fb37dc7a', 'created': '2022-10-21 09:07:22.594887', 'updated': '2022-10-21 09:07:22.594895', 'type': 'RUNNING', 'timestamp': '2022-10-21 09:07:22.567893', 'name': 'Running', 'message': None, 'state_details': '{"flow_run_id": "57a7a48d-95d4-4fdd-ba3d-e48fe0a897b1", "task_run_id": "512989fc-6e87-49e0-9c86-84bfdf4328d2", "child_flow_run_id": null, "scheduled_time": null, "cache_key": null, "cache_expiration": null, "untrackable_result": false}', 'data': 'null', 'task_run_id': '512989fc-6e87-49e0-9c86-84bfdf4328d2'}]
(begin_task_run pid=122393) (Background on this error at: https://sqlalche.me/e/14/e3q8)
(begin_task_run pid=122393) 
(begin_task_run pid=122393) During handling of the above exception, another exception occurred:
(begin_task_run pid=122393) 
(begin_task_run pid=122393) Traceback (most recent call last):
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/starlette/middleware/errors.py", line 162, in __call__
(begin_task_run pid=122393)     await self.app(scope, receive, _send)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
(begin_task_run pid=122393)     raise exc
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
(begin_task_run pid=122393)     await self.app(scope, receive, sender)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
(begin_task_run pid=122393)     raise e
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
(begin_task_run pid=122393)     await self.app(scope, receive, send)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/starlette/routing.py", line 680, in __call__
(begin_task_run pid=122393)     await route.handle(scope, receive, send)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/starlette/routing.py", line 275, in handle
(begin_task_run pid=122393)     await self.app(scope, receive, send)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/starlette/routing.py", line 65, in app
(begin_task_run pid=122393)     response = await func(request)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
(begin_task_run pid=122393)     response = await default_handler(request)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/fastapi/routing.py", line 231, in app
(begin_task_run pid=122393)     raw_response = await run_endpoint_function(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
(begin_task_run pid=122393)     return await dependant.call(**values)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/prefect/orion/api/task_runs.py", line 198, in set_task_run_state
(begin_task_run pid=122393)     orchestration_result = await models.task_runs.set_task_run_state(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/prefect/orion/models/task_runs.py", line 316, in set_task_run_state
(begin_task_run pid=122393)     await context.validate_proposed_state()
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
(begin_task_run pid=122393)     return await fn(*args, **kwargs)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/prefect/orion/orchestration/rules.py", line 377, in validate_proposed_state
(begin_task_run pid=122393)     task_run_id=self.run.id,
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py", line 482, in __get__
(begin_task_run pid=122393)     return self.impl.get(state, dict_)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py", line 942, in get
(begin_task_run pid=122393)     value = self._fire_loader_callables(state, key, passive)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py", line 973, in _fire_loader_callables
(begin_task_run pid=122393)     return state._load_expired(state, passive)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/state.py", line 712, in _load_expired
(begin_task_run pid=122393)     self.manager.expired_attribute_loader(self, toload, passive)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/loading.py", line 1451, in load_scalar_attributes
(begin_task_run pid=122393)     result = load_on_ident(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/loading.py", line 407, in load_on_ident
(begin_task_run pid=122393)     return load_on_pk_identity(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
(begin_task_run pid=122393)     session.execute(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1713, in execute
(begin_task_run pid=122393)     conn = self._connection_for_bind(bind)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1549, in _connection_for_bind
(begin_task_run pid=122393)     TransactionalContext._trans_ctx_check(self)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/util.py", line 199, in _trans_ctx_check
(begin_task_run pid=122393)     raise exc.InvalidRequestError(
(begin_task_run pid=122393) sqlalchemy.exc.InvalidRequestError: Can't operate on closed transaction inside context manager.  Please complete the context manager before emitting further commands.
(begin_task_run pid=122393) [2022-10-21 11:08:22,637] [ERROR] [prefect.task_runs] - Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
(begin_task_run pid=122393)     self.dialect.do_execute(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
(begin_task_run pid=122393)     cursor.execute(statement, parameters)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 108, in execute
(begin_task_run pid=122393)     self._adapt_connection._handle_exception(error)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 236, in _handle_exception
(begin_task_run pid=122393)     raise error
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 90, in execute
(begin_task_run pid=122393)     self.await_(_cursor.execute(operation, parameters))
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
(begin_task_run pid=122393)     return current.driver.switch(awaitable)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
(begin_task_run pid=122393)     value = await result
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/aiosqlite/cursor.py", line 37, in execute
(begin_task_run pid=122393)     await self._execute(self._cursor.execute, sql, parameters)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/aiosqlite/cursor.py", line 31, in _execute
(begin_task_run pid=122393)     return await self._conn._execute(fn, *args, **kwargs)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/aiosqlite/core.py", line 129, in _execute
(begin_task_run pid=122393)     return await future
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/aiosqlite/core.py", line 102, in run
(begin_task_run pid=122393)     result = function()
(begin_task_run pid=122393) sqlite3.OperationalError: database is locked
(begin_task_run pid=122393) 
(begin_task_run pid=122393) The above exception was the direct cause of the following exception:
(begin_task_run pid=122393) 
(begin_task_run pid=122393) Traceback (most recent call last):
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/prefect/orion/orchestration/rules.py", line 371, in validate_proposed_state
(begin_task_run pid=122393)     await self._validate_proposed_state()
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
(begin_task_run pid=122393)     return await fn(*args, **kwargs)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/prefect/orion/orchestration/rules.py", line 412, in _validate_proposed_state
(begin_task_run pid=122393)     await self.session.flush()
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/ext/asyncio/session.py", line 405, in flush
(begin_task_run pid=122393)     await greenlet_spawn(self.sync_session.flush, objects=objects)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 128, in greenlet_spawn
(begin_task_run pid=122393)     result = context.switch(value)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3429, in flush
(begin_task_run pid=122393)     self._flush(objects)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3569, in _flush
(begin_task_run pid=122393)     transaction.rollback(_capture_exception=True)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
(begin_task_run pid=122393)     compat.raise_(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
(begin_task_run pid=122393)     raise exception
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3529, in _flush
(begin_task_run pid=122393)     flush_context.execute()
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 453, in execute
(begin_task_run pid=122393)     n.execute_aggregate(self, set_)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 747, in execute_aggregate
(begin_task_run pid=122393)     persistence.save_obj(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
(begin_task_run pid=122393)     _emit_insert_statements(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements
(begin_task_run pid=122393)     c = connection._execute_20(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
(begin_task_run pid=122393)     return meth(self, args_10style, kwargs_10style, execution_options)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 333, in _execute_on_connection
(begin_task_run pid=122393)     return connection._execute_clauseelement(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
(begin_task_run pid=122393)     ret = self._execute_context(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
(begin_task_run pid=122393)     self._handle_dbapi_exception(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
(begin_task_run pid=122393)     util.raise_(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
(begin_task_run pid=122393)     raise exception
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
(begin_task_run pid=122393)     self.dialect.do_execute(
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
(begin_task_run pid=122393)     cursor.execute(statement, parameters)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 108, in execute
(begin_task_run pid=122393)     self._adapt_connection._handle_exception(error)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 236, in _handle_exception
(begin_task_run pid=122393)     raise error
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 90, in execute
(begin_task_run pid=122393)     self.await_(_cursor.execute(operation, parameters))
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
(begin_task_run pid=122393)     return current.driver.switch(awaitable)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
(begin_task_run pid=122393)     value = await result
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/aiosqlite/cursor.py", line 37, in execute
(begin_task_run pid=122393)     await self._execute(self._cursor.execute, sql, parameters)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/aiosqlite/cursor.py", line 31, in _execute
(begin_task_run pid=122393)     return await self._conn._execute(fn, *args, **kwargs)
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/aiosqlite/core.py", line 129, in _execute
(begin_task_run pid=122393)     return await future
(begin_task_run pid=122393)   File "/home/robbert/.virtualenvs/prefect_test/lib/python3.8/site-packages/aiosqlite/core.py", line 102, in run
(begin_task_run pid=122393)     result = function()
(begin_task_run pid=122393) sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
(begin_task_run pid=122393) [SQL: INSERT INTO task_run_state (id, created, updated, type, timestamp, name, message, state_details, data, task_run_id) VALUES (:id, :created, :updated, :type, :timestamp, :name, :message, :state_details, :data, :task_run_id)]
(begin_task_run pid=122393) [parameters: {'id': '62625eab-0657-49d1-8e11-1db2fb37dc7a', 'created': '2022-10-21 09:07:22.594887', 'updated': '2022-10-21 09:07:22.594895', 'type': 'RUNNING', 'timestamp': '2022-10-21 09:07:22.567893', 'name': 'Running', 'message': None, 'state_details': '{"flow_run_id": "57a7a48d-95d4-4fdd-ba3d-e48fe0a897b1", "task_run_id": "512989fc-6e87-49e0-9c86-84bfdf4328d2", "child_flow_run_id": null, "scheduled_time": null, "cache_key": null, "cache_expiration": null, "untrackable_result": false}', 'data': 'null', 'task_run_id': '512989fc-6e87-49e0-9c86-84bfdf4328d2'}]
(begin_task_run pid=122393) (Background on this error at: https://sqlalche.me/e/14/e3q8)
(begin_task_run pid=122393) 
(begin_task_run pid=122393) During handling of the above exception, another exception occurred:

...
the same error multiple times.
...

Versions

Version:             2.6.3
API version:         0.8.2
Python version:      3.8.14
Git commit:          9e7da96e
Built:               Tue, Oct 18, 2022 1:55 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.37.2

Additional context

No response

drfraser commented 1 year ago

I have gotten this error as well - the initial stack trace is equivalent to the one in this report. Unfortunately it seems to have locked up everything and one flow has been "running" for 2 days with everything else completely halted / stuck on "late". Perhaps relevant: flows have been running fine for quite some time and the database has become pretty large (176MB), which I did not expect.

lmmentel commented 1 year ago

Same here, running a flow with simple tasks and .map() locks up the database and fails all the tasks.

zanieb commented 1 year ago

Hi! This is because you are using the SQLite backend with an ephemeral API. When using the ephemeral API, each task run starts its own copy of the API server in-memory. Each of these API servers creates a connection to the database. When you run many tasks concurrently, you have many connections to the SQLite database which does not perform well with concurrent writes.
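The failure mode described above can be reproduced outside Prefect with only the standard library. The following is a minimal sketch, not Prefect's code: the function name `second_writer_error` and the two-column `task_run_state` table are illustrative assumptions. Two connections open the same SQLite file, the first holds a write transaction, and the second is refused.

```python
import os
import sqlite3
import tempfile


def second_writer_error(db_path):
    """Return the error message a second writer gets while a first writer holds the lock."""
    # timeout=0 makes a blocked writer fail immediately instead of waiting.
    # isolation_level=None lets us issue BEGIN/ROLLBACK explicitly.
    writer_a = sqlite3.connect(db_path, timeout=0, isolation_level=None)
    writer_b = sqlite3.connect(db_path, timeout=0, isolation_level=None)
    try:
        writer_a.execute(
            "CREATE TABLE IF NOT EXISTS task_run_state (id INTEGER PRIMARY KEY, name TEXT)"
        )
        # First writer takes the write lock and keeps its transaction open.
        writer_a.execute("BEGIN IMMEDIATE")
        writer_a.execute("INSERT INTO task_run_state (name) VALUES ('Running')")
        try:
            # Second writer asks for the same lock while it is still held.
            writer_b.execute("BEGIN IMMEDIATE")
        except sqlite3.OperationalError as exc:
            return str(exc)
        finally:
            writer_a.execute("ROLLBACK")
        return None
    finally:
        writer_a.close()
        writer_b.close()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(second_writer_error(os.path.join(tmp, "demo.db")))
```

With a non-zero timeout the second writer would wait before failing, which is why the error tends to show up only once enough task runs write at the same time.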

Instead, you should start a standalone API server (with prefect orion start) then set PREFECT_API_URL so your flow and task runs connect to it. Then, the single instance of the API will manage communication with the database reducing strain during concurrent runs. I'd also recommend switching to using PostgreSQL for the backing database instead, as it'll perform much better at scale.
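As a rough sketch of the second step, a flow script can be launched with `PREFECT_API_URL` set in its environment. The helper name `env_for_hosted_api` is hypothetical, and the URL below is an assumption: it matches a server started locally with `prefect orion start` on its default address and should be adjusted to wherever the server actually listens.

```python
import os

# Assumption: a local server on the default address; change this if yours differs.
DEFAULT_API_URL = "http://127.0.0.1:4200/api"


def env_for_hosted_api(api_url=DEFAULT_API_URL, base_env=None):
    """Return a copy of the environment with PREFECT_API_URL pointing at a hosted API."""
    env = dict(os.environ if base_env is None else base_env)
    env["PREFECT_API_URL"] = api_url
    return env
```

The result can be passed to a child process, for example `subprocess.run([sys.executable, "my_flow.py"], env=env_for_hosted_api(), check=True)`, where `my_flow.py` stands in for your own flow script.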

lmmentel commented 1 year ago

Thanks @madkinsz, I suspected something like what you've explained was taking place. However, is there a way to run a flow as a script without setting up all the infra? Would SequentialTaskRunner be a remedy here?

zanieb commented 1 year ago

If you run tasks sequentially yeah you're unlikely to run into database locking issues. You could still have problems running flow runs concurrently though.

drfraser commented 1 year ago

For anyone else encountering this issue - in my setup, I was running a standalone "prefect orion start &", an agent as well, PREFECT_API_HOST was set (so I assumed the URL setting was set as well) - and all the flows were using SequentialTaskRunner (because I am using Prefect as a replacement for cron w/ a nicer UI).

But only upon explicitly setting PREFECT_API_URL did the server type switch from ephemeral to hosted.
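One way to catch this early is a small pre-flight check at the top of a flow script. This is a sketch under the assumption, taken from the report above, that an unset `PREFECT_API_URL` is what leaves the run on the ephemeral API; the function name `require_api_url` is hypothetical and not part of Prefect.

```python
import os


def require_api_url(env=None):
    """Return PREFECT_API_URL, or raise if it is missing or empty."""
    env = os.environ if env is None else env
    url = env.get("PREFECT_API_URL", "").strip()
    if not url:
        # Other settings such as PREFECT_API_HOST are deliberately not accepted here.
        raise RuntimeError(
            "PREFECT_API_URL is not set; the run would likely use the ephemeral API."
        )
    return url
```

Running `prefect version` afterwards and checking the "Server type" line, as in the Versions output earlier in this issue, confirms whether the setting took effect.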

robbert-harms commented 1 year ago

Is it possible to solve this issue without setting up all this infrastructure? My main use of Prefect was actually being able to smoothly parallelize my pipeline while running only locally on my development machine. This used to work in Prefect version 2.0.3; since that version it keeps crashing. Is there a way to disable this database system? Can we have an in-memory database working with multiple threads/processes?

d33bs commented 1 year ago

Hello - I'm also seeing the effects of SQLite database locking with the ephemeral API in Prefect (specifically with ConcurrentTaskRunner) (many thanks @shntnu). Based on my understanding of the conversation in this issue, it seems like this locking may occur with any scenario which leverages task.map() without SequentialTaskRunner or prefect orion server running somewhere outside of memory (is this accurate?).

Like @robbert-harms mentions, I would also seek a solution that doesn't rely on additional complexities with new or added infrastructure. More broadly I'd say that prefect's task.map() capabilities are one reason I find inspiration in this library, in addition to a lot of the other fantastic work taking place, and I hope there's a way to resolve these challenges.

A few additional questions in the interest of solutions:

zanieb commented 1 year ago

Thanks for the comments!

I'm not sure adding warnings to .map specifically makes sense to me. Any highly concurrent usage of the database is going to cause problems. We're going to be adding more concurrency-heavy features in the future. I think that we should highlight that the ephemeral API and SQLite database are not intended for highly concurrent workflows. Perhaps there are some good places in the documentation for this cc @tpdorsey

There's also the SQLite HCTree project https://sqlite.org/hctree/doc/hctree/doc/hctree/index.html. DuckDB might make sense, but it'll be hard for us to justify development time for another database backend at this time. Writing migrations and testing both databases is already quite a bit of work.

I'm working on changes to improve concurrency handling. Perhaps I can expose automatic throttling of concurrency when using the ephemeral API with SQLite or make the settings easy to reach.

I think the fastest and simplest fix may be to adjust our task runs on Ray to share a single client instance, which would avoid creating multiple ephemeral APIs and consequently create fewer concurrent SQLite connections. I'm a little fuzzy on the details of how that would work, but it is probably worth exploring.
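The general idea of funnelling all writes through one shared connection can be illustrated with the standard library. This is only a sketch of the pattern, not how Prefect or prefect-ray implement it: `SharedWriter`, `run_tasks`, and the simplified `task_run_state` table are hypothetical, and threads stand in for Ray workers.

```python
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor


class SharedWriter:
    """Hypothetical helper: a single SQLite connection whose writes are serialized by a lock."""

    def __init__(self, db_path):
        # check_same_thread=False allows worker threads to use this one connection.
        self._conn = sqlite3.connect(db_path, check_same_thread=False)
        self._lock = threading.Lock()
        with self._lock:
            self._conn.execute(
                "CREATE TABLE IF NOT EXISTS task_run_state (id INTEGER PRIMARY KEY, name TEXT)"
            )
            self._conn.commit()

    def record_state(self, name):
        # Only one thread at a time may write, so SQLite sees a single writer.
        with self._lock:
            self._conn.execute("INSERT INTO task_run_state (name) VALUES (?)", (name,))
            self._conn.commit()

    def count(self):
        with self._lock:
            return self._conn.execute("SELECT COUNT(*) FROM task_run_state").fetchone()[0]

    def close(self):
        self._conn.close()


def run_tasks(db_path, n_tasks=50, workers=8):
    """Record one state per simulated task run from a thread pool; return the row count."""
    writer = SharedWriter(db_path)
    try:
        with ThreadPoolExecutor(max_workers=workers) as pool:
            list(pool.map(lambda i: writer.record_state(f"Running-{i}"), range(n_tasks)))
        return writer.count()
    finally:
        writer.close()
```

The trade-off is that writes no longer run in parallel, but none of them contend for the database lock, which is the same reason a single standalone API server avoids the error.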

d33bs commented 1 year ago

Thank you @madkinsz for the thorough reply and thoughts towards addressing this topic! I'm excited to see the concurrency-heavy features and capabilities you mentioned. The controls with automated throttling for concurrency seem like a great way to balance the challenges at hand, though I'd still hope for a simplified solution via the database technologies (perhaps through SQLite HCTree as you suggested). I'd also wonder: are there any ways to BYOD ("bring your own database") when leveraging the ephemeral API (with an eye towards experimentation or decoupling the problem space)?

github-actions[bot] commented 1 year ago

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

robbert-harms commented 1 year ago

Has this been resolved yet? It would be wonderful if we can run prefect pipelines standalone.

github-actions[bot] commented 1 year ago

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

github-actions[bot] commented 1 year ago

This issue was closed because it has been stale for 14 days with no activity. If this issue is important or you have more to add feel free to re-open it.

blakete commented 1 year ago

Has anyone come up with a solution for this? I am experiencing a similar issue while running a large number of concurrent tasks.

zanieb commented 1 year ago

@blakete first switch to using a hosted server (i.e. prefect server start or Prefect Cloud) and if you're already doing that switch to Postgres as a backing database.

blakete commented 1 year ago

@zanieb Thanks for getting back on this! When you say Postgres as a backing database, do you mean for the application or for the Prefect Orion database?

zanieb commented 1 year ago

@blakete for the Prefect Server (previously known as Orion) database, yes.

cleung1996 commented 4 months ago

Hi @zanieb, just a quick question on this issue, as I am experiencing it too. Is there a way to switch to Postgres while still retaining my flow history and run deployments? I don't want to switch over to PostgreSQL and lose out on all the blocks and everything that was already built out. Thanks!

zanieb commented 4 months ago

@cleung1996 I don't work at Prefect anymore, but I don't think there's automated migration. cc @serinamarie