Thomasdezeeuw / heph

Heph is an actor library for Rust based on asynchronous functions.
MIT License

Virtual Actors #270

Open Thomasdezeeuw opened 4 years ago

Thomasdezeeuw commented 4 years ago

Orleans, an actor framework for .NET, has the concept of "Virtual Actors". A virtual actor handles only a single message and doesn't keep any internal state (beyond what is required to handle the message) between invocations. With these limitations it's possible to start a single actor per message, which makes the actor easy to scale.

Some initial design ideas:

A VirtualActor trait that handles only a single message:

trait VirtualActor {
    type Message;
    type Future: Future<Output = Result<(), Self::Error>>;
    type Error;
    fn handle_message(&self, ctx: actor::Context<Self::Message>, msg: Self::Message) -> Self::Future;
}
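
A minimal, self-contained sketch of this first design on stable Rust, using only the standard library. It is an illustration under assumptions, not Heph's API: the `actor::Context` argument is left out, and `Shout`, `dispatch` and the tiny `block_on` executor are hypothetical names made up for the example. Note that the associated type is declared with a bound (`type Future: Future<...>`).

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Simplified version of the trait above (assumption: no `actor::Context`).
trait VirtualActor {
    type Message;
    type Error;
    type Future: Future<Output = Result<(), Self::Error>>;

    /// Handle exactly one message.
    fn handle_message(&self, msg: Self::Message) -> Self::Future;
}

/// Hypothetical virtual actor: it keeps no state of its own between
/// messages, it only writes to a shared output sink.
struct Shout {
    out: Arc<Mutex<Vec<String>>>,
}

impl VirtualActor for Shout {
    type Message = String;
    type Error = &'static str;
    type Future = Pin<Box<dyn Future<Output = Result<(), Self::Error>>>>;

    fn handle_message(&self, msg: String) -> Self::Future {
        let out = Arc::clone(&self.out);
        Box::pin(async move {
            if msg.is_empty() {
                return Err("empty message");
            }
            out.lock().unwrap().push(msg.to_uppercase());
            Ok(())
        })
    }
}

/// Waker that does nothing; enough for futures that never wait on I/O.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

/// Start one handler invocation per message and collect the results.
fn dispatch<VA: VirtualActor>(
    actor: &VA,
    msgs: impl IntoIterator<Item = VA::Message>,
) -> Vec<Result<(), VA::Error>> {
    msgs.into_iter()
        .map(|msg| block_on(actor.handle_message(msg)))
        .collect()
}
```

Because every invocation is independent, a failure while handling one message does not affect the others; a real runtime could run the invocations concurrently instead of one after another.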

Alternatively, we could not pass the initial message directly and instead place it in the actor's inbox.

trait VirtualActor {
    type Message;
    type Future: Future<Output = Result<(), Self::Error>>;
    type Error;

    /// `ctx` is guaranteed to contain at least one message.
    fn start(&self, ctx: actor::Context<Self::Message>) -> Self::Future;
}
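
A rough sketch of the inbox variant, again on stable Rust with only the standard library. `Inbox` is a hypothetical stand-in for `actor::Context` that models nothing but the message queue, and `Doubler`, `deliver` and `block_on` are likewise invented for illustration. The "at least one message" guarantee is modelled by only allowing an inbox to be built from a first message.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `actor::Context`: only models the inbox.
struct Inbox<M> {
    queue: VecDeque<M>,
}

impl<M> Inbox<M> {
    /// An inbox can only be created with a first message in it.
    fn with_first(msg: M) -> Inbox<M> {
        Inbox { queue: VecDeque::from([msg]) }
    }

    /// Returns the next queued message, if any.
    fn try_receive(&mut self) -> Option<M> {
        self.queue.pop_front()
    }
}

trait VirtualActor {
    type Message;
    type Error;
    type Future: Future<Output = Result<(), Self::Error>>;

    /// `inbox` is guaranteed to contain at least one message.
    fn start(&self, inbox: Inbox<Self::Message>) -> Self::Future;
}

/// Hypothetical actor that doubles the number it receives.
struct Doubler {
    out: Arc<Mutex<Vec<u32>>>,
}

impl VirtualActor for Doubler {
    type Message = u32;
    type Error = &'static str;
    type Future = Pin<Box<dyn Future<Output = Result<(), Self::Error>>>>;

    fn start(&self, mut inbox: Inbox<u32>) -> Self::Future {
        let out = Arc::clone(&self.out);
        Box::pin(async move {
            let Some(n) = inbox.try_receive() else {
                return Err("inbox was empty");
            };
            let Some(doubled) = n.checked_mul(2) else {
                return Err("overflow");
            };
            out.lock().unwrap().push(doubled);
            Ok(())
        })
    }
}

/// Waker that does nothing; enough for futures that never wait on I/O.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

/// Put `msg` in a fresh inbox and start a new actor for it.
fn deliver<VA: VirtualActor>(actor: &VA, msg: VA::Message) -> Result<(), VA::Error> {
    block_on(actor.start(Inbox::with_first(msg)))
}
```

Compared to passing the message as an argument, this keeps the actor's signature closer to a regular actor, at the cost of the actor having to handle the (in principle impossible) empty-inbox case itself.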

Open questions:

Resources:

Thomasdezeeuw commented 3 years ago

Some more design ideas.

An updated version of the trait:

pub trait VirtualActor {
    type Message;
    type Actor: Actor;
    type Error;
    type RuntimeAccess;

    /// Create a new actor that handles `msg`.
    fn new(rt: Self::RuntimeAccess, msg: Self::Message) -> Result<Self::Actor, Self::Error>;
}

It's similar to the NewActor trait, but it doesn't give the actor an actor::Context. Instead it only gets access to the runtime through RuntimeAccess (the RT part of the actor::Context).

Spawning would look something like the following.

impl RuntimeRef {
    /// Spawn a new virtual actor.
    /// 
    /// For each message sent to the returned actor reference a new virtual
    /// actor is created to handle the message.
    fn spawn_virtual_actor<S, VA>(&mut self, supervisor: S, options: ActorOptions) -> ActorRef<VA::Message>
    where
        S: Supervisor<VA>,
        VA: VirtualActor,
    {
        // ...
    }
}
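
To make the intended behaviour concrete, here is a self-contained sketch of the trait and spawn function above using only the standard library. Everything outside the trait's shape is an assumption for illustration: the actor is simplified to a plain future, the supervisor to a closure that receives error strings, `ActorOptions` is omitted, and `VirtualRef`, `Reciprocal` and `block_on` are hypothetical names. A real runtime would schedule the actors rather than run them inline.

```rust
use std::fmt::Display;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Same shape as the trait above; the actor is simplified to a future.
trait VirtualActor {
    type Message;
    type Actor: Future<Output = Result<(), String>>;
    type Error: Display;
    type RuntimeAccess: Clone;

    fn new(rt: Self::RuntimeAccess, msg: Self::Message) -> Result<Self::Actor, Self::Error>;
}

/// Hypothetical actor reference: every message sent creates a fresh actor.
struct VirtualRef<VA: VirtualActor, S> {
    rt: VA::RuntimeAccess,
    supervisor: S,
    _virtual_actor: PhantomData<VA>,
}

impl<VA: VirtualActor, S: FnMut(String)> VirtualRef<VA, S> {
    /// Create one actor for `msg` and run it to completion, reporting
    /// creation and runtime errors to the supervisor.
    fn send(&mut self, msg: VA::Message) {
        match VA::new(self.rt.clone(), msg) {
            Ok(actor) => {
                if let Err(err) = block_on(actor) {
                    (self.supervisor)(err);
                }
            }
            Err(err) => (self.supervisor)(format!("failed to create actor: {err}")),
        }
    }
}

fn spawn_virtual_actor<VA, S>(rt: VA::RuntimeAccess, supervisor: S) -> VirtualRef<VA, S>
where
    VA: VirtualActor,
    S: FnMut(String),
{
    VirtualRef { rt, supervisor, _virtual_actor: PhantomData }
}

/// Hypothetical virtual actor: stores `100 / msg` in the shared results.
struct Reciprocal;

impl VirtualActor for Reciprocal {
    type Message = i32;
    type Actor = Pin<Box<dyn Future<Output = Result<(), String>>>>;
    type Error = &'static str;
    type RuntimeAccess = Arc<Mutex<Vec<i32>>>;

    fn new(rt: Self::RuntimeAccess, msg: i32) -> Result<Self::Actor, Self::Error> {
        if msg < 0 {
            return Err("negative input");
        }
        Ok(Box::pin(async move {
            match 100_i32.checked_div(msg) {
                Some(result) => {
                    rt.lock().unwrap().push(result);
                    Ok(())
                }
                None => Err("division by zero".to_string()),
            }
        }))
    }
}

/// Waker that does nothing; enough for futures that never wait on I/O.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
            return output;
        }
    }
}
```

In this sketch `Reciprocal::new` can reject a message before any actor exists, while errors raised by a running actor reach the supervisor separately; both kinds of failure leave later messages unaffected.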

Thomasdezeeuw commented 2 years ago

diff --git a/src/actor/mod.rs b/src/actor/mod.rs
index 464e722..73931e0 100644
--- a/src/actor/mod.rs
+++ b/src/actor/mod.rs
@@ -144,9 +144,12 @@
 mod sync;
 #[cfg(test)]
 mod tests;
+mod r#virtual;

 #[doc(inline)]
 pub use context::{Context, NoMessages, ReceiveMessage, RecvError};
+#[doc(inline)]
+pub use r#virtual::VirtualActor;
 #[cfg(any(test, feature = "test"))]
 pub(crate) use sync::SyncWaker;
 #[doc(inline)]
diff --git a/src/actor/virtual.rs b/src/actor/virtual.rs
new file mode 100644
index 0000000..6b01bbe
--- /dev/null
+++ b/src/actor/virtual.rs
@@ -0,0 +1,186 @@
+use crate::actor::{name, Actor};
+
+pub trait VirtualActor {
+    /// The type of messages the actor can receive.
+    ///
+    /// See [`NewActor::Message`] for more information and examples.
+    ///
+    /// [`NewActor::Message`]: crate::actor::NewActor::Message
+    type Message;
+
+    /// The type of the actor.
+    ///
+    /// See [`Actor`](Actor) for more.
+    type Actor: Actor;
+
+    /// The type of error.
+    ///
+    /// Note that if creating an actor is always successful the never type (`!`)
+    /// can be used. Asynchronous functions for example use the never type as
+    /// error.
+    type Error;
+
+    /// The kind of runtime access needed by the actor.
+    type RuntimeAccess;
+
+    /// Create a new actor that handles a single `msg`.
+    fn new(
+        &mut self,
+        rt: Self::RuntimeAccess,
+        msg: Self::Message,
+    ) -> Result<Self::Actor, Self::Error>;
+
+    /* TODO: maybe add arguments?
+    /// Wrap the `NewActor` to change the arguments it accepts.
+    ///
+    /// This can be used when additional arguments are needed to be passed to an
+    /// actor, where another function requires a certain argument list. For
+    /// example when using [`TcpServer`].
+    ///
+    /// [`TcpServer`]: crate::net::TcpServer
+    ///
+    /// # Examples
+    ///
+    /// Using [`TcpServer`] requires a `NewActor` that accepts `(TcpStream,
+    /// SocketAddr)` as arguments, but we need to pass the actor additional
+    /// arguments.
+    ///
+    /// ```
+    /// #![feature(never_type)]
+    ///
+    /// use std::io;
+    /// use std::net::SocketAddr;
+    ///
+    /// use heph::actor::{self, NewActor};
+    /// # use heph::actor::messages::Terminate;
+    /// # use heph::net::tcp::server;
+    /// use heph::net::{TcpServer, TcpStream};
+    /// use heph::rt::{self, Runtime, RuntimeRef, ThreadLocal};
+    /// use heph::spawn::ActorOptions;
+    /// # use heph::supervisor::{Supervisor, SupervisorStrategy};
+    /// # use log::error;
+    ///
+    /// fn main() -> Result<(), rt::Error> {
+    ///     // Create and run runtime
+    ///     let mut runtime = Runtime::new()?;
+    ///     runtime.run_on_workers(setup)?;
+    ///     runtime.start()
+    /// }
+    ///
+    /// /// In this setup function we'll spawn the `TcpServer` actor.
+    /// fn setup(mut runtime_ref: RuntimeRef) -> io::Result<()> {
+    ///     // Prepare for humans' expansion to Mars.
+    ///     let greet_mars = true;
+    ///
+    ///     // Our actor that accepts three arguments.
+    ///     let new_actor = (conn_actor as fn(_, _, _, _) -> _)
+    ///         .map_arg(move |(stream, address)| (stream, address, greet_mars));
+    ///
+    ///     // For more information about the remainder of this example see
+    ///     // `TcpServer`.
+    ///     let address = "127.0.0.1:7890".parse().unwrap();
+    ///     let server = TcpServer::setup(address, conn_supervisor, new_actor, ActorOptions::default())?;
+    ///     # let actor_ref =
+    ///     runtime_ref.try_spawn_local(ServerSupervisor, server, (), ActorOptions::default())?;
+    ///     # actor_ref.try_send(Terminate).unwrap();
+    ///     Ok(())
+    /// }
+    ///
+    /// # #[derive(Copy, Clone, Debug)]
+    /// # struct ServerSupervisor;
+    /// #
+    /// # impl<S, NA> Supervisor<server::Setup<S, NA>> for ServerSupervisor
+    /// # where
+    /// #     S: Supervisor<NA> + Clone + 'static,
+    /// #     NA: NewActor<Argument = (TcpStream, SocketAddr), Error = !, RuntimeAccess = ThreadLocal> + Clone + 'static,
+    /// # {
+    /// #     fn decide(&mut self, err: server::Error<!>) -> SupervisorStrategy<()> {
+    /// #         use server::Error::*;
+    /// #         match err {
+    /// #             Accept(err) => {
+    /// #                 error!("error accepting new connection: {}", err);
+    /// #                 SupervisorStrategy::Restart(())
+    /// #             }
+    /// #             NewActor(_) => unreachable!(),
+    /// #         }
+    /// #     }
+    /// #
+    /// #     fn decide_on_restart_error(&mut self, err: io::Error) -> SupervisorStrategy<()> {
+    /// #         error!("error restarting the TCP server: {}", err);
+    /// #         SupervisorStrategy::Stop
+    /// #     }
+    /// #
+    /// #     fn second_restart_error(&mut self, _: io::Error) {
+    /// #         // We don't restart a second time, so this will never be called.
+    /// #         unreachable!();
+    /// #     }
+    /// # }
+    /// #
+    /// # fn conn_supervisor(err: io::Error) -> SupervisorStrategy<(TcpStream, SocketAddr)> {
+    /// #   error!("error handling connection: {}", err);
+    /// #   SupervisorStrategy::Stop
+    /// # }
+    /// #
+    /// // Actor that handles a connection.
+    /// async fn conn_actor(
+    ///     _: actor::Context<!, ThreadLocal>,
+    ///     mut stream: TcpStream,
+    ///     address: SocketAddr,
+    ///     greet_mars: bool
+    /// ) -> io::Result<()> {
+    /// #   drop(address); // Silence dead code warnings.
+    ///     if greet_mars {
+    ///         // In case this example ever reaches Mars.
+    ///         stream.send_all(b"Hello Mars").await
+    ///     } else {
+    ///         stream.send_all(b"Hello World").await
+    ///     }
+    /// }
+    /// ```
+    fn map_arg<F, Arg>(self, f: F) -> ArgMap<Self, F, Arg>
+    where
+        Self: Sized,
+        F: FnMut(Arg) -> Self::Argument,
+    {
+        ArgMap {
+            new_actor: self,
+            map: f,
+            _phantom: PhantomData,
+        }
+    }
+    */
+
+    /// Returns the name of the virtual actor.
+    ///
+    /// The default implementation creates the name based on the type name of
+    /// the actor.
+    ///
+    /// # Notes
+    ///
+    /// This uses [`type_name`] under the hood which does not have a stable
+    /// output. Like the `type_name` function the default implementation is
+    /// provided on a best effort basis.
+    ///
+    /// [`type_name`]: std::any::type_name
+    fn name(&self) -> &'static str {
+        name::<Self::Actor>()
+    }
+}
+
+impl<RT, M, A> VirtualActor for fn(rt: RT, msg: M) -> A
+where
+    A: Actor,
+{
+    type Message = M;
+    type Actor = A;
+    type Error = !;
+    type RuntimeAccess = RT;
+
+    fn new(
+        &mut self,
+        rt: Self::RuntimeAccess,
+        msg: Self::Message,
+    ) -> Result<Self::Actor, Self::Error> {
+        Ok((self)(rt, msg))
+    }
+}
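
The function-pointer implementation at the end of the diff can be tried out on stable Rust with a few substitutions, sketched below under stated assumptions: `std::convert::Infallible` replaces the unstable never type (`!`), Heph's `Actor` bound is simplified to a plain future, and `greet`, `handle` and `block_on` are hypothetical helpers. It shows how an `async fn` becomes a virtual actor once it is cast to a function pointer.

```rust
use std::convert::Infallible;
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

trait VirtualActor {
    type Message;
    /// Simplification: any future resolving to `Result<(), String>`.
    type Actor: Future<Output = Result<(), String>>;
    type Error;
    type RuntimeAccess;

    fn new(
        &mut self,
        rt: Self::RuntimeAccess,
        msg: Self::Message,
    ) -> Result<Self::Actor, Self::Error>;
}

/// Same idea as the implementation in the diff, with `Infallible` as error.
impl<RT, M, A> VirtualActor for fn(RT, M) -> A
where
    A: Future<Output = Result<(), String>>,
{
    type Message = M;
    type Actor = A;
    type Error = Infallible;
    type RuntimeAccess = RT;

    fn new(&mut self, rt: RT, msg: M) -> Result<A, Infallible> {
        // Calling the function creates the future, i.e. the actor.
        Ok((*self)(rt, msg))
    }
}

/// Hypothetical actor written as an asynchronous function.
async fn greet(log: Arc<Mutex<Vec<String>>>, name: String) -> Result<(), String> {
    if name.is_empty() {
        return Err("no name given".to_string());
    }
    log.lock().unwrap().push(format!("Hello {name}"));
    Ok(())
}

/// Waker that does nothing; enough for futures that never wait on I/O.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

/// Create an actor for a single message and run it to completion.
fn handle<VA>(virtual_actor: &mut VA, rt: VA::RuntimeAccess, msg: VA::Message) -> Result<(), String>
where
    VA: VirtualActor<Error = Infallible>,
{
    match virtual_actor.new(rt, msg) {
        Ok(actor) => block_on(actor),
        // `Infallible` has no values, so creation can never fail.
        Err(never) => match never {},
    }
}
```

The cast `greet as fn(_, _) -> _` is needed because a function item has its own unique type; the trait is only implemented for function pointers, the same pattern the commented-out `map_arg` example uses for `conn_actor`.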

Cloud33 commented 2 years ago

Any progress on virtual actors? I like its design concept very much

Thomasdezeeuw commented 2 years ago

> Any progress on virtual actors? I like its design concept very much

I'm afraid not.