Diggsey / act-zero

Using in a supervision tree style hierarchy #4

Closed praveenperera closed 3 years ago

praveenperera commented 3 years ago

Hey, here is how I'm exploring it right now. Let me know what you think.

Main function creates the supervision tree root and starts its children:

#[tokio::main]
async fn main() -> Result<()> {
    color_eyre::install()?;
    env_logger::init();

    let cluster_manager = spawn_actor(ClusterManager::new().await);
    let _ = call!(cluster_manager.start_children()).await?;

    Ok(())
}

When the root actor starts, it saves its own address (downgraded to a weak reference), then spawns its children and saves their addresses as well:

#[async_trait]
impl Actor for ClusterManager {
    async fn started(&mut self, pid: Addr<Self>) -> ActorResult<()> {
        self.pid = pid.downgrade();

        self.app_manager_pid = spawn_actor(AppManager::new(self).await);
        self.build_manager_pid = spawn_actor(BuildManager::new(self).await);

        Produces::ok(())
    }
}

The root's start_children function:

    async fn start_children(&mut self) -> ActorResult<Vec<()>> {
        use futures::stream::FuturesUnordered;
        use futures::StreamExt;

        let children = FuturesUnordered::new();
        children.push(call!(self.build_manager_pid.start_children()));
        children.push(call!(self.app_manager_pid.start_children()));

        // some weird nested async wrangling goes on here
        let ran: Vec<_> = children.collect().await;
        let filtered: Vec<()> = ran.into_iter().flatten().flatten().collect();

        Produces::ok(filtered)
    }

The child actor also has children of its own:

    //BuildManager
    pub async fn start_children(&self) -> ActorResult<Vec<()>> {
        use futures::stream::FuturesUnordered;
        use futures::StreamExt; // needed for `collect()` on the stream below
        info!("Starting build manager");

        let bm_worker = spawn_actor(BuildMessageWorker::new(self).await);
        let incoming_worker = spawn_actor(IncomingMessageWorker::new(self).await);

        let children = FuturesUnordered::new();
        children.push(call!(bm_worker.listen()));
        children.push(call!(incoming_worker.listen()));

        let ran: Vec<ActorCallResult<()>> = children.collect().await;
        let filtered = ran.into_iter().filter_map(Result::ok).collect();

        Produces::ok(filtered)
    }

The ClusterManager, BuildManager, and AppManager actors need to stick around forever, always listening to (and handling) new messages coming from outside the system (through NATS).

Diggsey commented 3 years ago

I'm unsure exactly what's going on, but I can give some general advice:

1) Try to avoid having async methods that take a really long time if they borrow the actor state. Nothing else can run on that actor if its state is already borrowed. If you need to run longer tasks then you can simply spawn futures onto the actor to run them in the background.

2) If you need to run code when an actor stops (such as you might in a supervision tree) you can use the termination() method. This returns a future that resolves when the actor stops (even if that's because it panicked).

You might use these together in a supervisor with something like this:

async fn spawn_child(&mut self) {
    self.child = spawn_actor(...);
    let termination = self.child.termination();
    self.addr.send_fut_with(|addr| async move {
        termination.await;
        // The child stopped for some reason, re-spawn it again
        send!(addr.spawn_child());
    });
}
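
The restart-on-termination idea above can also be illustrated without act-zero. The following is only a rough standard-library analogy, not act-zero code: a thread's JoinHandle stands in for termination(), and the names run_child and supervise, as well as the restart cap, are hypothetical additions for the sketch.

```rust
use std::thread;

/// Hypothetical child: panics on its first `failures` attempts, then succeeds.
fn run_child(attempt: u32, failures: u32) -> u32 {
    if attempt < failures {
        panic!("child crashed on attempt {}", attempt);
    }
    attempt
}

/// Hypothetical supervisor loop: spawn the child, wait for it to stop, and
/// re-spawn it if it stopped by panicking. Returns how many spawns were
/// needed, or `None` if the restart cap was exhausted.
fn supervise(failures: u32, max_restarts: u32) -> Option<u32> {
    for attempt in 0..=max_restarts {
        let handle = thread::spawn(move || run_child(attempt, failures));
        // `join` returns once the child thread has stopped;
        // `Err` means it stopped by panicking.
        match handle.join() {
            Ok(_) => return Some(attempt + 1),
            Err(_) => continue, // the child died, so spawn it again
        }
    }
    None
}

fn main() {
    let spawns = supervise(2, 5);
    println!("child finished after {:?} spawn(s)", spawns);
}
```

Unlike the actor version, this sketch blocks the supervising thread while it waits. The point of send_fut_with in the snippet above is that the wait happens in the background, so the supervisor can keep handling messages.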

praveenperera commented 3 years ago

Thanks for the feedback, I'm going to look into send_fut_with and try that out tomorrow.

Here is a diagram of the supervision tree structure. Maybe this makes more sense.

All the actors listed above need to live for the entire duration of the program. The children at the bottom of the tree in particular must stay alive, because they are the ones that listen for messages from NATS (outside the Rust app). They wait for messages and then pass them to their parent to handle:

[Image: IMG_2320, diagram of the supervision tree]

At first I tried starting the children with

send!(bm_worker.listen())

But then the children's async functions did not run.

Diggsey commented 3 years ago

> But then the children async functions did not run

Yeah, there's currently no general way to forward all messages to another actor (other than just implementing the methods on the parent and having them call the child methods).

One thing you can do is delegate a response to another actor or future, without holding onto the parent's state.

You'll notice that call!(...) returns a Produces<T>. Instead of .awaiting it, which would wait until the method returns, you can return the deferred value immediately, with Ok(call!(...)).

When the method returns, the value will be returned directly to the person who called the parent, without the parent actor being involved at all.
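
The difference between waiting inside the parent and handing the pending result back to the caller can be sketched with plain channels. This is only a standard-library analogy, assuming a Receiver plays the role of Produces<T>; Parent, child_compute, compute_blocking and compute_delegated are hypothetical names and not part of act-zero.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;
use std::time::Duration;

/// Hypothetical child: does slow work on its own thread and reports the
/// result over a channel. The `Receiver` stands in for a deferred value.
fn child_compute(input: u32) -> Receiver<u32> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        let _ = tx.send(input * 2);
    });
    rx
}

/// Hypothetical parent that forwards work to the child.
struct Parent {
    handled: u32,
}

impl Parent {
    /// Waiting style: the parent's state stays borrowed until the child is done.
    fn compute_blocking(&mut self, input: u32) -> u32 {
        self.handled += 1;
        child_compute(input).recv().expect("child dropped its sender")
    }

    /// Delegating style: return the pending result right away and let the
    /// caller wait on it, so the parent is free for the next message.
    fn compute_delegated(&mut self, input: u32) -> Receiver<u32> {
        self.handled += 1;
        child_compute(input)
    }
}

fn main() {
    let mut parent = Parent { handled: 0 };
    // Both requests are accepted before either child has finished.
    let first = parent.compute_delegated(21);
    let second = parent.compute_delegated(1);
    println!("parent handled {} requests without waiting", parent.handled);
    println!("results: {} and {}", first.recv().unwrap(), second.recv().unwrap());
    println!("blocking call: {}", parent.compute_blocking(5));
}
```

In the delegating style the parent's method returns almost immediately, which is the property the Ok(call!(...)) pattern described above relies on.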

praveenperera commented 3 years ago

I am able to do this:

Child (starts its own children)

    pub async fn start_children(&self) -> ActorResult<Vec<Produces<()>>> {
        info!("Starting build manager");

        let bm_worker = spawn_actor(BuildMessageWorker::new(self).await);
        let incoming_worker = spawn_actor(IncomingMessageWorker::new(self).await);

        let mut children = Vec::new();
        children.push(call!(bm_worker.listen()));
        children.push(call!(incoming_worker.listen()));

        Produces::ok(children)
    }
}

Top parent:

    async fn start_children(&mut self) -> ActorResult<Vec<Produces<Vec<Produces<()>>>>> {
        let mut children = Vec::new();

        children.push(call!(self.build_manager_pid.start_children()));
        // children.push(call!(self.app_manager_pid.start_children()));

        Produces::ok(children)
    }

The main function is then the only place where I call .await, awaiting all the futures:

#[tokio::main]
async fn main() -> Result<()> {
    color_eyre::install()?;
    env_logger::init();

    let cluster_manager = spawn_actor(ClusterManager::new().await);

    let futures = call!(cluster_manager.start_children()).await?;

    for future in futures {
        let futures = future.await?;
        for future in futures {
            future.await?;
        }
    }

    Ok(())
}

This is ugly but it works. I am awaiting all the futures in the main function so the program stays alive.

One problem is the return type signature:

ActorResult<Vec<Produces<Vec<Produces<()>>>>>

and the nested for loops.

Any time I add a layer of children this type signature gets crazier.

I could fix that by creating this:

pub enum NestedFutures {
    Produces(Box<Produces<NestedFutures>>),
    Nested(Vec<NestedFutures>),
    Item(Produces<()>),
}

But then I'm not sure how to await all the nested futures
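
One possible way to await such a recursive structure is a function that returns a boxed future and calls itself. The sketch below is an assumption-laden illustration, not act-zero code: it swaps Produces<T> for boxed standard-library futures (the BoxFut alias), renames the first variant to Deferred, and includes a tiny busy-polling block_on only so that the example runs without an async runtime. It needs the 2018 edition or later.

```rust
use std::future::{ready, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Boxed future, used here as a stand-in for act-zero's `Produces<T>`.
type BoxFut<T> = Pin<Box<dyn Future<Output = T>>>;

enum NestedFutures {
    Deferred(BoxFut<NestedFutures>),
    Nested(Vec<NestedFutures>),
    Item(BoxFut<()>),
}

/// Awaits every future in the tree, one after another, and returns the
/// number of leaf items completed. Returning a boxed future is what makes
/// the recursion possible.
fn await_all(node: NestedFutures) -> BoxFut<usize> {
    Box::pin(async move {
        match node {
            NestedFutures::Deferred(fut) => await_all(fut.await).await,
            NestedFutures::Nested(children) => {
                let mut done = 0;
                for child in children {
                    done += await_all(child).await;
                }
                done
            }
            NestedFutures::Item(fut) => {
                fut.await;
                1
            }
        }
    })
}

/// Waker that does nothing; enough for the busy-polling loop below.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for the demo: polls in a loop until the future is ready.
/// A real program would use its runtime (for example tokio) instead.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn main() {
    let tree = NestedFutures::Nested(vec![
        NestedFutures::Item(Box::pin(ready(()))),
        NestedFutures::Deferred(Box::pin(ready(NestedFutures::Nested(vec![
            NestedFutures::Item(Box::pin(ready(()))),
            NestedFutures::Item(Box::pin(ready(()))),
        ])))),
    ]);
    println!("awaited {} leaf futures", block_on(await_all(tree)));
}
```

Note that this awaits the leaves sequentially. For listeners that never finish, the first leaf would block all the others, so the leaves would have to be driven concurrently instead, for example with the FuturesUnordered type already used earlier in this thread.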


Update:

Was able to simplify the root actor a little bit:

    async fn start_children(&mut self) -> ActorResult<impl futures::stream::StreamExt> {
        use futures::future::FutureExt;
        use futures::stream;
        use futures::stream::StreamExt;

        let children = vec![
            call!(self.build_manager_pid.start_children()),
            call!(self.app_manager_pid.start_children()),
        ]
        .into_iter();
        let stream = stream::iter(children).flat_map(|f| f.into_stream());

        Produces::ok(stream)
    }

    let cluster_manager = spawn_actor(ClusterManager::new().await);
    let mut futures = call!(cluster_manager.start_children()).await?;

    loop {
        futures.next().await;
    }

Diggsey commented 3 years ago

Hm.. I'm still unsure exactly what you're trying to accomplish?

If you just want the main() function to wait until the root actor stops, then just call root_actor.termination().await from main().

Diggsey commented 3 years ago

If you want that pattern all the way up the tree, then you can do it two ways:

1) Have each supervisor spawn a future onto itself that calls child.termination().await on all of its children. This is a bit of a pain as it requires you to spawn all the children up-front.

OR

2) Have each child actor store the Addr<_> of its parent in its state. When all children have stopped, the reference count on the parent actor will hit zero, and the parent will stop too.
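
The reference-counting behaviour behind the second option can be illustrated with Arc from the standard library. This is only an analogy, assuming that a strong Addr<_> behaves like an Arc in this respect; Parent, Child and spawn_children are hypothetical names, and the stopped flag exists only so the sketch can observe when the parent goes away.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical parent. Its `Drop` impl flips a shared flag so that the
/// moment it "stops" can be observed from outside.
struct Parent {
    stopped: Arc<AtomicBool>,
}

impl Drop for Parent {
    fn drop(&mut self) {
        self.stopped.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical child that keeps its parent alive by holding a strong reference.
struct Child {
    _parent: Arc<Parent>,
}

/// Creates a parent plus `count` children and returns only the children and
/// the flag. After this returns, the children hold the only strong
/// references to the parent.
fn spawn_children(count: usize) -> (Vec<Child>, Arc<AtomicBool>) {
    let stopped = Arc::new(AtomicBool::new(false));
    let parent = Arc::new(Parent {
        stopped: Arc::clone(&stopped),
    });
    let children = (0..count)
        .map(|_| Child {
            _parent: Arc::clone(&parent),
        })
        .collect();
    (children, stopped)
}

fn main() {
    let (mut children, stopped) = spawn_children(2);
    drop(children.pop());
    println!("one child left, parent stopped: {}", stopped.load(Ordering::SeqCst));
    drop(children.pop());
    println!("no children left, parent stopped: {}", stopped.load(Ordering::SeqCst));
}
```

The parent is dropped only when the last child is dropped, which mirrors the idea that a root actor referenced solely by its children stays alive exactly as long as at least one of them does.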

praveenperera commented 3 years ago

Hey @Diggsey, the children at the lowest level should never die. As long as the app is running, they should be alive, waiting for new messages to come in.

So yes, I want my main function to wait for all those actors. I think I will try your second suggestion, and in the main function try main_parent.termination().await.

Thanks!

praveenperera commented 3 years ago

Thanks @Diggsey that worked perfectly. My setup is MUCH MUCH simpler:

Main function:

#[tokio::main]
async fn main() -> Result<()> {
    color_eyre::install()?;
    env_logger::init();

    let cluster_manager = spawn_actor(ClusterManager::new().await);
    call!(cluster_manager.start_children()).await?;

    cluster_manager.termination().await;

    Ok(())
}

Root actor starting its children:

    async fn start_children(&mut self) -> ActorResult<()> {
        send!(self.build_manager_pid.start_children());
        send!(self.app_manager_pid.start_children());

        Produces::ok(())
    }

Thanks for this awesome crate!

Diggsey commented 3 years ago

Glad to hear it 👍