Diggsey / act-zero

Apache License 2.0
122 stars 7 forks source link

Using in a supervision tree style hierarchy #3

Closed praveenperera closed 3 years ago

praveenperera commented 3 years ago

Hey here is how I'm exploring it right now, let me know what you think.

Main function, supervision tree root:

#[tokio::main]
async fn main() -> Result<()> {
    color_eyre::install()?;
    env_logger::init();

    let cluster_manager_struct = ClusterManager::new().await;
    let cluster_manager = spawn_actor(cluster_manager_struct.clone());

    let _ = call!(cluster_manager.start_children()).await?;

    Ok(())
}

Starts children on startup

#[async_trait]
impl Actor for ClusterManager {
    async fn started(&mut self, pid: Addr<Self>) -> ActorResult<()> {
        self.pid = pid.downgrade();

        self.app_manager_pid = spawn_actor(AppManager::new(self).await);
        self.build_manager_pid = spawn_actor(BuildManager::new(self).await);

        Produces::ok(())
    }
}

Root start children function

    async fn start_children(&mut self) -> ActorResult<Vec<()>> {
        use futures::stream::FuturesUnordered;
        use futures::StreamExt;

        let children = FuturesUnordered::new();
        children.push(call!(self.build_manager_pid.start_children()));
        children.push(call!(self.app_manager_pid.start_children()));

        let ran: Vec<_> = children.collect().await;
        let filtered: Vec<()> = ran.into_iter().flatten().flatten().collect();

        Produces::ok(filtered)
    }

Child actor also has children of its own

    //BuildManager
    pub async fn start_children(&self) -> ActorResult<Vec<()>> {
        use futures::stream::FuturesUnordered;
        info!("Starting build manager");

        let bm_worker = spawn_actor(BuildMessageWorker::new(self).await);
        let incoming_worker = spawn_actor(IncomingMessageWorker::new(self).await);

        let children = FuturesUnordered::new();
        children.push(call!(bm_worker.listen()));
        children.push(call!(incoming_worker.listen()));

        let ran: Vec<ActorCallResult<()>> = children.collect().await;
        let filtered = ran.into_iter().filter_map(Result::ok).collect();

        Produces::ok(filtered)
    }
Diggsey commented 3 years ago

Closing as it appears to be a duplicate