PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

RuntimeError: Timeout context manager should be used inside a task when using aiohttp and task.submit #12857

Open carlosjourdan opened 6 months ago

carlosjourdan commented 6 months ago

Bug summary

When I use an aiohttp.ClientSession inside a Prefect task that was invoked through the task's submit() method, I get a "RuntimeError: Timeout context manager should be used inside a task" error.

Reproduction

import aiohttp 
import asyncio
from prefect import flow, task
from prefect.utilities.asyncutils import add_event_loop_shutdown_callback
from prefect.task_runners import ConcurrentTaskRunner

@flow(task_runner=ConcurrentTaskRunner)
async def my_flow():
    session = aiohttp.ClientSession()

    for url in ['https://www.google.com','https://www.microsoft.com']:
        data = await download_file.submit(session, url)
        do_something_with_data.submit(data)

    await add_event_loop_shutdown_callback(session.close)

@task()
async def download_file(session, url):
    async with session.get(url) as resp:
        assert resp.status == 200
        return await resp.read()

@task()
def do_something_with_data(data):
    ...

if __name__ == '__main__':
    asyncio.run(my_flow())

Error

21:04:11.032 | INFO    | prefect.engine - Created flow run 'stoic-warthog' for flow 'my-flow'
21:04:11.167 | INFO    | Flow run 'stoic-warthog' - Created task run 'download_file-1' for task 'download_file'
21:04:11.170 | INFO    | Flow run 'stoic-warthog' - Submitted task run 'download_file-1' for execution.
21:04:11.207 | INFO    | Flow run 'stoic-warthog' - Created task run 'download_file-0' for task 'download_file'
21:04:11.209 | INFO    | Flow run 'stoic-warthog' - Submitted task run 'download_file-0' for execution.
21:04:11.317 | ERROR   | Task run 'download_file-1' - Encountered exception during execution:
Traceback (most recent call last):
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\engine.py", line 2099, in orchestrate_task_run
    result = await call.aresult()
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 389, in _run_async
    result = await coro
  File "D:\ace\acedatatools\AceDataTools\local\task_examples.py", line 18, in download_file
    async with session.get(url) as resp:
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
    self._resp = await self._coro
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 507, in _request
    with timer:
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
    raise RuntimeError(
RuntimeError: Timeout context manager should be used inside a task
21:04:11.377 | INFO    | Flow run 'stoic-warthog' - Created task run 'do_something_with_file-1' for task 'do_something_with_file'
21:04:11.379 | INFO    | Flow run 'stoic-warthog' - Submitted task run 'do_something_with_file-1' for execution.
21:04:11.412 | ERROR   | Task run 'download_file-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\engine.py", line 2099, in orchestrate_task_run
    result = await call.aresult()
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 389, in _run_async
    result = await coro
  File "D:\ace\acedatatools\AceDataTools\local\task_examples.py", line 18, in download_file
    async with session.get(url) as resp:
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
    self._resp = await self._coro
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 507, in _request
    with timer:
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
    raise RuntimeError(
RuntimeError: Timeout context manager should be used inside a task
21:04:11.435 | ERROR   | Task run 'download_file-1' - Finished in state Failed('Task run encountered an exception RuntimeError: Timeout context manager should be used inside a task')
21:04:11.483 | ERROR   | Task run 'download_file-0' - Finished in state Failed('Task run encountered an exception RuntimeError: Timeout context manager should be used inside a task')
21:04:11.548 | INFO    | Flow run 'stoic-warthog' - Created task run 'do_something_with_file-0' for task 'do_something_with_file'
21:04:11.551 | INFO    | Flow run 'stoic-warthog' - Submitted task run 'do_something_with_file-0' for execution.
21:04:11.624 | ERROR   | Flow run 'stoic-warthog' - Finished in state Failed('2/4 states failed.')
Traceback (most recent call last):
  File "D:\ace\acedatatools\AceDataTools\local\task_examples.py", line 28, in <module>
    asyncio.run(my_flow())
  File "C:\Users\carlo\AppData\Local\Programs\Python\Python39\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\carlo\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 647, in run_until_complete
    return future.result()
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\api.py", line 150, in wait_for_call_in_loop_thread
    return call.result()
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 179, in result
    return self.__get_result()
  File "C:\Users\carlo\AppData\Local\Programs\Python\Python39\lib\concurrent\futures\_base.py", line 391, in __get_result
    raise self._exception
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 389, in _run_async
    result = await coro
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\client\utilities.py", line 78, in with_injected_client
    return await fn(*args, **kwargs)
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\engine.py", line 394, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\engine.py", line 2099, in orchestrate_task_run
    result = await call.aresult()
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 389, in _run_async
    result = await coro
  File "D:\ace\acedatatools\AceDataTools\local\task_examples.py", line 18, in download_file
    async with session.get(url) as resp:
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
    self._resp = await self._coro
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\client.py", line 507, in _request
    with timer:
  File "D:\ace\acedatatools\AceDataTools\.venv\lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
    raise RuntimeError(
RuntimeError: Timeout context manager should be used inside a task

Versions

Version:             2.18.0
API version:         0.8.4
Python version:      3.9.13
Git commit:          1006d2d8
Built:               Thu, Apr 18, 2024 4:47 PM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.37.2

Additional context

No response

zzstoatzz commented 6 months ago

hi @carlosjourdan - thanks for the issue

I think there are a couple things here

@task()
async def download_file(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            assert resp.status == 200
            return await resp.read()
A full working example

```python
import asyncio

import aiohttp
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner


@flow(task_runner=ConcurrentTaskRunner)
async def my_flow():
    for url in ["https://www.google.com", "https://www.microsoft.com"]:
        data = await download_file.submit(url)
        do_something_with_data.submit(data)


@task()
async def download_file(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            assert resp.status == 200
            return await resp.read()


@task()
def do_something_with_data(data):
    ...


if __name__ == "__main__":
    asyncio.run(my_flow())
```
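A possible reading of why creating the session inside the task avoids the error is sketched below. It uses only the standard library, with no Prefect or aiohttp calls. It rests on two assumptions that this thread does not confirm. First, that the aiohttp timer in the traceback raises when `asyncio.current_task()` returns None for the loop the session is bound to. Second, that Prefect 2.x runs the submitted coroutine on a different event loop than the flow, as the `_internal/concurrency` frames suggest. The names `probe`, `flow_loop`, and `worker` are hypothetical stand-ins.

```python
import asyncio
import threading


async def probe(foreign_loop):
    # Task driving this coroutine on the loop that is actually running it.
    own_task = asyncio.current_task()
    # What an object bound to `foreign_loop` would see if it asked for "its" task.
    foreign_task = asyncio.current_task(loop=foreign_loop)
    return own_task is not None, foreign_task is not None


async def main():
    # Stands in for the loop on which the shared session was created.
    flow_loop = asyncio.get_running_loop()
    main_suspended = threading.Event()
    # This callback runs only after main() has yielded at the await below,
    # so the worker proceeds while no task is executing on flow_loop.
    flow_loop.call_soon(main_suspended.set)

    def worker():
        main_suspended.wait()
        # A fresh loop in another thread, standing in for wherever
        # the submitted task coroutine ends up running.
        return asyncio.run(probe(flow_loop))

    return await flow_loop.run_in_executor(None, worker)


result = asyncio.run(main())
print(result)
```

The first element of the result is True and the second is False: the coroutine is inside a task on its own loop, yet a lookup against the other loop finds none. If the assumptions above hold, a session created in the flow and used in a submitted task would hit the same mismatch, while a session created inside the task would not.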

If you have any further questions, let me know; otherwise I will close this issue.

carlosjourdan commented 6 months ago

Hi @zzstoatzz , thanks for the fast reply. I still have two questions, if you don't mind:

carlosjourdan commented 6 months ago

Just saw that #11930 was reverted by #12054. So this leaves only the question about the RuntimeError: Event loop is closed message.

zzstoatzz commented 6 months ago

hi @carlosjourdan - let me look into that "Event loop is closed" error. I don't remember seeing it, but I will see if I can reproduce it

carlosjourdan commented 6 months ago

Thanks @zzstoatzz . FYI, I'm able to reproduce the error using your "full working example" code on a Windows machine with a fresh venv on Python 3.9.13. The same code works fine on Python 3.11.7.

Here is the output of pip freeze for reference:

aiohttp==3.9.5
aiosignal==1.3.1
aiosqlite==0.20.0
alembic==1.13.1
annotated-types==0.6.0
anyio==3.7.1
apprise==1.7.6
asgi-lifespan==2.1.0
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.2.0
cachetools==5.3.3
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
cloudpickle==3.0.0
colorama==0.4.6
coolname==2.2.0
croniter==2.0.5
cryptography==42.0.5
dateparser==1.2.0
dnspython==2.6.1
docker==6.1.3
email_validator==2.1.1
exceptiongroup==1.2.1
frozenlist==1.4.1
fsspec==2024.3.1
google-auth==2.29.0
graphviz==0.20.3
greenlet==3.0.3
griffe==0.44.0
h11==0.14.0
h2==4.1.0
hpack==4.0.0
httpcore==1.0.5
httpx==0.27.0
humanize==4.9.0
hyperframe==6.0.1
idna==3.7
importlib_metadata==7.1.0
importlib_resources==6.1.3
itsdangerous==2.2.0
Jinja2==3.1.3
jinja2-humanize-extension==0.4.0
jsonpatch==1.33
jsonpointer==2.4
jsonschema==4.21.1
jsonschema-specifications==2023.12.1
kubernetes==29.0.0
Mako==1.3.3
Markdown==3.6
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mdurl==0.1.2
multidict==6.0.5
oauthlib==3.2.2
orjson==3.10.1
packaging==24.0
pathspec==0.12.1
pendulum==2.1.2
prefect==2.18.1
pyasn1==0.6.0
pyasn1_modules==0.4.0
pycparser==2.22
pydantic==2.7.1
pydantic_core==2.18.2
Pygments==2.17.2
python-dateutil==2.9.0.post0
python-multipart==0.0.9
python-slugify==8.0.4
pytz==2024.1
pytzdata==2020.1
pywin32==306
PyYAML==6.0.1
readchar==4.0.6
referencing==0.35.0
regex==2024.4.28
requests==2.31.0
requests-oauthlib==2.0.0
rfc3339-validator==0.1.4
rich==13.7.1
rpds-py==0.18.0
rsa==4.9
ruamel.yaml==0.18.6
ruamel.yaml.clib==0.2.8
shellingham==1.5.4
six==1.16.0
sniffio==1.3.1
SQLAlchemy==2.0.29
text-unidecode==1.3
toml==0.10.2
typer==0.12.3
typing_extensions==4.11.0
tzdata==2024.1
tzlocal==5.2
ujson==5.9.0
urllib3==2.2.1
uvicorn==0.28.1
websocket-client==1.8.0
websockets==12.0
yarl==1.9.4
zipp==3.18.1

zzstoatzz commented 6 months ago

hi @carlosjourdan - thanks for the context! I have reproduced this specifically on older Python versions on Windows

I will add this issue to the backlog. We are doing work to simplify our use of threads in the prefect engine, so hopefully that should help fix / diagnose this.