Open dhirschfeld opened 4 years ago
There was some discussion of this on a Trio issue, https://github.com/python-trio/trio/issues/1089#issuecomment-521552829, and that thread noted the obstacle that a generator can't yield from an internal nursery. So you end up having to pass in an external nursery, or hop through a context manager.
Thanks for the link @belm0 - an interesting discussion! I don't think having to pass in a nursery is too onerous. Perhaps an api like below could be made to work?
    async with trio.open_nursery() as nursery:
        async for idx, outcome in as_completed(*async_fns, nursery=nursery):
            nursery.start_soon(process_result, outcome.unwrap())
I thought I'd naively try to use trio_async_generator to implement this but failed at the first hurdle. Haven't had time to look into it but thought I'd post in case it's obvious what I'm doing wrong.
    from functools import partial
    from typing import Any, AsyncIterable, Awaitable, Callable, Tuple

    import trio
    from trio_util import trio_async_generator

    @trio_async_generator
    async def as_completed(
        *funcs: Callable[[], Awaitable[Any]]
    ) -> AsyncIterable[Tuple[int, Any]]:
        async def yield_result(func: Callable[[], Awaitable[Any]], *, idx: int) -> None:
            result = await func()
            yield idx, result

        async with trio.open_nursery() as nursery:
            for idx, func in enumerate(funcs):
                nursery.start_soon(
                    partial(yield_result, func, idx=idx)
                )

    async def f(x):
        return x

    async with as_completed(partial(f, 1), partial(f, 2)) as results:
        async for idx, result in results:
            print(idx, result)
    Traceback (most recent call last):
      File "C:\Users\dhirschf\envs\dev\lib\site-packages\trio_util\_trio_async_generator.py", line 50, in adapter
        value = await agen.__anext__()
    AttributeError: 'coroutine' object has no attribute '__anext__'

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "C:\Users\dhirschf\envs\dev\lib\site-packages\IPython\core\interactiveshell.py", line 3435, in run_code
        last_expr = (await self._async_exec(code_obj, self.user_ns))
      File "<ipython-input-94-2025cd5b27a0>", line 6, in async-def-wrapper
      File "C:\Users\dhirschf\envs\dev\lib\contextlib.py", line 177, in __aexit__
        await self.gen.__anext__()
      File "C:\Users\dhirschf\envs\dev\lib\site-packages\trio_util\_trio_async_generator.py", line 72, in wrapper
        yield receive_channel
      File "C:\Users\dhirschf\envs\dev\lib\site-packages\trio\_core\_run.py", line 813, in __aexit__
        raise combined_error_from_nursery
      File "C:\Users\dhirschf\envs\dev\lib\site-packages\trio_util\_trio_async_generator.py", line 68, in adapter
        return
      File "C:\Users\dhirschf\envs\dev\lib\site-packages\async_generator\_util.py", line 14, in __aexit__
        await self._aiter.aclose()
    AttributeError: 'coroutine' object has no attribute 'aclose'
I'm testing on py37 in case that makes a difference.
I think the body of as_completed() itself needs a yield to be a generator. The yield in yield_result() applies to that nested function (making it an async generator), rather than applying to as_completed().
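A quick way to see this with only the standard library is to ask inspect how Python classifies each function. The names nested_yield and direct_yield are made up for this illustration and are not part of trio or trio_util.

```python
import inspect


async def nested_yield():
    # The yield lives in inner(), so only inner() becomes an async generator.
    # nested_yield() itself stays an ordinary coroutine function.
    async def inner():
        yield 1

    return inner


async def direct_yield():
    # A yield in the body itself makes this function an async generator.
    yield 1


coro = nested_yield()
report = {
    "nested_yield is an async generator function": inspect.isasyncgenfunction(nested_yield),
    "direct_yield is an async generator function": inspect.isasyncgenfunction(direct_yield),
    "nested_yield() has __anext__": hasattr(coro, "__anext__"),
}
coro.close()  # tidy up the un-awaited coroutine to avoid a RuntimeWarning

for description, value in report.items():
    print(f"{description}: {value}")
```

The missing __anext__ on the coroutine object is the same condition the AttributeError in the traceback reports.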
Yeah, I figured that wouldn't work 🤦‍♂️
Will have to think how I can yield as results arrive 🤔
I guess the function results feed into a trio memory channel, and the main loop just yields incoming items?
> I guess the function results feed into a trio memory channel, and the main loop just yields incoming items?
That sounds like it could work - will give it a go and report back!
I think an async-iterable function which took functions to execute asynchronously and yielded the results as Outcome objects would be very useful.
I find the dask as_completed API to be very convenient and am looking for something similar in trio.
I think it's a more fundamental primitive than either wait_any or wait_all as (I think) both could be implemented with an async def as_completed(*funcs) -> AsyncIterable function.