PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Calling sync task from async flow causes flow run to hang #7551

Closed ilya-galperin closed 1 year ago

ilya-galperin commented 2 years ago

Bug summary

When a synchronous task is called from an asynchronous flow, the flow run appears to hang and the task is never executed. In the MRP below, I call a synchronous task with the results of several asynchronous tasks; the asynchronous tasks complete, but the synchronous task never starts. Calling the synchronous task directly, with no asynchronous task before it, appears to produce the same behavior.

Reproduction

from prefect import flow, task
import asyncio, time

@task
async def async_task(n):
    await asyncio.sleep(1)
    return n

@task
def sync_task(res):
    time.sleep(1)
    print(res)

@flow
async def async_flow():
    res = await asyncio.gather(*(async_task(n) for n in range(0, 3, 1)))
    sync_task(res)

if __name__ == "__main__":
    asyncio.run(async_flow())

Error

13:22:40.277 | INFO    | prefect.engine - Created flow run 'tourmaline-pronghorn' for flow 'async-flow'
13:22:41.313 | INFO    | Flow run 'tourmaline-pronghorn' - Created task run 'async_task-1da93ad6-0' for task 'async_task'
13:22:41.314 | INFO    | Flow run 'tourmaline-pronghorn' - Executing 'async_task-1da93ad6-0' immediately...
13:22:41.349 | INFO    | Flow run 'tourmaline-pronghorn' - Created task run 'async_task-1da93ad6-2' for task 'async_task'
13:22:41.350 | INFO    | Flow run 'tourmaline-pronghorn' - Executing 'async_task-1da93ad6-2' immediately...
13:22:41.370 | INFO    | Flow run 'tourmaline-pronghorn' - Created task run 'async_task-1da93ad6-1' for task 'async_task'
13:22:41.370 | INFO    | Flow run 'tourmaline-pronghorn' - Executing 'async_task-1da93ad6-1' immediately...
13:22:42.872 | INFO    | Task run 'async_task-1da93ad6-0' - Finished in state Completed()
13:22:42.942 | INFO    | Task run 'async_task-1da93ad6-2' - Finished in state Completed()
13:22:42.962 | INFO    | Task run 'async_task-1da93ad6-1' - Finished in state Completed()

The output hangs after this...
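When a run stalls like this with no traceback, one generic way to see where each thread is stuck is the standard library's faulthandler watchdog. The sketch below is not part of the original report and does not use Prefect; `run_with_watchdog` and `slow_call` are hypothetical names, and `slow_call` merely stands in for the call that hangs. It writes the dump to a temporary file only so that the result can be inspected programmatically.

```python
import faulthandler
import tempfile
import time


def run_with_watchdog(fn, watchdog_seconds):
    """Run fn(); if it is still running after watchdog_seconds, dump all thread stacks.

    Returns the text of the dump (an empty string if fn finished in time).
    """
    with tempfile.TemporaryFile() as sink:
        # The dump is written by a watchdog thread, so it still fires
        # when the main thread (and its event loop) is blocked.
        faulthandler.dump_traceback_later(watchdog_seconds, file=sink)
        try:
            fn()
        finally:
            faulthandler.cancel_dump_traceback_later()
        sink.seek(0)
        return sink.read().decode(errors="replace")


def slow_call():
    # Hypothetical stand-in for the call that hangs.
    time.sleep(0.5)


if __name__ == "__main__":
    print(run_with_watchdog(slow_call, 0.1))
```

For an interactive run it is probably simpler to call faulthandler.dump_traceback_later(60) just before asyncio.run(async_flow()), which sends the dump to stderr after 60 seconds.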

Versions

Version:             2.6.0
API version:         0.8.2
Python version:      3.10.6
Git commit:          96f09a51
Built:               Thu, Oct 13, 2022 3:21 PM
OS/Arch:             darwin/arm64
Profile:             endvr-staging
Server type:         cloud

bunchesofdonald commented 2 years ago

Thanks for reporting this issue @ilya-galperin! We have a similar issue logged, #6318, but in the time between that issue and now the error has turned into a stall/deadlock. Totally understandable that a search didn't find that.

We're actively working on this and hope to have pull requests coming in the next few weeks to resolve it.
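For readers unfamiliar with this class of stall, here is a minimal plain-asyncio sketch of one general way a synchronous call made from a coroutine can deadlock. It is an illustration only and is not confirmed to be the mechanism inside Prefect. `record_state` and `sync_step` are hypothetical names, and the timeout exists only so that the example terminates.

```python
import asyncio
import concurrent.futures


async def record_state():
    """Hypothetical stand-in for bookkeeping that must run on the main event loop."""
    return "state recorded"


def sync_step(loop, timeout):
    """Blocking function that waits for a coroutine scheduled onto `loop`."""
    future = asyncio.run_coroutine_threadsafe(record_state(), loop)
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        future.cancel()
        return "stalled"


async def main():
    loop = asyncio.get_running_loop()
    # Called directly, sync_step blocks the event-loop thread itself,
    # so record_state() can never start and the wait times out.
    direct = sync_step(loop, timeout=0.2)
    # Offloaded to a worker thread, the loop stays free to run record_state().
    offloaded = await asyncio.to_thread(sync_step, loop, 2.0)
    return direct, offloaded


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Without the timeout, the direct call would wait forever, which matches the "no error, no progress" symptom in general terms.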

ilya-galperin commented 2 years ago

Thanks @bunchesofdonald

I've done a little more testing with this MRP, and the behavior only seems to occur when the execution environment points to a remote Orion DB, i.e. when I run the MRP on my local machine with a profile pointing to a Prefect Cloud account.

When I switch my machine's profile to one that points to a local DB via:

prefect profile use none
No Prefect Orion instance specified using profile 'none'. Flow run metadata will be stored at the locally configured database: sqlite+aiosqlite:////Users/Galp88021/.prefect/orion.db

The flow runs and completes as expected most of the time, although I get the following intermittent error:

10:36:38.080 | INFO | prefect.engine - Created flow run 'maize-tarsier' for flow 'async-flow'
10:36:38.266 | INFO | Flow run 'maize-tarsier' - Created task run 'async_task-1da93ad6-2' for task 'async_task'
10:36:38.266 | INFO | Flow run 'maize-tarsier' - Executing 'async_task-1da93ad6-2' immediately...
10:36:38.278 | INFO | Flow run 'maize-tarsier' - Created task run 'async_task-1da93ad6-1' for task 'async_task'
10:36:38.278 | INFO | Flow run 'maize-tarsier' - Executing 'async_task-1da93ad6-1' immediately...
10:36:38.292 | INFO | Flow run 'maize-tarsier' - Created task run 'async_task-1da93ad6-0' for task 'async_task'
10:36:38.292 | INFO | Flow run 'maize-tarsier' - Executing 'async_task-1da93ad6-0' immediately...
10:36:39.404 | INFO | Task run 'async_task-1da93ad6-1' - Finished in state Completed()
10:36:39.411 | INFO | Task run 'async_task-1da93ad6-2' - Finished in state Completed()
10:36:39.429 | INFO | Task run 'async_task-1da93ad6-0' - Finished in state Completed()
10:36:39.450 | INFO | Flow run 'maize-tarsier' - Created task run 'sync_task-edd45147-0' for task 'sync_task'
10:36:39.450 | INFO | Flow run 'maize-tarsier' - Executing 'sync_task-edd45147-0' immediately...
10:36:40.277 | ERROR | asyncio - Task exception was never retrieved
future: <Task finished name='Task-16' coro=<()> exception=KeyError(4440626448)>
Traceback (most recent call last):
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 263, in on_shutdown
    yield
GeneratorExit

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 267, in on_shutdown
    EVENT_LOOP_GC_REFS.pop(key)
KeyError: 4440626448
[0, 1, 2]
10:36:40.503 | INFO | Task run 'sync_task-edd45147-0' - Finished in state Completed()
10:36:40.519 | INFO | Flow run 'maize-tarsier' - Finished in state Completed('All states completed.')

zanieb commented 2 years ago

Thanks for the additional notes! This is related to #6891

Very interesting failure for our EVENT_LOOP_GC_REFS — we'll investigate that as well.

ilya-galperin commented 2 years ago

Wanted to add some more findings here if that's okay @madkinsz

I ran a test similar to the one above using asyncio.as_completed(). When pointing to my local Orion DB, the run hangs for about 30 seconds and then produces the error log below. I haven't fully tested this against cloud Orion, since I don't want to cause any locking issues with our cloud Orion instance, but in the short attempt I made it did hang.
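The error log further down in this comment ends in sqlite3.OperationalError: database is locked. As background, the following standard-library sketch shows how SQLite raises that error when a second connection tries to write while another connection holds the write lock. It does not involve Prefect or aiosqlite, and it makes no claim about which two writers collide in the run above. `provoke_lock` and `demo` are hypothetical names.

```python
import os
import sqlite3
import tempfile


def provoke_lock(db_path, wait_seconds=0.1):
    """Hold a write lock on one connection and attempt a write on another."""
    # isolation_level=None puts the connection in autocommit mode,
    # so the explicit BEGIN IMMEDIATE below controls the transaction.
    writer = sqlite3.connect(db_path, isolation_level=None)
    writer.execute("CREATE TABLE IF NOT EXISTS log (message TEXT)")
    writer.execute("BEGIN IMMEDIATE")  # take the write lock and keep it
    writer.execute("INSERT INTO log VALUES ('held by first writer')")

    # The second connection waits at most wait_seconds for the lock.
    other = sqlite3.connect(db_path, timeout=wait_seconds)
    try:
        other.execute("INSERT INTO log VALUES ('second writer')")
        other.commit()
        return "ok"
    except sqlite3.OperationalError as exc:
        return str(exc)
    finally:
        other.close()
        writer.execute("ROLLBACK")
        writer.close()


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        return provoke_lock(os.path.join(tmp, "demo.db"))


if __name__ == "__main__":
    print(demo())
```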

Let me know if you'd like me to open a new issue for this behavior, since as_completed() and gather() seem to result in different behaviors. I think as_completed() is a relatively popular pattern for ETL-type workloads, so getting it working as well would be great :)
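As background for the comparison between the two helpers, this plain-asyncio sketch (no Prefect involved) shows how they differ in ordinary use: gather() returns all results together in input order, while as_completed() hands results back one at a time in completion order. `work` and `compare` are hypothetical names, and the delays are chosen only to make the ordering visible.

```python
import asyncio


async def work(n, delay):
    """Hypothetical stand-in for an async task that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return n


async def compare():
    delays = {0: 0.3, 1: 0.1, 2: 0.2}

    # gather(): wait for everything, results follow the input order.
    gathered = await asyncio.gather(*(work(n, d) for n, d in delays.items()))

    # as_completed(): results arrive as each awaitable finishes.
    completed = []
    for next_done in asyncio.as_completed([work(n, d) for n, d in delays.items()]):
        completed.append(await next_done)

    return gathered, completed


if __name__ == "__main__":
    print(asyncio.run(compare()))
```

With as_completed(), any per-result follow-up work runs while the remaining awaitables are still in flight, which is why the two patterns can exercise the orchestration layer differently.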

from prefect import flow, task, get_run_logger
import asyncio, time

@task
async def async_task(n):
    await asyncio.sleep(1)
    return n

@task
def sync_task(res):
    time.sleep(1)
    print(res)

@flow
async def async_flow():
    async_task_list = []
    for n in range(0, 3, 1):
        async_task_list.append(
            asyncio.create_task(async_task.with_options(name=f"task_{n}")(n))
        )

    for res in asyncio.as_completed(async_task_list):
        completed = await res
        sync_task(completed)

if __name__ == "__main__":
    asyncio.run(async_flow())

Traceback (most recent call last):
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 100, in execute
    self._adapt_connection._handle_exception(error)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 228, in _handle_exception
    raise error
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 82, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/cursor.py", line 37, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/cursor.py", line 31, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/core.py", line 129, in _execute
    return await future
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/core.py", line 102, in run
    result = function()
sqlite3.OperationalError: database is locked

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/logging/handlers.py", line 146, in send_logs
    await client.create_logs(self._pending_logs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/client/orion.py", line 1683, in create_logs
    await self._client.post(f"/logs/", json=serialized_logs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/httpx/_client.py", line 1842, in post
    return await self.request(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/client/base.py", line 159, in send
    await super().send(*args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/httpx/_client.py", line 1614, in send
    response = await self._send_handling_auth(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/httpx/_client.py", line 1716, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/httpx/_transports/asgi.py", line 152, in handle_async_request
    await self.app(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/cors.py", line 84, in __call__
    await self.app(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
    raise exc
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
    await self.app(scope, receive, sender)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/routing.py", line 680, in __call__
    await route.handle(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/routing.py", line 427, in handle
    await self.app(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
    raise exc
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
    await self.app(scope, receive, sender)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/routing.py", line 680, in __call__
    await route.handle(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/routing.py", line 275, in handle
    await self.app(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/routing.py", line 65, in app
    response = await func(request)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
    response = await default_handler(request)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/routing.py", line 231, in app
    raw_response = await run_endpoint_function(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
    return await dependant.call(**values)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/api/logs.py", line 27, in create_logs
    await models.logs.create_logs(session=session, logs=batch)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/models/logs.py", line 45, in create_logs
    await session.execute(log_insert.values([log.dict() for log in logs]))
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 212, in execute
    result = await greenlet_spawn(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1712, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1631, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 332, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1498, in _execute_clauseelement
    ret = self._execute_context(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1862, in _execute_context
    self._handle_dbapi_exception(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2043, in _handle_dbapi_exception
    util.raise_(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 100, in execute
    self._adapt_connection._handle_exception(error)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 228, in _handle_exception
    raise error
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 82, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/cursor.py", line 37, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/cursor.py", line 31, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/core.py", line 129, in _execute
    return await future
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/core.py", line 102, in run
    result = function()
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
[SQL: INSERT INTO log (id, created, updated, name, level, flow_run_id, task_run_id, message, timestamp) VALUES (:id, :created, :updated, :name_m0, :level_m0, :flow_run_id_m0, :task_run_id_m0, :message_m0, :timestamp_m0), (:id_m1, :created_m1, :updated_m1, :name_m1, :level_m1, :flow_run_id_m1, :task_run_id_m1, :message_m1, :timestamp_m1), (:id_m2, :created_m2, :updated_m2, :name_m2, :level_m2, :flow_run_id_m2, :task_run_id_m2, :message_m2, :timestamp_m2), (:id_m3, :created_m3, :updated_m3, :name_m3, :level_m3, :flow_run_id_m3, :task_run_id_m3, :message_m3, :timestamp_m3), (:id_m4, :created_m4, :updated_m4, :name_m4, :level_m4, :flow_run_id_m4, :task_run_id_m4, :message_m4, :timestamp_m4), (:id_m5, :created_m5, :updated_m5, :name_m5, :level_m5, :flow_run_id_m5, :task_run_id_m5, :message_m5, :timestamp_m5)]
[parameters: {'id': 'b77e5931-5fd3-4d8f-8415-7a98a2352a56', 'created': '2022-11-16 23:14:55.583187', 'updated': '2022-11-16 23:14:55.583198', 'name_m0': 'prefect.flow_runs', 'level_m0': 20, 'flow_run_id_m0': 'ddb004c2-54a0-4845-91d3-11a42c936cfa', 'task_run_id_m0': None, 'message_m0': "Created task run 'task_1-1da93ad6-1' for task 'task_1'", 'timestamp_m0': '2022-11-16 23:14:53.497164', 'id_m1': '794014d1-5dba-4f0f-ac90-723bee757e20', 'created_m1': '2022-11-16 23:14:55.583204', 'updated_m1': '2022-11-16 23:14:55.583206', 'name_m1': 'prefect.flow_runs', 'level_m1': 20, 'flow_run_id_m1': 'ddb004c2-54a0-4845-91d3-11a42c936cfa', 'task_run_id_m1': None, 'message_m1': "Executing 'task_1-1da93ad6-1' immediately...", 'timestamp_m1': '2022-11-16 23:14:53.497536', 'id_m2': '7cdbd571-51c9-4890-97e3-76bc08dcc579', 'created_m2': '2022-11-16 23:14:55.583210', 'updated_m2': '2022-11-16 23:14:55.583212', 'name_m2': 'prefect.flow_runs', 'level_m2': 20, 'flow_run_id_m2': 'ddb004c2-54a0-4845-91d3-11a42c936cfa', 'task_run_id_m2': None, 'message_m2': "Created task run 'task_0-1da93ad6-0' for task 'task_0'", 'timestamp_m2': '2022-11-16 23:14:53.510227', 'id_m3': '2a365778-eb9b-4f83-89a6-762ebcaf1740', 'created_m3': '2022-11-16 23:14:55.583216', 'updated_m3': '2022-11-16 23:14:55.583218', 'name_m3': 'prefect.flow_runs', 'level_m3': 20, 'flow_run_id_m3': 'ddb004c2-54a0-4845-91d3-11a42c936cfa', 'task_run_id_m3': None, 'message_m3': "Executing 'task_0-1da93ad6-0' immediately...", 'timestamp_m3': '2022-11-16 23:14:53.510392', 'id_m4': '926b538e-6334-4dac-aaea-5cfc58ed5867', 'created_m4': '2022-11-16 23:14:55.583221', 'updated_m4': '2022-11-16 23:14:55.583223', 'name_m4': 'prefect.flow_runs', 'level_m4': 20, 'flow_run_id_m4': 'ddb004c2-54a0-4845-91d3-11a42c936cfa', 'task_run_id_m4': None, 'message_m4': "Created task run 'task_2-1da93ad6-2' for task 'task_2'", 'timestamp_m4': '2022-11-16 23:14:53.552236', 'id_m5': 'c5fc0c9c-9f88-4005-b699-7103fe79a538', 'created_m5': '2022-11-16 23:14:55.583227', 'updated_m5': '2022-11-16 23:14:55.583229', 'name_m5': 'prefect.flow_runs', 'level_m5': 20, 'flow_run_id_m5': 'ddb004c2-54a0-4845-91d3-11a42c936cfa', 'task_run_id_m5': None, 'message_m5': "Executing 'task_2-1da93ad6-2' immediately...", 'timestamp_m5': '2022-11-16 23:14:53.552423'}]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
Worker information:
    Approximate queue length: 0
    Pending log batch length: 6
    Pending log batch size: 2160
The log worker will attempt to send these logs again in 2.0s
^C15:16:31.153 | ERROR   | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/models/task_runs.py", line 316, in set_task_run_state
    await context.validate_proposed_state()
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/orchestration/rules.py", line 371, in validate_proposed_state
    await self._validate_proposed_state()
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/orchestration/rules.py", line 412, in _validate_proposed_state
    await self.session.flush()
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 403, in flush
    await greenlet_spawn(self.sync_session.flush, objects=objects)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 128, in greenlet_spawn
    result = context.switch(value)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3383, in flush
    self._flush(objects)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3522, in _flush
    with util.safe_reraise():
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3483, in _flush
    flush_context.execute()
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 453, in execute
    n.execute_aggregate(self, set_)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 747, in execute_aggregate
    persistence.save_obj(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    _emit_insert_statements(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements
    c = connection._execute_20(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1631, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 332, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1498, in _execute_clauseelement
    ret = self._execute_context(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1862, in _execute_context
    self._handle_dbapi_exception(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2047, in _handle_dbapi_exception
    util.raise_(exc_info[1], with_traceback=exc_info[2])
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 82, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/cursor.py", line 37, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/cursor.py", line 31, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/core.py", line 129, in _execute
    return await future
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
    raise exc
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
    await self.app(scope, receive, sender)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/routing.py", line 680, in __call__
    await route.handle(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/routing.py", line 275, in handle
    await self.app(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/routing.py", line 65, in app
    response = await func(request)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
    response = await default_handler(request)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/routing.py", line 231, in app
    raw_response = await run_endpoint_function(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
    return await dependant.call(**values)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/api/task_runs.py", line 198, in set_task_run_state
    orchestration_result = await models.task_runs.set_task_run_state(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/models/task_runs.py", line 307, in set_task_run_state
    async with contextlib.AsyncExitStack() as stack:
  File "/opt/homebrew/Cellar/python@3.10/3.10.6_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/contextlib.py", line 714, in __aexit__
    raise exc_details[1]
  File "/opt/homebrew/Cellar/python@3.10/3.10.6_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/contextlib.py", line 697, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/orchestration/rules.py", line 598, in __aexit__
    await self.after_transition(*exit_context)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/orchestration/core_policy.py", line 169, in after_transition
    context.session, tags=context.run.tags
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
    return self.impl.get(state, dict_)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 941, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 972, in _fire_loader_callables
    return state._load_expired(state, passive)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/state.py", line 712, in _load_expired
    self.manager.expired_attribute_loader(self, toload, passive)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/loading.py", line 1451, in load_scalar_attributes
    result = load_on_ident(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/loading.py", line 407, in load_on_ident
    return load_on_pk_identity(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
    session.execute(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1711, in execute
    conn = self._connection_for_bind(bind)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1549, in _connection_for_bind
    TransactionalContext._trans_ctx_check(self)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/util.py", line 199, in _trans_ctx_check
    raise exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: Can't operate on closed transaction inside context manager.  Please complete the context manager before emitting further commands.
15:16:31.340 | ERROR   | Task run 'task_0-1da93ad6-0' - Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/models/task_runs.py", line 316, in set_task_run_state
    await context.validate_proposed_state()
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/orchestration/rules.py", line 371, in validate_proposed_state
    await self._validate_proposed_state()
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/orchestration/rules.py", line 412, in _validate_proposed_state
    await self.session.flush()
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 403, in flush
    await greenlet_spawn(self.sync_session.flush, objects=objects)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 128, in greenlet_spawn
    result = context.switch(value)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3383, in flush
    self._flush(objects)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3522, in _flush
    with util.safe_reraise():
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3483, in _flush
    flush_context.execute()
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 453, in execute
    n.execute_aggregate(self, set_)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 747, in execute_aggregate
    persistence.save_obj(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    _emit_insert_statements(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements
    c = connection._execute_20(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1631, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 332, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1498, in _execute_clauseelement
    ret = self._execute_context(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1862, in _execute_context
    self._handle_dbapi_exception(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2047, in _handle_dbapi_exception
    util.raise_(exc_info[1], with_traceback=exc_info[2])
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 82, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/cursor.py", line 37, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/cursor.py", line 31, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/core.py", line 129, in _execute
    return await future
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

sqlalchemy.exc.InvalidRequestError: Can't operate on closed transaction inside context manager.  Please complete the context manager before emitting further commands.

15:16:31.369 | ERROR   | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/models/task_runs.py", line 316, in set_task_run_state
    await context.validate_proposed_state()
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/orchestration/rules.py", line 371, in validate_proposed_state
    await self._validate_proposed_state()
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/orchestration/rules.py", line 412, in _validate_proposed_state
    await self.session.flush()
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 403, in flush
    await greenlet_spawn(self.sync_session.flush, objects=objects)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 128, in greenlet_spawn
    result = context.switch(value)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3383, in flush
    self._flush(objects)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3522, in _flush
    with util.safe_reraise():
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3483, in _flush
    flush_context.execute()
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 453, in execute
    n.execute_aggregate(self, set_)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 747, in execute_aggregate
    persistence.save_obj(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 975, in _emit_update_statements
    c = connection._execute_20(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1631, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 332, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1498, in _execute_clauseelement
    ret = self._execute_context(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1862, in _execute_context
    self._handle_dbapi_exception(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2047, in _handle_dbapi_exception
    util.raise_(exc_info[1], with_traceback=exc_info[2])
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 82, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/cursor.py", line 37, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/cursor.py", line 31, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/core.py", line 129, in _execute
    return await future
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
    raise exc
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
    await self.app(scope, receive, sender)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/routing.py", line 680, in __call__
    await route.handle(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/routing.py", line 275, in handle
    await self.app(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/routing.py", line 65, in app
    response = await func(request)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
    response = await default_handler(request)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/routing.py", line 231, in app
    raw_response = await run_endpoint_function(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
    return await dependant.call(**values)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/api/task_runs.py", line 198, in set_task_run_state
    orchestration_result = await models.task_runs.set_task_run_state(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/models/task_runs.py", line 307, in set_task_run_state
    async with contextlib.AsyncExitStack() as stack:
  File "/opt/homebrew/Cellar/python@3.10/3.10.6_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/contextlib.py", line 714, in __aexit__
    raise exc_details[1]
  File "/opt/homebrew/Cellar/python@3.10/3.10.6_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/contextlib.py", line 697, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/orchestration/rules.py", line 598, in __aexit__
    await self.after_transition(*exit_context)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/orchestration/core_policy.py", line 169, in after_transition
    context.session, tags=context.run.tags
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
    return self.impl.get(state, dict_)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 941, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 972, in _fire_loader_callables
    return state._load_expired(state, passive)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/state.py", line 712, in _load_expired
    self.manager.expired_attribute_loader(self, toload, passive)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/loading.py", line 1451, in load_scalar_attributes
    result = load_on_ident(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/loading.py", line 407, in load_on_ident
    return load_on_pk_identity(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
    session.execute(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1711, in execute
    conn = self._connection_for_bind(bind)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1549, in _connection_for_bind
    TransactionalContext._trans_ctx_check(self)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/util.py", line 199, in _trans_ctx_check
    raise exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: Can't operate on closed transaction inside context manager.  Please complete the context manager before emitting further commands.
15:16:31.424 | ERROR   | Task run 'task_1-1da93ad6-1' - Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/models/task_runs.py", line 316, in set_task_run_state
    await context.validate_proposed_state()
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/orchestration/rules.py", line 371, in validate_proposed_state
    await self._validate_proposed_state()
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/orchestration/rules.py", line 412, in _validate_proposed_state
    await self.session.flush()
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 403, in flush
    await greenlet_spawn(self.sync_session.flush, objects=objects)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 128, in greenlet_spawn
    result = context.switch(value)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3383, in flush
    self._flush(objects)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3522, in _flush
    with util.safe_reraise():
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3483, in _flush
    flush_context.execute()
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 453, in execute
    n.execute_aggregate(self, set_)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 747, in execute_aggregate
    persistence.save_obj(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 975, in _emit_update_statements
    c = connection._execute_20(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1631, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 332, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1498, in _execute_clauseelement
    ret = self._execute_context(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1862, in _execute_context
    self._handle_dbapi_exception(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2047, in _handle_dbapi_exception
    util.raise_(exc_info[1], with_traceback=exc_info[2])
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 82, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/cursor.py", line 37, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/cursor.py", line 31, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/core.py", line 129, in _execute
    return await future
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

sqlalchemy.exc.InvalidRequestError: Can't operate on closed transaction inside context manager.  Please complete the context manager before emitting further commands.

15:16:31.539 | ERROR   | Flow run 'piquant-nautilus' - Crash detected! Execution was interrupted by an unexpected exception: anyio._backends._asyncio.ExceptionGroup: 2 exceptions were raised in the task group:
----------------------------
Traceback (most recent call last):
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 100, in execute
    self._adapt_connection._handle_exception(error)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 228, in _handle_exception
    raise error
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 82, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/cursor.py", line 37, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/cursor.py", line 31, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/core.py", line 129, in _execute
    return await future
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/core.py", line 102, in run
    result = function()
sqlite3.OperationalError: database is locked

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/engine.py", line 908, in create_task_run_then_submit
    task_run = await create_task_run(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/engine.py", line 953, in create_task_run
    task_run = await flow_run_context.client.create_task_run(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/client/orion.py", line 1561, in create_task_run
    response = await self._client.post(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/httpx/_client.py", line 1842, in post
    return await self.request(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/client/base.py", line 159, in send
    await super().send(*args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/httpx/_client.py", line 1614, in send
    response = await self._send_handling_auth(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/httpx/_client.py", line 1716, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/httpx/_transports/asgi.py", line 152, in handle_async_request
    await self.app(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/cors.py", line 84, in __call__
    await self.app(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
    raise exc
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
    await self.app(scope, receive, sender)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/routing.py", line 680, in __call__
    await route.handle(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/routing.py", line 427, in handle
    await self.app(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
    raise exc
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
    await self.app(scope, receive, sender)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/routing.py", line 680, in __call__
    await route.handle(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/routing.py", line 275, in handle
    await self.app(scope, receive, send)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/starlette/routing.py", line 65, in app
    response = await func(request)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
    response = await default_handler(request)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/routing.py", line 231, in app
    raw_response = await run_endpoint_function(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
    return await dependant.call(**values)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/api/task_runs.py", line 49, in create_task_run
    model = await models.task_runs.create_task_run(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/prefect/orion/models/task_runs.py", line 59, in create_task_run
    await session.execute(insert_stmt)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 212, in execute
    result = await greenlet_spawn(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1712, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1631, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 332, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1498, in _execute_clauseelement
    ret = self._execute_context(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1862, in _execute_context
    self._handle_dbapi_exception(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2043, in _handle_dbapi_exception
    util.raise_(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 100, in execute
    self._adapt_connection._handle_exception(error)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 228, in _handle_exception
    raise error
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 82, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/cursor.py", line 37, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/cursor.py", line 31, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/core.py", line 129, in _execute
    return await future
  File "/Users/Galp88021/Documents/repo/prefect-2-flows/.venv-dask-for-snowflake-driver/lib/python3.10/site-packages/aiosqlite/core.py", line 102, in run
    result = function()
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
[SQL: INSERT INTO task_run (id, created, updated, name, run_count, total_run_time, task_key, dynamic_key, cache_key, cache_expiration, task_version, empirical_policy, task_inputs, tags, flow_run_id) VALUES (:id, :created, :updated, :name, :run_count, :total_run_time, :task_key, :dynamic_key, :cache_key, :cache_expiration, :task_version, :empirical_policy, :task_inputs, :tags, :flow_run_id) ON CONFLICT (flow_run_id, task_key, dynamic_key) DO NOTHING]
[parameters: {'id': 'cfe5c3fc-4663-4a8f-a85f-9b1b4aeb17f8', 'created': '2022-11-16 23:14:54.613154', 'updated': '2022-11-16 23:14:54.616339', 'name': 'sync_task-edd45147-0', 'run_count': 0, 'total_run_time': '1970-01-01 00:00:00.000000', 'task_key': '__main__.sync_task', 'dynamic_key': '0', 'cache_key': None, 'cache_expiration': None, 'task_version': None, 'empirical_policy': '{"max_retries": 0, "retry_delay_seconds": 0.0, "retries": 0, "retry_delay": 0}', 'task_inputs': '{"res": []}', 'tags': '[]', 'flow_run_id': 'ddb004c2-54a0-4845-91d3-11a42c936cfa'}]
(Background on this error at: https://sqlalche.me/e/14/e3q8)

zanieb commented 1 year ago

I believe this is resolved on main now.