GallagherCommaJack closed this issue 5 years ago.
I don't know what exactly you're trying to do, but I'd like to point out that stackful coroutines are inherently incompatible with work-stealing in Rust. The problem in Corona is not that it uses RefCells and thread-locals; that's just a consequence of the fact that it makes no sense to use thread-aware primitives.
The problem is, if you ever move something that is `!Send` between two threads, you get Undefined Behaviour. And Rust doesn't check what lives on the stack, so you always have to be conservative about what lives there and consider the stack to be `!Send`. There's therefore no safe way in Rust to migrate a coroutine from one thread to another once it is live.
Furthermore, Corona mostly exists as a temporary solution, until the real async-await lands.
You can mix & match coroutines and futures on the same single-threaded executor if you don't really need multiple threads, or you can have multiple threads, each with its own single-threaded executor. Before a coroutine is born, you pick one of the threads, send the function there and create the coroutine on that executor. But once it starts, it sticks to the same thread. You can probably improve the ergonomics of the latter somehow, but this can be built on top, without thread-aware mechanisms inside corona itself.
Also note that while the builder can be stored in thread-local storage, this is only a convenience interface. You can always pass a `Coroutine` around explicitly and call `spawn` on that. And that one is even `Clone + Send + Sync`, so you could pick a thread to execute on, bundle a copy of the `Coroutine` with the function and send the bundle to that thread to create the coroutine there.
> ...you pick one of the threads, send the function there and create the coroutine on that executor. But once it starts, it sticks to the same thread. You can probably improve the ergonomics of the latter somehow, but this can be built on top, without thread-aware mechanisms inside corona itself.
Yeah, this sounds much closer to what I want to do here. Work stealing doesn't seem as important as ergonomic multi-core at all. Seems similar in spirit to how `may` spawns threads (where the closure is required to implement `Send`).
> Furthermore, Corona mostly exists as a temporary solution, until the real async-await lands.
Because Rust doesn't have monads, I often find coroutines a good bit easier to write and think about than async/await, so I'm somewhat motivated to work on this anyway.
Probably the right thing is going to be to set up an MPMC queue of closures, though I'll have to think about the right way to measure load on a thread.
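Roughly what I have in mind - just a sketch, with crossbeam-channel standing in for the MPMC queue and plain threads standing in for the per-thread runtimes (the load-measurement part is exactly what's still missing):

```rust
// Every worker pulls from the same MPMC queue, so whichever thread is free
// first picks up the next job.
use crossbeam_channel::unbounded;
use std::thread;

type Job = Box<dyn FnOnce() + Send>;

fn main() {
    let (tx, rx) = unbounded::<Job>();

    let workers: Vec<_> = (0..4)
        .map(|i| {
            let rx = rx.clone();
            thread::spawn(move || {
                // In the real thing, this thread would also own a
                // single-threaded tokio runtime and turn each job into a
                // coroutine instead of just calling it.
                for job in rx.iter() {
                    println!("worker {} picked up a job", i);
                    job();
                }
            })
        })
        .collect();

    for n in 0..8 {
        let job: Job = Box::new(move || println!("job {}", n));
        tx.send(job).unwrap();
    }
    drop(tx); // closing the channel lets the workers exit

    for w in workers {
        w.join().unwrap();
    }
}
```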
> Work stealing doesn't seem as important as ergonomic multi-core at all. Seems similar in spirit to how `may` spawns threads (where the closure is required to implement `Send`).
The last time I looked, `may` did require the closure to be `Send`, but it still did work stealing despite the fact that it is a potential disaster. But it's true I only looked during its early days, and maybe they've stopped doing that since.
> Probably the right thing is going to be to set up an MPMC queue of closures, though I'll have to think about the right way to measure load on a thread.
I'd suggest the easiest way might be to just put `.for_each(|task| spawn(task))` on the end. That would allow us to first settle on how the API should look and then worry about making it more fair or balanced.
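Something like this, as a rough sketch (futures 0.1 plus tokio-current-thread, which corona already builds on; the `Task` alias is only for illustration):

```rust
// One worker thread owns a single-threaded executor and drains a channel of
// boxed tasks, spawning each one locally via .for_each(|task| spawn(task)).
use futures::future::lazy;
use futures::sync::mpsc;
use futures::{Future, Stream};
use std::thread;

type Task = Box<dyn Future<Item = (), Error = ()> + Send>;

fn main() {
    let (tx, rx) = mpsc::unbounded::<Task>();

    let worker = thread::spawn(move || {
        // Everything pulled from the channel is spawned onto this thread's
        // executor; a coroutine created here would stick to this thread.
        tokio_current_thread::block_on_all(rx.for_each(|task| {
            tokio_current_thread::spawn(task);
            Ok(())
        }))
        .unwrap();
    });

    tx.unbounded_send(Box::new(lazy(|| {
        println!("running on the worker thread");
        Ok::<(), ()>(())
    })))
    .unwrap();
    drop(tx); // closing the channel lets the worker finish

    worker.join().unwrap();
}
```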
As for measuring the load, a custom-built runtime (inserting a layer just on top of the reactor) could tell you how much time it spends sleeping vs. how much time it spends running something. The other option is to have some kind of futures-aware MPMC queue (I don't know if these exist) and the first thread actually able to pull a task gets it ‒ the less busy ones will be faster to pick them up.
If this turned out nicely, I think I'd be OK with this living directly in corona, if you preferred.
> As for measuring the load, a custom-built runtime (inserting a layer just on top of the reactor) could tell you how much time it spends sleeping vs. how much time it spends running something. The other option is to have some kind of futures-aware MPMC queue (I don't know if these exist) and the first thread actually able to pull a task gets it ‒ the less busy ones will be faster to pick them up.
There are some easy ways to implement something like that which wouldn't be very performant (e.g. `futures_locks::Mutex<VecDeque<Job>>`), though there are probably better options.
I may not get around to building this in a sufficiently fleshed out way for a few weeks, but I'll definitely submit a pull request when I do. Depends on how my work/leisure coding balance ends up shaking out :)
Ok, I now have some capacity to work on this - here's my guess at how best to implement it:
1) Spawn `n` threads, each running a `tokio_current_thread` runtime.
2) On each thread, create a `tokio::sync::mpsc` channel of closures; `tokio::spawn` a task that consumes it and `coroutine::spawn`s each task it receives.
3) Spawn a global task manager thread, which uses a `crossbeam::queue::SegQueue` to receive tasks and assigns them round-robin to the tokio threads via their mpsc channels (it might be possible to do something smarter by preferentially assigning to unblocked threads, but the stupid solution is often good enough).
4) Create a new function `coroutine::spawn_global`, which takes a `Send` closure and schedules it on the global system.
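For (4), the entry point I'm picturing would look roughly like this (nothing here exists in corona yet; the names are made up):

```rust
// Rough shape of step 4: a handle owning the global queue from step 3, and a
// spawn_global that only accepts Send closures.
use crossbeam::queue::SegQueue;
use std::sync::Arc;

type Job = Box<dyn FnOnce() + Send + 'static>;

pub struct GlobalPool {
    // The manager thread from step 3 drains this and forwards jobs to the
    // per-thread mpsc channels.
    queue: Arc<SegQueue<Job>>,
}

impl GlobalPool {
    /// Hand a `Send` closure to the pool; it becomes a coroutine on whichever
    /// worker thread the scheduler picks.
    pub fn spawn_global<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.queue.push(Box::new(f));
    }
}
```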
hmm, so it looks like `tokio-io-pool` has already ~implemented this. is there an easy way to adapt `corona` to just use that?
For the first part, a few gut-feeling things:
I'd also think it would be nice to have some kind of handle to that set of threads ‒ currently, corona lets you either do `corona::run`, which starts everything, or manually use whatever runtime the user hands it. It would be nice to still be able to do that even in the multi-threaded case.
As for tokio-io-pool, I'll have to study it a bit and I don't have the time today; I'll get to it in a few days. But from a quick glance it doesn't seem to offer a way to spawn on a specific thread or get a handle that spawns on the same thread. But maybe the pool could be extended to allow that?
- This seems unnecessary. Having a vec of mpsc senders plus one atomic counter, incremented on each use and taken modulo the number of senders, would be enough and wouldn't need a dedicated thread.
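Roughly like this, as a sketch (the names are made up):

```rust
// Round-robin over the per-thread senders; one atomic counter, no dedicated
// manager thread needed.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::Sender;

pub struct Dispatcher<T> {
    senders: Vec<Sender<T>>,
    next: AtomicUsize,
}

impl<T> Dispatcher<T> {
    pub fn dispatch(&self, job: T) {
        let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.senders.len();
        self.senders[idx].send(job).expect("worker thread has gone away");
    }
}
```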
Yep, that is much more sensible.
> I'd also think it would be nice to have some kind of handle to that set of threads ‒ currently, corona lets you either do `corona::run`, which starts everything, or manually use whatever runtime the user hands it. It would be nice to still be able to do that even in the multi-threaded case.
This can probably be done by hiding the runtime behind a trait with a `spawn_coro` method.
Actually, one interesting possible approach would be to reorganize the rest of the API around a trait like that, which has `spawn_send_coro` and `spawn_coro` methods, as well as (if there's a way to make this safe) a `spawn_stealable_coro` method (though that one really sketches me out - maybe closures that are `Send + Sync` can be stolen?).
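Roughly the shape I'm picturing (all names and signatures here are placeholders, not anything corona defines today):

```rust
// A runtime abstraction the rest of the API could be organized around.
// spawn_stealable_coro is deliberately left out: a started coroutine's stack
// can't safely move between threads.
pub trait CoroRuntime {
    /// Spawn a coroutine that stays on the current thread; the closure may be
    /// !Send, since it never leaves this thread.
    fn spawn_coro<F>(&self, f: F)
    where
        F: FnOnce() + 'static;

    /// Spawn a coroutine from a Send closure; the runtime may pick any of its
    /// threads to start it on, but it sticks to that thread afterwards.
    fn spawn_send_coro<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static;
}
```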
> This can probably be done by hiding the runtime behind a trait with a `spawn_coro` method.
I was already assuming that :-). However, I'm still not sure I know what to do in that trait.
In particular, this needs a task executor that can spawn on the same thread I'm currently running on:
https://github.com/vorner/corona/blob/master/src/switch.rs#L244
Because I might be waiting on a non-Send future that's sitting on my stack. I don't think I've seen such a method in the pool's API. But I haven't looked properly.
There are no safely stealable coroutines in Rust, full stop 😇. Rust doesn't consider the possibility of a stack moving from one thread to another. If that stack ever contains something `!Send` and it moves, it means undefined behaviour.
I'm starting to wonder, maybe it'll just work. The `TaskExecutor` thing I link actually accesses thread-local storage. The tokio-io-pool will likely be setting it up.
> There are no safely stealable coroutines in Rust, full stop 😇. Rust doesn't consider the possibility of a stack moving from one thread to another. If that stack ever contains something `!Send` and it moves, it means undefined behaviour.
Ah, right, and a closure being `Send` doesn't necessarily mean everything in the body is `Send`, just that the struct representing it is.
> I'm starting to wonder, maybe it'll just work. The `TaskExecutor` thing I link actually accesses thread-local storage. The tokio-io-pool will likely be setting it up.
`tokio-io-pool` just calls `current_thread::Runtime::new()`, which AIUI is exactly what's required for Corona. The (less general, but working at all) implementation is probably to use the `after_start` method to create the runtime.
> I was already assuming that :-). However, I'm still not sure I know what to do in that trait.
The stupid option is that it has one method, `spawn(&self, coro: Coroutine, job: Task<R>) -> CoroutineResult<R>`, as well as possibly a spinup method (which maybe takes a coroutine builder?). What else do you think it ought to be able to do?
> The stupid option is that it has one method, `spawn(&self, coro: Coroutine, job: Task<R>) -> CoroutineResult<R>`
Sorry, I think I confused you a bit more than necessary 😇. I was worrying about how to do the spawning inside the pool. But as I said above, it might just work, so I was probably just expecting problems where there are none.
I guess it should be possible to simply spawn a future inside the pool. That future will land in one of the threads and it can start the actual coroutine on that thread. Maybe there are better options than that, but I think this makes it possible.
I think I'll just leave you to do the experiments and see where it goes :-). If you need help or you want to show some work in progress solution, don't hesitate to ask.
Ok, I've got a preliminary implementation using tokio-io-pool
up on my fork (relevant commit here). I haven't tested it yet (planning to do that tomorrow), but it typechecks, and I put decent odds it'll actually work.
Things I still need to do:
1) make sure it works, because duh
2) make a `Runtime` trait, replace the `coroutine::run` feature with another implementation of it
3) hide this behind a feature flag
4) documentation that's even halfway up to the standard you set in the rest of the code
5) implement smarter scheduling - I think we can do something smarter than `tokio-io-pool` by having a queue which allows free threads to steal jobs that other threads haven't gotten around to starting, and which are thus still just `Send` closures that are safe to steal
Once they're stabilized, I'll probably also update the library to use `std::future` futures instead of `futures::future` futures.
Ah, it doesn't work - AFAICT the `after_start` method runs before the runtime starts, not after. After some initial attempts at a fix, I think it'll actually be easier to just rig up the thread pool myself. We'll see how that goes...
EDIT: never mind, there was just a downcasting issue - the initial test is passing on the latest commit!
That's nice to hear :-).
I think it's fine to go through a review & merge the first version before all the documentation and stuff is added (though the feature flag would probably be a nice thing). In general, I prefer reviewing and merging smaller chunks if possible.
As for `Coroutine::run` ‒ I'd like that one to still stay single-threaded. But I'm fine with either some kind of `Coroutine::run_multithreaded` or `Coroutine::new().multithreaded(true).run(...)` (though the latter would stop working with `!Send` closures, wouldn't it?).
The `std::future` would be an interesting endeavour too, but I believe that should definitely go into a separate branch and be merged separately.
Just asking… are you still working on this? Are you just busy, or would you use some help?
I ran into a snag and then was busy with other stuff. It seems that `tokio-io-pool` almost works, but there are some rough edges to using it, so instead I'm inclined to roll my own threadpool with `crossbeam-deque`, though not using the work-stealing functionality, at least at first. (Though we actually can have work stealing here, because the coroutines are safe to send before we've started running them; we just can't steal work from the actual `tokio-current-thread` runtime, since that contains stacks which are definitely not `Send`.)
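To sketch the "steal only jobs that haven't started yet" part (using crossbeam-deque's `Injector` as the shared queue; the actual coroutine spawn is only indicated in a comment):

```rust
// Unstarted jobs are plain Send closures in a shared Injector, so any idle
// thread can take one; once a job has been started as a coroutine it lives on
// that thread's runtime and never moves again.
use crossbeam_deque::{Injector, Steal};
use std::sync::Arc;
use std::thread;

type Job = Box<dyn FnOnce() + Send>;

fn main() {
    let queue: Arc<Injector<Job>> = Arc::new(Injector::new());

    for n in 0..4 {
        let job: Job = Box::new(move || println!("job {}", n));
        queue.push(job);
    }

    let workers: Vec<_> = (0..2)
        .map(|_| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || loop {
                match queue.steal() {
                    // The real pool would do something like coroutine::spawn(job)
                    // on this thread's single-threaded runtime instead.
                    Steal::Success(job) => job(),
                    Steal::Empty => break,
                    Steal::Retry => continue,
                }
            })
        })
        .collect();

    for w in workers {
        w.join().unwrap();
    }
}
```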
That was actually even easier than I expected - I'd basically written all the code I needed already. Here's the relevant commit on my fork. I'm not going to be very confident it works until I've written more tests, but I think this is promising.
Do you want me to start reading it now, or do you want me to wait for a little bit of cleanup, some tests and a pull request? O:-)
From a very, very quick glimpse, the feeling I get from the code is that it makes sense, though I might have some ideas for improvements.
Ping? Is this still valid, are you working on it? Is there anything I can help with? Or does the rapidly approaching async/await notation decrease the need in that space?
I haven't been working on it because async/await is mostly good enough. With the new futures API, a lot of the things that had me disliking futures are gone.
The main advantage I see for coroutines, given async/await, is probably asynchronous implementation of traits that were written with synchronous behavior in mind, but that's not a pressing need for me at the moment.
Thank you for letting me know. I think we can close this issue, then. If you ever find the motivation to continue on this, we can reopen it.
Right now the only way to have `Coroutine`s executing on multiple threads is to manually configure the threadpool upfront and again manually handle passing work between them. This is not ideal for being able to mix and match corona- and futures-based code, and additionally requires users to figure out inter-thread scheduling themselves.
I'm working on a fix to this in my fork right now; any tips on how I might go about it? Seems like the obvious things to do are to replace the thread-local `RefCell`s with something from `futures-locks`, and replace the `unsync::oneshot` channels with `sync::oneshot`.
Also, using thread-local storage for the coroutine builder is probably the wrong approach for this use case. Is there something better we could do instead? Maybe a global `futures_locks::RwLock`?