tokio-rs / tokio

A runtime for writing reliable asynchronous applications with Rust. Provides I/O, networking, scheduling, timers, ...
https://tokio.rs
MIT License

Scoped tasks #3162

Closed jzrake closed 4 months ago

jzrake commented 3 years ago

I would love to be able to spawn non-'static tasks into the Tokio runtime, like so:

let local_data = LocalData::new();

let result = runtime.block_on_scoped(|scope| {

    let future_result = scope.spawn(async {
        local_data.result()
    });

    async {
        future_result.await.unwrap()
    }
});

This type of thing exists in Crossbeam: https://docs.rs/crossbeam/0.8.0/crossbeam/fn.scope.html
Someone wrote this for an older version of Tokio: https://docs.rs/tokio-scoped/0.1.0/tokio_scoped

akhilles commented 3 years ago

related to https://github.com/tokio-rs/tokio/issues/2013

Darksonn commented 3 years ago

I suppose that this would be possible to add as a block_on variant.

jzrake commented 3 years ago

There is also this, which is no longer maintained: https://github.com/Snnappie/tokio-scoped.

jzrake commented 3 years ago

And this: https://docs.rs/async-scoped/0.5.1/async_scoped, although it looks like it's hard-coded to use the "current thread" runtime.

farnz commented 3 years ago

I have an initial attempt at "proper" scoped tasks in https://github.com/farnz/tokio/commit/0ce475868061e8620fe52fb0d9e026f7db039ac8, but I need advice to continue.

Specifically, I'm not clear on how drop should forcibly transition a task to completed state so that the future it's running can be dropped.

Do I need to think through the locking so that I can swap the future I'm trying to drop for a "dummy" future and let the runtime complete a detached task that no longer owns the future I want to drop, or have I missed a better way to handle it?

Darksonn commented 3 years ago

Sorry, scoped tasks cannot rely on Drop because you can use mem::forget to skip running the destructor. There is no way to write a scoped task API that can be used from async code. The only scoped async API you can write that is also safe is one that allows you to spawn scoped async tasks from non-async code. Creating the scope in async code is fundamentally not possible.
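
For illustration, a minimal example of that point (nothing here is a Tokio API): skipping a destructor is safe Rust, so no scope guard can count on its Drop running.

struct Guard;

impl Drop for Guard {
    fn drop(&mut self) {
        println!("cleanup ran");
    }
}

fn main() {
    // Safe code may skip a destructor entirely, so an async scope guard
    // could never rely on its Drop impl to wait for spawned tasks.
    std::mem::forget(Guard); // prints nothing: Drop never runs
}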

farnz commented 3 years ago

> Sorry, scoped tasks cannot rely on Drop because you can use mem::forget to skip running the destructor. There is no way to write a scoped task API that can be used from async code. The only scoped async API you can write that is also safe is one that allows you to spawn scoped async tasks from non-async code. Creating the scope in async code is fundamentally not possible.

Do you mind expanding on this? I would have thought that the same issues apply to crossbeam scoped tasks, which also rely on Drop to avoid the scope being lost before the thread can exit.

I'm on Discord #tokio-dev if you'd rather talk me through it there.

Darksonn commented 3 years ago

The answer is that crossbeam does not rely on Drop for this. They use a scope function, and if any of the threads are still running when you return from the closure, the scope function will wait for those before scope returns.
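
For comparison, here is that blocking pattern sketched with std::thread::scope, the standard-library equivalent of crossbeam's scope (stable since Rust 1.63); the data and printouts are made up for illustration. The call does not return until every spawned thread has finished, which is what makes the borrows sound.

fn main() {
    let mut data = vec![1, 2, 3];

    // The spawned threads may borrow `data` because `scope` blocks this
    // thread until every one of them has exited.
    std::thread::scope(|s| {
        s.spawn(|| println!("first element: {}", data[0]));
        s.spawn(|| println!("length: {}", data.len()));
    });

    // Only after the scope has returned is the borrow over.
    data.push(4);
}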

farnz commented 3 years ago

FTR, the reason we can't do crossbeam-style scoping from an async context is that crossbeam blocks the thread that calls scope for the duration of the scoped threads' execution. Any async equivalent has to return state so that we can go back to the executor instead of blocking, and that state could be forgotten by mistake.

theRookieCoder commented 1 year ago

Pushing for this since scoped threads were added in Rust 1.63.0, and this feature is really awesome! Code is so much cleaner and more readable without Arcs and clone()s all over the place.

Darksonn commented 1 year ago

The possible scoped tasks that can be implemented in async Rust are very limited for technical reasons. The outer scope would have to be a block_on call, but they cannot be used from within the runtime.

theRookieCoder commented 1 year ago

Is this some sort of limitation with Rust that could be resolved in the future, or is it a limitation with how Tokio implements async?

NobodyXu commented 1 year ago

> Is this some sort of limitation with Rust that could be resolved in the future, or is it a limitation with how Tokio implements async?

This is a problem with how futures work. Since a future can always be cancelled, Tokio simply cannot guarantee that the task is done (.awaited) before something like scoped_spawn returns; the future can just be cancelled, destroying the variables referenced by the task.

To solve this problem, we would either need linear types, to disallow dropping and require the future returned by scoped_spawn to be .awaited until it is ready, or a new trait for Future that requires it to either never be polled or be polled until Ready.

Or, if Rust supported defer! or async drop, the scope could just cancel the task and wait for it to be done.
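
As a concrete illustration of "futures are always cancellable" (a sketch using tokio::time::timeout, not anything scoped-task specific): any future can be dropped at an .await point, so nothing can force the parent to keep waiting.

use std::time::Duration;
use tokio::time::{sleep, timeout};

#[tokio::main]
async fn main() {
    let work = async {
        sleep(Duration::from_secs(10)).await;
        "done"
    };

    // `timeout` polls `work` briefly and then simply drops it. Dropping a
    // future mid-execution is always allowed, which is why a hypothetical
    // scoped spawn could never be guaranteed to run to completion.
    let res = timeout(Duration::from_millis(10), work).await;
    assert!(res.is_err()); // the work was cancelled, not completed
}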

Darksonn commented 1 year ago

It's a limitation in Rust itself.

theRookieCoder commented 1 year ago

Interesting, thanks for the clarity!

mattbork commented 1 year ago

> The possible scoped tasks that can be implemented in async Rust are very limited for technical reasons. The outer scope would have to be a block_on call, but they cannot be used from within the runtime.

Does this mean that a variant of LocalSet::block_on like the block_on_scoped originally suggested is possible or is not possible? Because if even just that is possible, it doesn't seem too limiting to me. My use case is that I want to write single-threaded servers using async code, much as one does with Node.js. With the server being single-threaded and structured entirely as a local task set, mutable state can be shared among the tasks with only a RefCell wrapper, which is quite nice. In this case, all my async code is naturally run within LocalSet::block_on using a current-thread runtime, so a block_on_scoped here would be perfect.

Darksonn commented 1 year ago

A block_on_scoped is possible since it can, you know, block the thread. Though for the use-case you mention, you can easily achieve the same with an Rc.
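
For reference, a minimal sketch of that Rc-based approach on a current-thread runtime (the counter and task bodies are invented for illustration): shared mutable state without Arc, Mutex, or any scoped spawning.

use std::cell::RefCell;
use std::rc::Rc;
use tokio::task::LocalSet;

fn main() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    // Single-threaded state sharing: Rc<RefCell<_>> instead of Arc<Mutex<_>>.
    let state = Rc::new(RefCell::new(0u32));

    let local = LocalSet::new();
    local.block_on(&rt, async {
        let mut handles = Vec::new();
        for _ in 0..10 {
            let state = Rc::clone(&state);
            handles.push(tokio::task::spawn_local(async move {
                *state.borrow_mut() += 1;
            }));
        }
        for handle in handles {
            handle.await.unwrap();
        }
    });

    assert_eq!(*state.borrow(), 10);
}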

npuichigo commented 1 year ago

What about the async scope (scoped spawn) design for C++'s p2300 sender/receiver async proposal? https://github.com/kirkshoop/async_scope/blob/main/asyncscope.md

As the paper says, since not every async function has a clear chain of work to consume or block on the result, spawn is indeed useful. But the problem with spawn is that it provides unstructured concurrency, which is an unnecessary, unwelcome and undesirable property for concurrency. Using these algorithms leads to problems with lifetimes that are often 'fixed' using shared_ptr for ad-hoc garbage collection.

Structured Programming [Dahl72] transformed the software world by making it easier to reason about the code and build large software from simpler constructs. We want to achieve the same effect on concurrent programming by ensuring that we structure our concurrent code. The spawn algorithm fails the single entry, single exit point principle by behaving like a GOTO instruction. By calling spawn we essentially continue in two places: in the same function, and on a different thread that executes the given work. Moreover, the lifetime of the work started by spawn cannot be bound to the local context. This will prevent local reasoning, which will make the program harder to understand.

To properly structure our concurrency, we need an abstraction that ensures that all the async-functions being spawned are attached to an enclosing async-function. This is the goal of scoped spawn.

@Darksonn

Darksonn commented 1 year ago

@npuichigo The problem we are facing here is that an async scope can be subject to mem::forget, which makes any async scope API unsound. It's not the kind of problem that a C++ proposal can help with.

npuichigo commented 1 year ago

I don't quite get the context. What do you mean we can't depend on Drop?

npuichigo commented 1 year ago

> The answer is that crossbeam does not rely on Drop for this. They use a scope function, and if any of the threads are still running when you return from the closure, the scope function will wait for those before scope returns.

@Darksonn Can we build a crossbeam-like spawn which does not rely on Drop but waits for completion?

Or something like:

let task = use_resources(
   |scope| {
      async {
         let data = get_data().await;
         for i in 0..10 {
             scope.spawn(do_work(data[i]));
         }
      }
   },
   AsyncScope::new()
);
rt::block_on(task);

spawn may be eager or lazy here, but the scope will keep track of all the tasks it has spawned and wait until they are completed.

Darksonn commented 1 year ago

Either the scope function is not async, in which case it will block the thread, or the scope function is async, in which case it will be subject to the mem::forget issue and be unsound.

Neither is really an acceptable solution in async code.

npuichigo commented 1 year ago

@Darksonn you mentioned here that it's related to cancellation. Can you explain that? Thanks.

Darksonn commented 1 year ago

Let's say that you call scope, which borrows from a local variable called data. If the scope function is async, then it returns a future. You can poll that future once, which ensures that the background task is spawned. Then you can mem::forget the future returned by scope. Once it's been forgotten, you can destroy data. However, the background task is still running, and if it tries to touch data, that's a use-after-free.

npuichigo commented 1 year ago

@Darksonn What about adding one layer of abstraction, like an async resource, to make sure attached resources like data are only destroyed after all tasks complete?

rt::block_on(use_resource(
        |scope, data| async {
            scope.spawn(|| do_work(data));
        },
        AsyncScope::new(),
        data,
    ));

Darksonn commented 1 year ago

If you have ownership of data, then you can already implement something similar today with the existing APIs.
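
A rough sketch of what that can look like with today's APIs (the data and task bodies here are invented): wrap the owned data in an Arc, spawn ordinary 'static tasks, and use a JoinSet to wait for all of them before moving on.

use std::sync::Arc;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    // The data is owned and reference-counted, so no borrows cross tasks.
    let data = Arc::new(vec![10, 20, 30]);

    let mut set = JoinSet::new();
    for i in 0..data.len() {
        let data = Arc::clone(&data);
        set.spawn(async move { data[i] * 2 });
    }

    // Wait for every spawned task before "leaving the scope".
    while let Some(result) = set.join_next().await {
        println!("{}", result.unwrap());
    }
}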

npuichigo commented 1 year ago

You are right @Darksonn, but tokio's spawn is eager only: the task starts running in the background immediately when spawn is called, even if you don't await the returned JoinHandle.

With scoped spawn, since the scope is the root of all spawned tasks, it can be lazy and only start the work when the parent future is polled.

The async scope proposal has three spawn methods: spawn, spawn_future and nest.

  1. spawn plays a fire-and-forget role and is eager. Its return value is void and the return value of the async function is discarded.
  2. spawn_future is eager too, but returns a future which can be used to await the result.
  3. nest is lazy; the async work is started only when the root future is polled or blocked on.

In terms of performance, nest() does not introduce any penalty. spawn() is more expensive than nest() as it needs to allocate memory for the operation. spawn_future() is even more expensive than spawn(); there is a possible race condition that needs to be resolved. nest() does not require allocations, so it can be used in a free-standing environment.

That is the advantage of structured concurrency I mentioned before. Unstructured concurrency like fire-and-forget leads to problems with lifetimes that are often 'fixed' using reference counting or something else.

Darksonn commented 1 year ago

The nest method sounds similar to FuturesUnordered and similar utilities from the futures crate. They don't enable parallelism, since everything runs within the same task, but they are the closest you can get to scoped tasks.
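
A small sketch of that pattern (data and future bodies invented for illustration): because every future is polled inside the one enclosing task, they can borrow local data without a 'static bound, at the cost of not running in parallel on other worker threads.

use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let data = vec![1, 2, 3];

    // All of these futures run concurrently within this single task, so
    // borrowing `data` is fine; there is no cross-thread parallelism.
    let mut tasks = FuturesUnordered::new();
    for x in &data {
        tasks.push(async move { *x * 2 });
    }

    while let Some(doubled) = tasks.next().await {
        println!("{doubled}");
    }
}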

npuichigo commented 1 year ago

nest still allows parallelism. The future returned by a call to nest() holds a reference to the async_scope in order to actually spawn the task for scheduling when it is started. Starting the future returned from nest() will start the given future and add it to the count of futures that the async_scope object requires to complete before it will destruct.

Darksonn commented 1 year ago

Even if you prevent the scope from being destroyed with a reference count, that's not enough. Consider:

use std::sync::Arc;

fn main() {
    let my_data = Box::new(10);

    // Some reference counted object that internally contains a pointer
    // to the allocation of `my_data`.
    let my_arc = Arc::new(&*my_data);

    // Forgetting the arc leaks the reference counted object, and it lives
    // forever.
    std::mem::forget(my_arc);

    // However, we can still free the allocation of `my_data`.
    drop(my_data);
}

If you've given away a clone of my_arc to another thread before you forget it, then you're now in trouble. That thread could use the value after my_data is dropped.

npuichigo commented 1 year ago

You mean when you are not the owner of data? Since the scope must outlive all the tasks' completion (an implementation requirement of async scope), can we just add a lifetime restriction on data so that it outlives the scope?

npuichigo commented 1 year ago

It seems that the spawn in async-task is lazy and only schedules if you ask. What's the difference between the lazy and eager scheduling implementations?

Darksonn commented 1 year ago

> can we just add a lifetime restriction on data so that it outlives the scope?

Lifetime annotations can only talk about durations in which variables are borrowed. You can make the scope borrow from data to ensure that data stays alive, but that borrow only lasts until the last use of the scope, which mem::forget counts as.
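
A tiny sketch of that point (the Scope struct is a stand-in, not a real API): mem::forget is just another "last use", so the borrow of data ends without any destructor ever running.

// A stand-in for a hypothetical scope handle that borrows local data.
struct Scope<'a> {
    #[allow(dead_code)]
    data: &'a String,
}

fn main() {
    let data = String::from("hello");
    let scope = Scope { data: &data };

    // To the borrow checker this is simply the last use of `scope`: the
    // borrow of `data` ends here, yet no Drop code ran that could have
    // waited for background work.
    std::mem::forget(scope);

    // So `data` may now be destroyed, even though in a real async scope
    // tasks borrowing it could still be running.
    drop(data);
}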

npuichigo commented 1 year ago

Since it's a limitation in Rust itself, does Rust have any RFC, perhaps related to async drop, that may address this?

NobodyXu commented 1 year ago

> Since it's a limitation in Rust itself, does Rust have any RFC, perhaps related to async drop, that may address this?

Other than async drop, the most reasonable addition is probably linear types, which enforce must-use or must-use-exactly-once logic.

npuichigo commented 1 year ago

> Is this some sort of limitation with Rust that could be resolved in the future, or is it a limitation with how Tokio implements async?

> This is a problem with how futures work. Since a future can always be cancelled, Tokio simply cannot guarantee that the task is done (.awaited) before something like scoped_spawn returns; the future can just be cancelled, destroying the variables referenced by the task.

> To solve this problem, we would either need linear types, to disallow dropping and require the future returned by scoped_spawn to be .awaited until it is ready, or a new trait for Future that requires it to either never be polled or be polled until Ready.

> Or, if Rust supported defer! or async drop, the scope could just cancel the task and wait for it to be done.

@NobodyXu That sounds like the operation_state in P2300, which describes the whole async operation. It can either not start at all, or start and run to completion/error/cancellation.

NobodyXu commented 1 year ago

> @NobodyXu That sounds like the operation_state in P2300, which describes the whole async operation. It can either not start at all, or start and run to completion/error/cancellation.

I haven't read the paper, but it's true that the idea of a linear type is to make sure the object created is always consumed by its associated functions.

It doesn't have to be polled to completion; it can stop in the middle and call async_drop or some other function that consumes the object.

Must-complete is another linear type that is useful but less general, since cancellation would be much harder.

Another big difference between Rust and C++ is that Rust guarantees that you can't get UB in safe code, meaning the language must actually enforce must-consume or must-poll-to-completion; that's part of the reason why it's taking so long.

Another reason is that Rust language development is also open-source, unlike C++, which locks it behind a committee that you have to pay and attend meetings to access.

That generates much more discussion and often uncovers flaws; combined with nightly-first development and then slow stabilisation, it usually takes longer but ends up better done than in C++.

Just look at C++'s complex overload rules, initialization rules, and the extremely complex way coroutines work (it seems to involve assembly to define how task/context switching is done). I would rather move slower than get flaws into a language that can't make breaking changes.

npuichigo commented 1 year ago

@NobodyXu Thanks. So maybe that's the reason why async-task returns a Runnable, which delays the scheduling to make sure spawn returns first?

// A future that will be spawned.
let future = async { 1 + 2 };

// A function that schedules the task when it gets woken up
// (`sender` is the sending half of a task queue channel, as in the async-task docs).
let schedule = move |runnable| sender.send(runnable).unwrap();

// Construct a task.
let (runnable, task) = async_task::spawn(future, schedule);

// Push the task into the queue by invoking its schedule function.
runnable.schedule();

NobodyXu commented 1 year ago

@npuichigo Yes, without linear types or async drop, the only way to do this safely is to block the thread in a similar manner to std::thread::scope.

npuichigo commented 1 year ago

Thanks. This blog post gives me some insights and is useful to me: https://carllerche.com/2021/06/17/six-ways-to-make-async-rust-easier/

AurevoirXavier commented 4 months ago

We would really like an async version of std::thread::scope.

If this could be a part of tokio it would be awesome.

https://docs.rs/async-scoped/latest/async_scoped/

Darksonn commented 4 months ago

Scoped tasks are impossible. There's a good blog post about it: The Scoped Task trilemma

As for the tokio-scoped crate, that does not do what you think it does. It blocks the thread, which means it's only suitable for use from outside a runtime. And the async-scoped crate requires unsafe, which you are extremely likely to get wrong in the face of cancellation.

I'm going to close this. We can reopen if the Rust language ever gets a feature that makes it possible.