Right now asyncio support in accelerated DAGs uses a slightly different codepath from normal DAGs:
you need to specify enable_asyncio=True in dag.experimental_compile()
you need to use dag.execute_async
you need to await twice, once for the execution and again for the output
This is because asyncio execution currently starts a background thread to read/write channels, so we need these extra APIs for coordinating with the background thread. With timeout support for channel reads and writes (#46259), we might be able to remove some of these APIs and support asyncio with single-threaded execution, using mostly the same codepath as non-asyncio execution.
Description
Right now asyncio support in accelerated DAGs uses a slightly different codepath from normal DAGs:
enable_asyncio=True
indag.experimental_compile()
dag.execute_async
await
twice, once for the execution and again for the outputThis is because asyncio execution currently starts a background thread to read/write channels, so we need these extra APIs for coordinating with the background thread. With timeout support for channel reads and writes (#46259), we might be able to remove some of these APIs and support asyncio with single-threaded execution, using mostly the same codepath as non-asyncio execution.
Use case
No response