tokio-rs / tokio

A runtime for writing reliable asynchronous applications with Rust. Provides I/O, networking, scheduling, timers, ...
https://tokio.rs
MIT License

Implement `join_all` and `try_join_all` on `JoinSet` #6664

Open FalkWoldmann opened 1 week ago

FalkWoldmann commented 1 week ago

Is your feature request related to a problem? Please describe.

The futures crate currently provides two very convenient functions for awaiting an iterator of futures: join_all and try_join_all. However, JoinSet has no equivalent methods, so we have to loop over the set manually.

Describe the solution you'd like

I would like to see the aforementioned methods added to JoinSet, as they would make it more ergonomic to use.

Additional context

I am not quite sure what the best method signature is, since every task spawned on the set could fail with a JoinError. Here are some ideas I came up with:

join_all

Conservative

async fn join_all(mut self) -> Vec<Result<T, JoinError>>

This is the least opinionated approach, but in my opinion also not very useful. Granted, users could iterate over the Vec again, but I am not convinced this is any more useful than writing the while loop in the first place.

Filtered out JoinErrors

async fn join_all(mut self) -> Vec<T>

We could also just return the values of all Ok variants, but this could be confusing because failed tasks would be silently ignored.

Opinionated join_all

async fn join_all(mut self) -> Result<Vec<T>, JoinError>

Just short-circuit as soon as one task fails. This is a lot more useful, but it could confuse users, since a method with this behavior could also be understood as try_join_all.

try_join_all

impl<T: 'static, K: 'static> JoinSet<Result<T, K>> {
    async fn try_join_all(mut self) -> Result<Vec<T>, TryJoinAllError<K>> {}
}

I would also like to have a try_join_all that reacts to a user-provided Result, where TryJoinAllError could be either K or JoinError.

In addition, I am not quite sure how to handle the remaining tasks in case of a panic or error. Should we explicitly abort them, or just let them run? I would like to hear your thoughts on this.

SmnTin commented 1 week ago

join_all

try_join_all

Maybe return a nested result like Result<Result<Vec<T>, K>, JoinError> because one of the errors is "more local"?

The idea of JoinSet

In my opinion, JoinSet is tokio's main primitive for structured concurrency:

I often find myself constructing the following loop to report errors and panics properly:

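while let Some(join_result) = join_set.join_next().await {
    match join_result {
        // The task finished: propagate its own error with `?`.
        Ok(result) => result?,
        // The task panicked: re-raise the panic in the current task.
        Err(err) => panic::resume_unwind(err.into_panic()),
    }
}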

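This snippet assumes the tasks are not cancelled except when join_set is dropped, because JoinError::into_panic itself panics if the error came from a cancelled task rather than a panic.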

With the proposed try_join_all, this can be rewritten as:

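match join_set.try_join_all().await {
    Ok(_) => Ok(()),
    // A task returned its own error: propagate it to the caller.
    Err(TryJoinAllError::Err(err)) => return Err(err),
    // A task panicked: re-raise the panic.
    Err(TryJoinAllError::JoinError(err)) => panic::resume_unwind(err.into_panic()),
}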

It better conveys the idea but could be more concise.

SmnTin commented 1 week ago

Regarding this question:

In addition, I am not quite sure how to handle the remaining tasks in case of a panic or Error. Should we explicitly abort them, or just let them run? I would like to hear your thoughts on this.

All the methods you proposed consume the JoinSet, meaning it will be dropped and any remaining tasks will be aborted. As I elaborated above, I like this behavior.

But I can see how the proposed try_join_all, which joins all tasks and fails if any of them returns an error, could be confused with the existing try_join_next, which joins a task only if one has already finished and returns None otherwise.

Darksonn commented 1 day ago

Hmm, I think it would make sense to have an opinionated join_all. It could even do something like this:

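// Assumes `use std::panic;` in the enclosing module.
pub async fn join_all(mut self) -> Vec<T> {
    let mut output = Vec::new();

    while let Some(out) = self.join_next().await {
        match out {
            Ok(res) => output.push(res),
            // Re-raise the task's panic in the caller with its original payload.
            Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()),
            // Any other JoinError means the task was cancelled.
            Err(err) => panic!("{err}"),
        }
    }

    output
}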

Since this takes self by value, the JoinSet is dropped as the panic unwinds, so all remaining tasks are cancelled on error.