riker-rs / riker

Easily build efficient, highly concurrent and resilient applications. An Actor Framework for Rust.
https://riker.rs
MIT License

tokio executor #152

Open mankinskin opened 3 years ago

mankinskin commented 3 years ago

Hi, this PR makes it possible to run futures on a tokio runtime in riker:

#[derive(Default)]
struct AsyncActor;

impl Actor for AsyncActor {
    type Msg = MessageType;
    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        let handle: tokio::task::JoinHandle<String> = ctx.run(async move {
            // runs in tokio runtime!
            // the delay future must be awaited, otherwise it does nothing
            tokio::time::delay_for(tokio::time::Duration::from_millis(10)).await;
            String::from("Done!")
        }).unwrap();
    }
}

#[tokio::main]
async fn main() {
    let mut sys = ActorSystem::new().unwrap();
    let actor = sys.actor_of::<AsyncActor>("async-actor").unwrap();
    loop {
        update(actor);
    }
}

This PR adds a cargo feature tokio_executor which basically replaces the futures::executor::ThreadPool in ActorSystem with tokio::runtime::Handle and changes all dependent code accordingly.

This makes the code a bit messy and I would like to refactor this to encapsulate it more. The only real problem is the different APIs of tokio::JoinHandle and futures::RemoteHandle, which are returned by the respective runtime handles when starting a future execution:

I would introduce an internal type for representing either of the task handles, with an API respecting both. Depending on the feature, this type would be a RemoteHandle or a JoinHandle under the hood. Maybe the same can be done for the executor.

mankinskin commented 3 years ago

150

leenozara commented 3 years ago

@mankinskin thanks for taking this on. It's been a much requested feature and is a significant step in bringing Riker to the rest of the async/futures environment. I'll start to review this!

mankinskin commented 3 years ago

Some general notes:

tokio works a bit differently from the futures ThreadPool. Since tokio is itself a runtime, it is usually started before anything else. This differs from the ThreadPool, which was a field hosted by the ActorSystem; with tokio, the ActorSystem would probably be hosted by tokio.

I am not that familiar with the riker code right now, but I would very much like to help make it async. A lot in riker already seems to work asynchronously, so we might be able to integrate that with tokio fairly easily and eventually provide async Actor trait implementations that run directly as tasks on a tokio runtime.

mankinskin commented 3 years ago

A question still to be answered is whether we want to drop the futures::ThreadPool entirely and only support tokio, or keep the futures::ThreadPool dependency. I don't think there would be anything wrong with transitioning to tokio completely.

nothingismagick commented 3 years ago

Doesn't tokio bring in another rather heavy dependency? And is this then a massive breaking change that would require everyone to retool?

mankinskin commented 3 years ago

Doesn't tokio bring in another rather heavy dependency?

Yes, tokio is another dependency, but since it is the fastest and most active async runtime I know of, I think it would be beneficial.

And is this then a massive breaking change that would require everyone to retool?

Yes, users would have to transition from the futures-based API to the tokio API to talk to the runtime.

Originally I wanted to put the different executors behind features, but I am not sure if this is worth it. Maintaining multiple executors makes the code more complex.

What we could do is introduce something like a runtime module, that encapsulates any runtime that is selected with a feature. But this would require some more work and I am not sure if I know what this project really needs/wants.

I could however revert the removal of the tokio_executor feature, so we can at least opt-into the tokio runtime, instead of defaulting.

mankinskin commented 3 years ago

One solution could be to transition to async-std, which has a counterpart, the tokio-async-std crate, that runs on tokio instead of the futures executor. Then users could talk to the runtime they selected, while riker uses async-std or tokio-async-std internally.

hardliner66 commented 3 years ago

I agree that tokio is the most prominent executor and probably even the fastest, but there are other things that people might take into consideration. For example: if you want to use riker on embedded platforms, you might want an executor that is more lightweight.

I think it would be beneficial to support multiple executors or to be completely executor agnostic (e.g.: async_executors or agnostik).

mankinskin commented 3 years ago

@hardliner66 valid points. And thank you for these suggestions. I will try to integrate one of them in this PR when I have time 👍

siriux commented 3 years ago

I've tried to integrate an agnostic executor into riker, and neither option plays well with the current riker architecture.

Agnostik's executor trait is not object-safe, so including it in the ActorSystem is not as simple as boxing it inside an Arc; you need to add a type parameter. This type parameter leaks and ends up affecting many structs and impls, so you need to introduce it all over riker.

This option might work (if no other problems are found). I was in the middle of adding the type parameter where needed to test it, but I decided it is better to ask first if this is the way you want to go before fixing the new type parameter everywhere.

The other option is async_executors. Its executor trait is object-safe, but if you want to receive a JoinHandle after spawning you need to provide a type parameter for the output type of the spawned futures. This, AFAIK, wouldn't work well with riker's architecture.

I think it's important to discuss whether we really need riker to be executor agnostic, whether we want it to use the tokio executor, as it's the most used executor today, or even implement a custom solution for a fixed set of executors based on enums instead of traits.

If I have some time, I might try to finish the test with Agnostik to see if it's even an option and think more about a possible enum based solution.

siriux commented 3 years ago

Another option to cover the different use cases would be to keep the current executor under a feature flag (when something lightweight is needed) and the tokio executor under another feature flag for general use case and compatibility.

This is probably easier to implement than using Agnostik and better than the Enum idea. And it could be extended in the future with other flags if needed.

mankinskin commented 3 years ago

I implemented the different executors behind feature flags, but it was a very simple implementation, basically just adding cfg annotations everywhere where compiler errors occurred. It worked, but was very ugly because the cfg annotations were spread all over different modules.

It would be great to have something like agnostik work, especially if they support more executors in the future. If that does not work, I think the best solution would be to encapsulate all of the Executor and SpawnHandle logic into internal types/traits so that all of the riker code can target a single API, and then select the specific implementation using the cfg feature annotation only in that module.
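The encapsulation described above could look roughly like the following sketch. It only illustrates the layout: the module name `executor`, the `imp` submodules and the toy backends are assumptions, plain closures stand in for futures, and the `tokio_executor` branch is a placeholder rather than a real tokio integration.

```rust
mod executor {
    // The only place in the crate where `cfg(feature = ...)` appears.
    // Each `imp` module exposes the same names with the same signatures.

    #[cfg(not(feature = "tokio_executor"))]
    mod imp {
        use std::thread;

        /// Default backend: an OS thread stands in for a thread pool task.
        pub struct TaskHandle<T>(thread::JoinHandle<T>);

        impl<T> TaskHandle<T> {
            /// Blocks until the job has finished and returns its result.
            pub fn join(self) -> T {
                self.0.join().expect("task panicked")
            }
        }

        pub fn spawn<T, F>(job: F) -> TaskHandle<T>
        where
            F: FnOnce() -> T + Send + 'static,
            T: Send + 'static,
        {
            TaskHandle(thread::spawn(job))
        }
    }

    #[cfg(feature = "tokio_executor")]
    mod imp {
        // Placeholder only: a real implementation would wrap the tokio
        // runtime handle and its join handle here. This one runs inline.
        pub struct TaskHandle<T>(T);

        impl<T> TaskHandle<T> {
            pub fn join(self) -> T {
                self.0
            }
        }

        pub fn spawn<T, F>(job: F) -> TaskHandle<T>
        where
            F: FnOnce() -> T + Send + 'static,
            T: Send + 'static,
        {
            TaskHandle(job())
        }
    }

    // The rest of the crate targets these names and never sees a `cfg`.
    pub use imp::{spawn, TaskHandle};
}
```

Code outside the module would call `executor::spawn(...)` and `join()` on the returned handle, whichever feature is active.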

This should be fairly simple to do, but I really don't have any time until after next week.

mankinskin commented 3 years ago

One other important thing is that the riker-testkit has to be adapted to this too. So the same steps should be taken there. This is the related pull request https://github.com/riker-rs/riker-testkit/pull/1

siriux commented 3 years ago

Just to let you know that the past weekend I've been investigating the solutions for agnostic async executors, and I'm trying to write a new crate without the limitations of the others that made it so difficult to integrate into riker.

I have something working already that would be enough for riker, but ideally this should be a generally useful crate beyond that, and I'm still trying to figure out how much can be done in a completely agnostic way.

mankinskin commented 3 years ago

Cool! I can try to integrate it into this patch soon and give you feedback. Don't hesitate to submit your own pull request before that if you're ready, though. I just have some assignments to finish first.

mankinskin commented 3 years ago

@siriux can you explain what the limitations of agnostik or async_executors were?

siriux commented 3 years ago

@mankinskin I've continued with this the last few days and the rabbit hole of agnostic async executors is really deep, but I'm starting to see the light.

The main architectural problem with agnostik (apart from bugs, missing features, ... that could be fixed) is that it is not object-safe.

This is on purpose, to be able to give you a nice API on the trait. You can see it here, the output type of the join handle is a type parameter of the spawn* function. But this makes the trait not object-safe. This means we cannot store it as a trait object and we have to put a type parameter for it wherever we want to store it, and in the case of riker this parameter spreads everywhere which is not good.

The alternative using traits is what async_executors is doing: having a spawn handle trait with an output type parameter on the trait itself. This is object-safe, but it requires that you know the return type in advance. This might work for some libraries but not for something as generic as riker, and is a pretty big limitation in general. Also, the way of working with this is completely different from working with the underlying executors. You can see it here inside the declaration.
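The difference between the two trait designs can be illustrated with a small sketch. The traits below are simplified stand-ins, not the real agnostik or async_executors APIs: the names `MethodGeneric`, `TraitGeneric`, `Inline`, `GenericSystem` and `StringSystem` are made up for illustration, and plain closures stand in for futures.

```rust
/// Agnostik-style (simplified): the output type `T` is a parameter of the
/// method, so the trait cannot be used as `dyn MethodGeneric`.
trait MethodGeneric {
    fn spawn<T, F: FnOnce() -> T>(&self, job: F) -> T;
}

/// async_executors-style (simplified): the output type is a parameter of
/// the trait, so `dyn TraitGeneric<Out>` works, but `Out` is fixed up front.
trait TraitGeneric<Out> {
    fn spawn(&self, job: Box<dyn FnOnce() -> Out>) -> Out;
}

/// Toy executor that runs every job inline on the calling thread.
struct Inline;

impl MethodGeneric for Inline {
    fn spawn<T, F: FnOnce() -> T>(&self, job: F) -> T {
        job()
    }
}

impl<Out> TraitGeneric<Out> for Inline {
    fn spawn(&self, job: Box<dyn FnOnce() -> Out>) -> Out {
        job()
    }
}

/// With the first trait, the executor type leaks into every holder.
struct GenericSystem<E: MethodGeneric> {
    exec: E,
}

impl<E: MethodGeneric> GenericSystem<E> {
    fn run<T>(&self, job: impl FnOnce() -> T) -> T {
        self.exec.spawn(job)
    }
}

/// With the second trait, the holder is not generic over the executor,
/// but it can only ever spawn jobs that return `String`.
struct StringSystem {
    exec: Box<dyn TraitGeneric<String>>,
}

impl StringSystem {
    fn run(&self, job: Box<dyn FnOnce() -> String>) -> String {
        self.exec.spawn(job)
    }
}
```

In a code base like riker's, the `E` parameter of `GenericSystem` would have to be repeated on every struct and impl that holds the system, which is the leak described above.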

Both of these ways of trying to be agnostic using traits don't have a big performance impact, but impose big limitations. An alternative, which is used in the futures crate (currently used by riker), is to have an executor trait that doesn't return anything, so you don't have to worry about the output type. When you need an output, you can create a oneshot channel and receive the result through it (see RemoteHandle implementation). This removes all the limitations imposed before, but it has some overhead (that riker is already paying).

When I wrote my last comment I had an implementation based on this idea (based on this library) that I improved and extended, but at the same time I continued learning and thinking and I found a better solution.

The main thing those libraries are doing is hiding the executor behind a trait and then implementing it. In theory, you could implement this trait yourself outside of the library, but in practice this is really hard and often even impossible, so these libraries are not really agnostic, but agnostic over a set of predefined executors, and if you need more you have to add them to the library itself.

If that's the case, why do we need the trait at all? We can use an enum with different variants that are under different config features for each executor/option. This doesn't have any of the problems of the traits or the performance overhead (and has some extra benefits). Of course, it's important to cover many use cases and all the main libraries (and to be open to including new ones if they get traction), but in practice this is as flexible as the other "agnostic" libraries.
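A minimal sketch of the enum idea, under loud assumptions: `EnumExecutor`, `Backend` and the two toy backends are invented for illustration and are not the API of any existing crate, closures stand in for futures, and in a real crate each variant would sit behind its own `#[cfg(feature = "...")]`.

```rust
use std::thread;

/// Private enum: one variant per supported backend.
#[derive(Clone, Copy)]
enum Backend {
    Inline,
    Threads,
}

/// Public wrapper, so callers never match on the enum themselves.
/// It is a plain struct: `Clone`, `Send`, `Sync`, no type parameters.
#[derive(Clone, Copy)]
pub struct EnumExecutor {
    backend: Backend,
}

/// Join handle that wraps whatever each backend returns natively.
pub enum JoinHandle<T> {
    Ready(T),
    Thread(thread::JoinHandle<T>),
}

impl<T> JoinHandle<T> {
    /// Blocks until the job has finished and returns its result.
    pub fn join(self) -> T {
        match self {
            JoinHandle::Ready(value) => value,
            JoinHandle::Thread(handle) => handle.join().expect("task panicked"),
        }
    }
}

impl EnumExecutor {
    pub fn inline() -> Self {
        EnumExecutor { backend: Backend::Inline }
    }

    pub fn threads() -> Self {
        EnumExecutor { backend: Backend::Threads }
    }

    /// Generic over the output type without making the executor generic.
    pub fn spawn<T, F>(&self, job: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        match self.backend {
            Backend::Inline => JoinHandle::Ready(job()),
            Backend::Threads => JoinHandle::Thread(thread::spawn(job)),
        }
    }
}
```

Because `spawn` is an inherent method on a concrete struct, it can stay generic over the output type while the holder of the executor stays non-generic, which is what neither trait design could offer.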

But in addition to this, if we want a library that can be used in many projects, we need to do it right and support a shared subset of all the libraries that really works.

What I've been working on is tokio, async_std, smol, futures, and wasm_bindgen_futures (and I've been looking at async_global_executor). There are many differences and details between them, but they are the most used and cover all the main use cases: big systems and compatibility with tokio and async_std, minimal with smol and futures, and wasm with wasm_bindgen_futures. Maybe embedded systems and no-std are the only things missing.

Also I've tested many variations of the architecture of the library until I've been able to find something that works well. The main problem has been having a spawn_local that is useful and works with tokio and futures. After trying many things that didn't work I have a solution that should work half implemented.

Another important thing is to be able to support timers that work everywhere and that reuse the internal timers when possible because they have a really good implementation in some of the executors. This is already working, and can be used as the base for even more agnostic things like IO, but that's another story.

You can see what I have right now here (except some things I'm still working on I didn't commit yet). I think I'm on the right track and I can have something complete soon, but the problem is much deeper than I thought when I started.

mankinskin commented 3 years ago

Thanks for this write-up! If I understand correctly, I think you wouldn't even need the enum, right? Unless you want to support multiple executors at the same time, you can just implement the JoinHandle and ExecutorHandle types directly for each feature.

Another thing I thought about was to let users pass an executor handle to the ActorSystem when creating it. That would be great, because users could pick any executor and riker wouldn't have to maintain compatibility crates, also riker can still provide defaults. The problems here are of course

But if riker already relies on oneshot channels to return the output, we could easily define an Executor trait (similar to async_executors::SpawnHandle), that can only spawn futures returning (). That would be object safe, so we can store it in a trait object in non-generic ActorSystem, and code using the executor to spawn futures can wrap the JoinHandle<()> in a generic handle along with a channel receiver for the return value. Something like

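// Assumes tokio's JoinHandle and the futures crate's oneshot Receiver.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::channel::oneshot::Receiver;
use tokio::task::JoinHandle;

struct FutureHandle<T: Send> {
    handle: JoinHandle<()>, // kept so the task can be aborted or detached
    recv: Receiver<T>,
}

impl<T: Send> Future for FutureHandle<T> {
    type Output = T;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The task sends its result as its last step, so polling the
        // receiver alone is enough to learn that the value is ready.
        Pin::new(&mut self.recv)
            .poll(cx)
            .map(|res| res.expect("task ended without sending a result"))
    }
}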

And ActorSystem's run function could wrap the already generic future with the channel and spawn it on the executor as a Future<Output=()>, then return a FutureHandle<T>.
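The wrapping step can be sketched with the standard library alone. Everything here is an assumption made for illustration: `Executor`, `ThreadPerTask`, `System` and `TaskHandle` are hypothetical names, a `std::sync::mpsc` channel stands in for a oneshot channel, the toy executor uses one OS thread per task, and the handle blocks on `join()` instead of implementing `Future`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

/// A boxed future with no output: the only thing the executor can spawn.
type UnitFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Hypothetical object-safe executor trait: it has no generic methods.
trait Executor: Send + Sync {
    fn spawn_unit(&self, fut: UnitFuture);
}

/// Wakes a parked thread; used by the toy `block_on` below.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drives one future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Toy executor: one OS thread per task. Stands in for a real runtime.
struct ThreadPerTask;

impl Executor for ThreadPerTask {
    fn spawn_unit(&self, fut: UnitFuture) {
        thread::spawn(move || block_on(fut));
    }
}

/// Non-generic host struct that stores the executor as a trait object.
struct System {
    exec: Arc<dyn Executor>,
}

/// Handle that receives the task's output through the channel.
struct TaskHandle<T> {
    recv: Receiver<T>,
}

impl<T> TaskHandle<T> {
    /// Blocks until the task has sent its result; `None` if it never did.
    fn join(self) -> Option<T> {
        self.recv.recv().ok()
    }
}

impl System {
    /// Wraps a generic future into a unit future plus a result channel.
    fn run<F, T>(&self, fut: F) -> TaskHandle<T>
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let (send, recv) = channel();
        self.exec.spawn_unit(Box::pin(async move {
            // A send error only means the handle was dropped; ignore it.
            let _ = send.send(fut.await);
        }));
        TaskHandle { recv }
    }
}
```

The channel is the price paid for keeping `System` non-generic: the executor only ever sees futures that return `()`, and the output type lives entirely in `run` and `TaskHandle<T>`.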

What do you think about this?

siriux commented 3 years ago

you can just implement the JoinHandle and ExecutorHandle types directly for each feature.

In theory you don't need the enum, but I think it's more convenient. As you said, you can enable multiple executors at the same time and that's pretty convenient for testing against all backends, but I'm sure some apps might need this too.

Also, if you don't use an enum you need to ensure all the implementations have the same signatures, that all are clone, send, sync, ... and like with tests, you cannot just enable all features and check once, you need to do it one by one. And not only for the main executor type, for JoinHandle, ...

Another thing I thought about was to let users pass an executor handle to the ActorSystem when creating it.

The idea is that libraries that want to be executor agnostic, like riker, depend on agnostic_async_executor without any executor dependent features. They still have access to an AgnosticExecutor type (a struct that wraps an enum) and build anything around it.

Then, final apps that use those libraries depend on one or more executor features that let them create an instance of AgnosticExecutor to provide to the library.

Of course, a library that uses agnostic_async_executor will need to depend on some executor features in its dev dependencies, otherwise it cannot be tested.

... also riker can still provide defaults.

And if you want to provide a default executor, as you said, you need to depend on it. But this should be a feature (maybe default?) on riker, otherwise if you use another executor you are still depending on the default one.

The problems here are of course

I'm not sure what you mean about the problems you present. Those are a good summary of the problems of the previous agnostic libraries, but my new library has been designed to solve all of them.

In riker you don't need an executor trait, you use the AgnosticExecutor struct provided by agnostic_async_executor.

This struct wraps the enum we talked about before, so you don't have to match on it or even have access to it. And it's clone, send, sync, ... doesn't require mut to spawn, ... So in practice you can do whatever you want with it like any other simple struct. That's a big difference with respect to the trait based approaches.

(As a note, if you try to introduce spawn_local in a direct way, as I tried before, you lose the Send property of AgnosticExecutor, among many other compatibility problems and blocks. That's mainly due to the design of tokio and the futures crate, that is very limiting with respect to spawn_local. I'm working on a new design for spawn_local that solves this, a description of the idea is in the TODO file.)

But if riker already relies on oneshot channels to return the output, we could easily define an Executor trait

Riker currently relies on oneshot, but it would be better to avoid this cost when you don't use the futures executor, as other executors handle this case natively.

The spawn method of my library already returns a JoinHandle that provides a value, so no need to do anything else on the riker side. And it achieves this without any extra overhead. For example, it just wraps the handles that tokio, async_std and smol provide, so it should be as fast as them.

For futures and wasm we still use the RemoteHandle from the futures crate (using oneshot internally), but that overhead is imposed by the executors themselves not by my library, you would still pay the cost even if you use the executor directly. To avoid this when you don't need the result we could add an extra async spawn method that returns ().

Also, in some cases you pay a minimal wrapping cost to have a consistent api, for example with the timer errors, but in most cases the optimizer will remove it, even more so when you only use a single executor.

In summary, the purpose of this library is to handle all the details and give you a nice abstraction, so that you don't need to create or implement any traits, or create your own handle types, ... Just use AgnosticExecutor inside riker as if it were the real executor, with its real JoinHandle, but instead of creating it yourself just receive it from the outside.

I plan to add example crates to the library that show a library crate and a binary crate that uses the library one. So maybe this way it's easier to grasp the idea.

mankinskin commented 3 years ago

My concern was that agnostic_async_executor would be another dependency that would need to maintain an API to multiple executors. That could lead to a lot of maintenance work, which is why I proposed the trait-based solution so that users can set the executor themselves. This would however incur the performance cost of the oneshot channels, which we obviously want to avoid.

And apparently it is not really possible to spawn generic futures on generic executors from a non-generic host struct, so we need the compile time feature set anyways.

So I will update this branch to use your library and let you know how it goes.

mankinskin commented 3 years ago

I looked into integrating your crate with riker, and it seems like you are still working on it. There are some tests that don't compile because method sleep doesn't exist for AgnosticExecutor.

I don't think it makes much sense to support multiple executors at the same time. At least I can't think of a use case where you would need multiple executors and I think that would be out of scope anyways. It would be better if the crate would just wrap those executors behind features, so you can change the executor just by compiling with a different feature. Currently, riker would have to use features to call the different versions of new_<>_executor, to get the selected executor. Basically it should only need to call AgnosticExecutor::new() and get the executor it selected through the feature.

siriux commented 3 years ago

@mankinskin I don't think the right way to make riker agnostic is to call something like AgnosticExecutor::new() inside riker (except for the default/compatibility case, if we want it).

To cover all the use cases (mine included), and be executor agnostic, the executor should be a parameter you provide to riker when you create the actor system, not something riker creates by itself.

This is needed because I might want to use the executor for something else, or to run async code inside the actor system, or to integrate it with other parts of my application and external libraries. In all those cases I need to have access to it, and to control its configuration. Otherwise, why make it agnostic? Right now I can let riker run its own executor internally and create another one for the rest (with problems/limitations, of course).

To avoid breaking changes we can introduce a new method ActorSystem::new_with_executor(...) and leave the previous new() hard coded to the futures crate executor. This would be an exception where we create the executor inside riker for backwards compatibility/simplicity.
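A rough sketch of the two constructors, with simplified signatures that are assumptions rather than riker's actual API (the `new()` shown earlier in this thread returns a `Result`, for instance). `Executor`, `DefaultExecutor` and `CountingExecutor` are hypothetical names, and closures stand in for futures.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// A unit of work; a plain closure stands in for a future here.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical object-safe executor interface.
trait Executor: Send + Sync {
    fn spawn(&self, job: Job);
}

/// Stand-in for the built-in default (the futures ThreadPool in riker).
struct DefaultExecutor;

impl Executor for DefaultExecutor {
    fn spawn(&self, job: Job) {
        thread::spawn(job);
    }
}

/// Example of an application-owned executor that counts what it runs.
struct CountingExecutor {
    spawned: AtomicUsize,
}

impl Executor for CountingExecutor {
    fn spawn(&self, job: Job) {
        self.spawned.fetch_add(1, Ordering::SeqCst);
        job();
    }
}

struct ActorSystem {
    exec: Arc<dyn Executor>,
}

impl ActorSystem {
    /// Backwards-compatible constructor: creates the default executor itself.
    fn new() -> Self {
        Self::new_with_executor(Arc::new(DefaultExecutor))
    }

    /// New constructor: the caller keeps its own handle to the executor.
    fn new_with_executor(exec: Arc<dyn Executor>) -> Self {
        ActorSystem { exec }
    }

    fn spawn(&self, job: Job) {
        self.exec.spawn(job);
    }
}
```

Because the application holds its own `Arc` to the executor, it can spawn its own work on the same executor and configure it before handing it to the actor system.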

About the problems you found, if you took the crate from crates.io, that's probably not really working, I just wanted to learn the process to upload a crate.

The version in github should work (the previous one, and the current one) but you need to enable the right features otherwise you find missing implementations. It's a little bit tricky because wasm-packer has a bug that prevents us from providing features during the tests execution and the only way to do it right now is commenting and uncommenting things in Cargo.toml. Of course all of this will go in the documentation.

But anyway, the library is still under heavy development, so expect broken things.

mankinskin commented 3 years ago

the executor should be a parameter you provide to riker when you create the actor system

Perfect, this is exactly what I just did. I implemented the idea I had earlier with which you could pass an executor as an argument to ActorSystem. The drawback is that this still uses oneshot channels every time. But I don't see how this can be avoided without making ActorSystem generic or compiling it differently with features. The options I see here are

I don't think we can throw out the oneshot channels if we want to pass the executor as an argument. If we want to use arbitrary executors with riker (without oneshot), we have to compile riker for each one. But I am not sure how bad the overhead of oneshot really is, and since riker already uses it, I don't think it would be a problem to use it. Also once your crate is finished, we can still use features to install AgnosticExecutor natively and give up the oneshot channels.

My concern for now is to make the riker/tests and riker-testkit agnostic to the executor. I think once that is done it will be much easier to integrate any other kind of executor implementation into riker.

mankinskin commented 3 years ago

Now the default executor is ThreadPool as before, and tokio can be selected with the tokio_executor feature. Tests for both features pass, but I guess the CI jobs still need to run them for tokio_executor, or all features in general.

A minor issue is that I don't know if we can test riker-macros for different features of riker, because of this issue. riker-macros can only use riker as dev-dependencies, because riker depends on riker-macros, but due to a bug dev-dependencies (riker) can not have features (tokio_executor) enabled through features of the depending crate (riker-macros). And the tests in riker-macros do use ActorSystem and therefore the executor, so it would be nice if we could test those with every executor.

@leenozara I think this PR is ready to be reviewed.

leenozara commented 3 years ago

@mankinskin Sorry for the delay in replying to this significant PR. Are you able to provide a short description here that summarizes how to configure the tokio executor? Something that I can use as a starting point to review. This will help me build a progressive review rather than just going file by file. It will also help me update the documentation, which we will need to do since this is a fairly significant update.

This is greatly appreciated!

mankinskin commented 3 years ago

@leenozara The way it works is that by selecting the tokio_executor feature (this name could still be changed):

# Cargo.toml
riker = { version = "*", features = ["tokio_executor"] }

riker's ActorSystem uses a tokio::runtime::Handle to spawn asynchronous tasks, instead of the default futures::executor::ThreadPool. This Handle is retrieved by tokio::runtime::Handle::current().

That entails that ActorSystem must be started on the executor, instead of starting the executor itself. So when using the tokio_executor, a riker application needs to start the tokio executor before starting the ActorSystem, for example using this:

#[tokio::main]
async fn main() {
    let sys = riker::ActorSystem::new().unwrap();
    ...
}

If there is no tokio runtime running, Handle::current() will panic.

So you can directly configure the tokio executor independently and ActorSystem will try to spawn tasks on the currently running executor through its runtime::Handle.

In executor.rs you can see the abstraction over executors and tasks, which will hopefully make it easier to integrate other executors in the future, for example @siriux's agnostic_async_executor, which could use tokio more efficiently. Right now, every spawned task uses a futures::channel::oneshot::channel (in impl Run for ActorSystem) to retrieve the return value through its TaskHandle (spawning tasks returns a TaskHandle future, used to abort the task or to await its result).

This oneshot channel should not really be required, but it made implementing the executor-agnostic code easier. Actually, I am now thinking it should be pretty easy to remove them entirely at this point, because we can simply implement the required components accordingly for each feature, but I am not entirely sure if it is really that easy.

BratSinot commented 3 years ago

@mankinskin @leenozara Greetings!

Any update on this? I'm using warp/hyper in my project, and having two runtimes (futures::executor::ThreadPool and tokio) is not a good idea, and sometimes it's just inconvenient.

mankinskin commented 3 years ago

@BratSinot the patch is usable, it is just not using the optimization I mentioned earlier, which would get rid of the overhead of one oneshot channel when using tokio. But that should not be a disadvantage to the version without tokio.

@leenozara would have to review this and merge it after we have resolved any issues.

BratSinot commented 3 years ago

Thanks for the reply!

BratSinot commented 3 years ago

Greetings again!

I have a problem using your branch. Can you give me a hand?

riker = { git = "https://github.com/mankinskin/riker.git", rev = "d691195b133abd1d37678c531b74f2e81409bcf7", features = ["tokio_executor"] }
<...>
asinotov@fedora ➜  actors git:(master) ✗ cargo build
    Updating crates.io index
    Updating git repository `https://github.com/mankinskin/riker.git`
error: failed to select a version for `riker`.
    ... required by package `actors v0.1.0 (/home/asinotov/CLionProjects/actors)`
versions that meet the requirements `*` are: 0.4.2

the package `actors` depends on `riker`, with features: `riker-testkit` but `riker` does not have these features.

failed to select a version for `riker` which could resolve this conflict

mankinskin commented 3 years ago

I think this may be because of https://github.com/rust-lang/cargo/issues/9060. Removing the dev-dependency on riker-testkit should solve it. In Cargo.toml:

 [features]
 default = []
-tokio_executor = ["tokio", "riker-testkit/tokio_executor"]
+tokio_executor = ["tokio"]
...
-[dev-dependencies.riker-testkit]
-git = "https://github.com/mankinskin/riker-testkit"
-branch = "tokio_executor"

I just left this in because it is needed for the tests. If you want to run the tests, you will have to switch the "tokio_executor" feature manually for the riker-testkit dev-dependency. Unfortunately that is the only way I got it working due to that cargo bug:

cargo test

 [dev-dependencies.riker-testkit]
 git = "https://github.com/mankinskin/riker-testkit"
 branch = "tokio_executor"

cargo test --features tokio_executor

 [dev-dependencies.riker-testkit]
 git = "https://github.com/mankinskin/riker-testkit"
 branch = "tokio_executor"
+features = ["tokio_executor"]

BratSinot commented 3 years ago

Thanks a lot!