python-trio / trio

Trio – a friendly Python library for async concurrency and I/O
https://trio.readthedocs.io

trio.wait_any() #1089

Closed belm0 closed 4 years ago

belm0 commented 5 years ago

I'm going out on a limb: wait_any() should be part of the Trio package.

async def wait_any(*functions):
    """Wait until any of the given async functions are completed.

    Equivalent to creating a new nursery and calling `start_soon()` on
    each async function, where the first function to return will cause the
    nursery to be cancelled.
    """
    ...

The bar for utility-like things should be high for Trio. My main point about wait_any():

It's actually detrimental to Trio to not have this available to all programs out of the box.

smurfix commented 5 years ago

Isn't that reasonably simple to write? This ten-liner should do it:

async def wait_any(*functions):
    result = None
    async with trio.open_nursery() as nursery:
        async def until_finished(proc):
            nonlocal result
            result = await proc()
            nursery.cancel_scope.cancel()
        for f in functions:
            nursery.start_soon(until_finished, f)
    return result

The devil is in the details, though, because that's not code – that's a pattern which you most likely need to flesh out with your own code if you do need it.

Also on that list: Futures. You can't get by without them when writing a client for an async network protocol, but the details of handling cancellation and aborts vary wildly, so no generic solution is obvious enough to merit inclusion in the core.
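
For illustration, the trivial core of such a future is just an Event plus a value slot (a sketch only; real protocol code still has to decide what happens on cancellation or connection aborts, which is exactly the part that varies):

import trio

class SimpleFuture:
    """Bare-bones one-shot value holder; ignores the hard questions."""

    def __init__(self):
        self._event = trio.Event()
        self._value = None

    def set(self, value):
        # called by e.g. the protocol reader when a reply arrives
        self._value = value
        self._event.set()

    async def get(self):
        # called by the request initiator; no cancellation/abort handling here
        await self._event.wait()
        return self._value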

We might want to create a separate section in the docs, or even a separate repository, for that kind of thing.

belm0 commented 5 years ago

wait_any() is even simpler, not dealing with return values. My argument is that this particular function doesn't have anything to flesh out. It's empirically of high value as-is.

If something is that useful, then we want to eliminate any friction to using it. No copy & paste or separate package.
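
For concreteness, the no-return-value version argued for here fits in a handful of lines (a sketch, not necessarily the exact code we use):

import trio

async def wait_any(*functions):
    """Run the given async functions; return once the first one finishes."""
    async with trio.open_nursery() as nursery:
        async def waiter(afn):
            await afn()
            nursery.cancel_scope.cancel()
        for afn in functions:
            nursery.start_soon(waiter, afn)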

njsmith commented 5 years ago

We might want to create a separate section in the docs, or even a separate repository, for that kind of thing.

There's an example of essentially this in the docs already, and #472 is the issue for being more systematic about our doc examples. So I guess we can reserve this issue specifically for discussion on whether this should be built in to Trio.

I guess a Trio-iffic name would be run_race. Returning the winner's return value seems intuitively reasonable in a generic tool like this, since it would cover a broader range of cases without being any harder to use.

It seems plausible to me as a potential addition, but right now there's a backlog of foundational decisions to sort out (Stream/Channel/Listener, #611, etc.), and those seem more urgent because users can't work around them with a 6-line utility function while they wait. So it might take a bit until I have the bandwidth to make a decision here.

@belm0 I can understand theoretically why this abstraction makes sense, but that's not a substitute for the experience of actually using it in real-world situations. Are you able to share any examples of how you've used this "in the wild"?

njsmith commented 5 years ago

Note: this is linked in the first message, but in case anyone missed it, this discourse post has some more stats on how @belm0's group uses wait_any (and a related wait_all).

belm0 commented 5 years ago

We haven't had a use case for return value from the winner. I guess the app is more about using concurrency for logic and control than data? As a name, run_race seems more obscure than wait_any.

Is my app an outlier as far as the utility of this function goes? I tried looking for other large uses of Trio on GitHub as another sample, but couldn't find anything.

Anyway, some use snippets from our project:

category: run task(s) in parallel along side a monitor which may abort them (very common)

async def wander_loop():
    """Advance when wander is enabled."""
    while True:
        await wander_enabled_event.wait_value(True)
        await wait_any(
            partial(advance_with_recovery, velocity_max),
            partial(wander_enabled_event.wait_value, False)
        )

# yelp until obstruction is cleared or wheels no longer deployed
await wait_any(
    partial(voice_mutate, ...),
    partial(locomotion.blocked_event.wait_value, False),
    partial(physical_state_event.wait_value, lambda val: val is not WHEELS_DEPLOYED)
)

category: just do stuff in parallel. There's a main task expected to exit first, the others are subsidiary. (As opposed to having a specific monitor to exit early.)

await wait_any(
    stretch_legs_animation,
    partial(play_voice, ...),
)

category: composing signals

await wait_any(
    object_near_left_wheel_event.wait_transition,
    object_near_right_wheel_event.wait_transition
)

That covers the roughly 200 uses in our project.

mehaase commented 5 years ago

We have a pretty big Trio application that's open source: https://github.com/hyperiongray/starbelly

I can't think of any places where I would use a wait_any() function in this application. Back when this was written in asyncio, I had a wait_first() function that I used a lot (a wrapper around asyncio.wait() that cancels the unfinished tasks), but when I rewrote the code in Trio, I found that I didn't need it anymore.
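
Roughly, the wrapper looked like this (a reconstruction from the description, not starbelly's actual code):

import asyncio

async def wait_first(*aws):
    """Return the result of the first awaitable to finish; cancel the rest."""
    tasks = [asyncio.ensure_future(aw) for aw in aws]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    return done.pop().result()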

There is definitely value in having a library of high quality implementations for these kinds of primitives. I'm ambivalent about whether they should be part of Trio core or not.

Tronic commented 5 years ago

run_race and gather (run_all) with return values are quite useful because these patterns repeat often. However, because they are missing from Trio, I usually find myself implementing something slightly more tailored, like a "happy eyeballs"-style approach, instead of instantly running all the tasks.
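
As a rough illustration of the staggered variant (simplified: real happy eyeballs also starts the next attempt as soon as the previous one fails, while this just uses a fixed delay):

import trio

async def staggered_race(async_fns, delay=0.25):
    """Start each candidate `delay` seconds apart; first to finish wins."""
    winner = None
    async with trio.open_nursery() as nursery:
        async def attempt(afn):
            nonlocal winner
            winner = await afn()
            nursery.cancel_scope.cancel()
        for afn in async_fns:
            nursery.start_soon(attempt, afn)
            await trio.sleep(delay)  # also cancelled once someone wins
    return winner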

async for result in trio.run_parallel(...) could be more flexible than having to choose between racing and waiting for all (process responses as they arrive and break once you don't need more results). However, the task idx might be needed to map answers back to the original ordering, and one might not want exceptions to cancel everything, so this quickly leads to something like async for idx, result, exception in trio.run_parallel(..., return_exceptions=True, return_index=True), which is already convoluted. Instead, an awaitable proxy might be a better choice:

async for result in trio.run_parallel(func0("foo"), func1("bar")):
    try:
        value = await result
        print(f"func{result.idx} returned {value}")
    except RuntimeError:
        print(f"func{result.idx} had an error we'll ignore here")

This is not as simple as a one-liner call, but it still beats having to set up a nursery and write a task-runner function that sends task return values back via a memory channel. And nothing stops you from adding .any and .all awaitables to the run_parallel return value, implementing the respective functions, if you don't wish to iterate over results.

Another open question, also relevant to run_race and gather, and apparently already discussed elsewhere, is how tasks would be passed to such a function. In this example I used coro objects, which isn't trionic -- but the alternatives -- either forbidding arguments[1] or requiring several function-argument tuples -- aren't too hot either.

[1] ... which leads to a hack I already use: nursery.start_soon(lambda: my_afunc(my_keyword_argument=True))

njsmith commented 5 years ago

@Tronic unfortunately, you can't hide a nursery inside an iterator, because an iterator can be abandoned at any time without warning, and because it becomes too ambiguous which code is inside the nursery's cancel scope. (See #264 and the many threads linked from there for more details, including multiple proposals for language changes... It's a whole thing.)

You could have an async with that gives a kind of "iterable nursery" – curio's TaskGroup class has some interesting ideas in that direction.

which leads to a hack I already use: nursery.start_soon(lambda: my_afunc(my_keyword_argument=True))

Heh, that's a clever trick! But I do think functools.partial is about as nice to use, and more idiomatic, and produces better debugging info (in particular trio can see through the partial object to set the task name to my_afunc instead of <lambda 0x12345678>).
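
A self-contained toy showing both spellings side by side (function and argument names are just placeholders):

import trio
from functools import partial

async def my_afunc(my_keyword_argument=False):
    await trio.sleep(0.1)

async def main():
    async with trio.open_nursery() as nursery:
        # lambda trick: works, but the task is named after the lambda
        nursery.start_soon(lambda: my_afunc(my_keyword_argument=True))
        # partial: equivalent, and trio names the task after my_afunc
        nursery.start_soon(partial(my_afunc, my_keyword_argument=True))

trio.run(main)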

Tronic commented 5 years ago

Exactly my thoughts... At first I thought it could be suddenly abandoned (and thought of abusing async with), but apparently GeneratorExit actually gets thrown inside as soon as the async for ends, so we are good on that front -- except that, as you say, a nursery won't work inside the async generator: I get "RuntimeError: async generator ignored GeneratorExit" and then TrioInternalErrors, presumably because the nursery's cleanup code would like to take a few more async steps, which is not allowed while the generator is exiting.

Here is the working version, with nursery moved outside:

import trio

async def run_parallel(nursery, *coros):
    class Result:
        def __init__(self, idx, value = None, exc = None):
            self.idx = idx
            self.value = value
            self.exc = exc

        def __repr__(self):
            val = f"raise {self.exc!r}" if self.exc else f"return {self.value!r}"
            return f"<run_parallel.Result #{self.idx} {val}>"

        async def result(self):
            if self.exc: raise self.exc
            return self.value

        def __await__(self):
            return self.result().__await__()

    async def runner(sender, idx, coro):
        async with sender:
            try:
                await sender.send(Result(idx, await coro))
            except Exception as e:  # Regular exceptions only?
                await sender.send(Result(idx, exc=e))

    sender, receiver = trio.open_memory_channel(0)
    async with sender:
        for idx, coro in enumerate(coros):
            nursery.start_soon(runner, sender.clone(), idx, coro)
    async for result in receiver:
        try:
            yield result
        except GeneratorExit:
            nursery.cancel_scope.cancel()
            raise

async def mytask(delay, ret):
    await trio.sleep(delay)
    if isinstance(ret, Exception): raise ret
    return ret

async def main():
    tasks = mytask(3, Exception("task #0 not cancelled")), mytask(2, RuntimeError("task #1")), mytask(1, "I'm task #2")
    async with trio.open_nursery() as nursery:
        async for result in run_parallel(nursery, *tasks):
            print("Result object:", repr(result))
            # Any error handling here is optional
            try:
                print("Result value:", await result)
            except RuntimeError as e:
                print("Result exception:", repr(e))
                break

trio.run(main)

It is not entirely bad with an external nursery block, although 3+ lines and two extra levels of indentation are a lot more user code than a one-liner like await run_race(...). This also makes apparent how much boilerplate is required to accomplish the fairly simple task of running things in parallel and obtaining the return values.

Tronic commented 5 years ago

And yes, it would seem that if the async code running the async for gets cancelled, the async generator is not properly terminated either :(

Tronic commented 5 years ago

FWIW, a gather implementation that supports both coros and fn-arg tuples, with proper task names:

import inspect
import trio

async def gather(*tasks, name="gather"):
    async def run_cr(idx, coro): results[idx] = await coro
    async def run_fn(idx, fn, *args): results[idx] = await fn(*args)

    results = len(tasks) * [None]
    async with trio.open_nursery() as nursery:
        for idx, task in enumerate(tasks):
            if inspect.isawaitable(task):
                n, runner = task.__qualname__, run_cr(idx, task)
            else:
                n, runner = task[0].__qualname__, run_fn(idx, *task)
            # lambda trick from above: runner is the already-created coroutine
            nursery.start_soon(lambda: runner, name=f"{name}[{idx}] {n}")
    return results
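
For reference, a usage sketch of the gather above, mixing both accepted forms (toy function, just for illustration):

async def double(x):
    await trio.sleep(0.01)
    return 2 * x

async def main():
    results = await gather(
        double(3),    # a coroutine object
        (double, 4),  # a (fn, *args) tuple
    )
    print(results)    # [6, 8]

trio.run(main)
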
belm0 commented 5 years ago

wait_any() is now packaged in trio-util

While adding some documentation, I described wait_any's particular niche as follows:

wait_any() and wait_all() are used to simultaneously run async functions which either have side effects and don’t return a value, or signal merely by exiting

This seems to cover a wide range of use cases and gives these trivial implementations high value. A key observation is that the functions passed in are often heterogeneous.

We still don't have any cases in our project of running homogeneous things in parallel and needing return values. And for heterogeneous parallel calls with return values, the hand-coded nursery is king.
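
For contrast, the hand-coded pattern referred to here looks something like this (toy stand-ins for the real calls):

import trio

async def count_likes():
    await trio.sleep(0.1)  # stand-in for a database query
    return 42

async def fetch_weather():
    await trio.sleep(0.1)  # stand-in for an HTTP request
    return "sunny"

async def fetch_both():
    likes = weather = None

    async def _likes():
        nonlocal likes
        likes = await count_likes()

    async def _weather():
        nonlocal weather
        weather = await fetch_weather()

    async with trio.open_nursery() as nursery:
        nursery.start_soon(_likes)
        nursery.start_soon(_weather)
    return likes, weather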

Tronic commented 5 years ago

Heterogeneous return values make sense in the context of wait_all (gather) but not wait_any. A hypothetical example:

article_likes, current_weather = await gather(
    (db.get_likes, ...),
    (asks.get, 'http://weather.com/')
)

In practice things like this usually end up being done sequentially because parallel async is too tedious (maybe not so with Trio nurseries but certainly with asyncio and even more so with non-async frameworks).

belm0 commented 4 years ago

I'll forgo asking for wait_any to be in the Trio lib and am OK with closing this issue. I realize that wait_any() in the form I proposed is specific to domains that are more logic- than data-driven, or to applications using a side-effect style of programming.

oremanj commented 4 years ago

@belm0 thank you for packaging wait_any() in trio-util! I'm going to close this issue since it sounds like there's not anything left for us to decide here.

arthur-tacca commented 6 months ago

Just to add another alternative: there is a wait_any() implementation in my aioresult library. It will probably have a different audience than the original request and the one in trio-util, though; it's more like the C# Task.WaitAny() function. Rather than cancelling all the other tasks, it returns the ResultCapture of the one that finishes first and lets the others continue. You could use it to get a similar effect to the original post by manually spawning into a nursery and then cancelling it:

async with trio.open_nursery() as nursery:
    captures = [aioresult.ResultCapture.start_soon(nursery, my_func, i) for i in range(10)]
    winner = await aioresult.wait_any(captures)
    nursery.cancel_scope.cancel()
print("Winning result:", winner.result())
belm0 commented 6 months ago

wait_any() is still our short-circuiting powerhouse and is almost always used with heterogeneous inputs.

However since wait_any() doesn't propagate return values, we've observed users resorting to nonlocal variables to communicate which function ended the wait, result status, etc.

Currently I'm considering extending the wait_any() signature with optional kwargs, similar to what's done for compose_values(). Then the caller can explicitly opt in to certain results, and the overhead is only incurred when kwargs are provided.

results = await wait_any(foo,      # We don't care about foo()'s return value.
                         bar=bar,  # If bar() ends the wait, capture its return value.
                         baz=baz)  # etc.
if results.bar: ...

Where results is a dynamic namedtuple if there are kwargs, else None. I'd like to have proper typing on the results members, but I doubt old Python (3.8) is up to the task.
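
A rough sketch of how that could look (assumptions mine: only the value of the function that ends the wait is recorded, everything else stays None; this is not trio-util code):

from collections import namedtuple
import trio

async def wait_any(*args, **kwargs):
    values = dict.fromkeys(kwargs)

    async with trio.open_nursery() as nursery:
        async def waiter(name, afn):
            value = await afn()
            if name is not None:
                values[name] = value
            nursery.cancel_scope.cancel()
        for afn in args:
            nursery.start_soon(waiter, None, afn)
        for name, afn in kwargs.items():
            nursery.start_soon(waiter, name, afn)

    if not kwargs:
        return None
    return namedtuple("Results", list(kwargs))(**values)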