tonarino / actor

A minimalist actor framework aiming for high performance and simplicity.
MIT License
40 stars 6 forks source link

Ideas to improve the speed of `shutdown` sequence #12

Open skywhale opened 3 years ago

facetious commented 4 months ago

You should split apart the control message and the join so that all threads can receive their shutdown control first. This will allow the threads to clean up independently of one another. Collect all the pending thread handles together after sending the control message, then iterate through them to join them and ensure they've terminated.

                    match entry {
                        RegistryEntry::CurrentThread(_) => None,
                        RegistryEntry::BackgroundThread(_control_addr, thread_handle) => {
                            if thread_handle.thread().id() == current_thread.id() {
                                return None;
                            }

                            Some((actor_name, i, thread_handle))
                        },
                    }
                })
                .collect::<Vec<(String, usize, JoinHandle<()>)>>()
                .iter()
                .map(|(actor_name, i, thread_handle)| {
                    debug!("[{}] joining actor thread: {}", self.name, actor_name);

                    match thread_handle.join() {
                        Ok(Ok(())) => {
                            debug!("[{}] actor thread joined: {}", self.name, actor_name);
                            None
                        },
                        Ok(Err(e)) => {
                            error!("[{}] actor thread panicked: {} ({})", self.name, actor_name, e);
                            Some(actor_name)
                        },
                        Err(e) => {
                            error!(
                                "[{}] actor thread join failed: {} ({})",
                                self.name, actor_name, e
                            );
                            Some(actor_name)
                        },
                    }
                })
strohel commented 4 months ago

@facetious good idea! If you want to take a stab at implementation, patches are welcome!

bschwind commented 4 months ago

I remember adding a patch for this a long time ago, before the actor crate got split out into its own repo. I'm actually not sure why it never got pulled along. Here was the original diff:

impl SystemHandle {
    /// Stops all actors spawned by this system.
    pub fn shutdown(&self) -> Result<(), Error> {
+        let shutdown_start = Instant::now();
+
        let current_thread = thread::current();
        let current_thread_name = current_thread.name().unwrap_or("Unknown thread id");
        info!("Thread [{}] shutting down the actor system", current_thread_name);
@@ -418,17 +420,22 @@ impl SystemHandle {
            let mut registry = self.registry.lock();
            debug!("[{}] joining {} actor threads.", self.name, registry.len());
            // Joining actors in the reverse order in which they are spawn.
+
+            for entry in registry.iter_mut().rev() {
+                let actor_name = entry.name();
+
+                if let Err(e) = entry.control_addr().stop() {
+                    warn!("control channel is closed: {} ({})", actor_name, e);
+                }
+            }

            registry
                .drain(..)
                .rev()
                .enumerate()
-                .filter_map(|(i, mut entry)| {
+                .filter_map(|(i, entry)| {
                    let actor_name = entry.name();

-                    if let Err(e) = entry.control_addr().stop() {
-                        warn!("control channel is closed: {} ({})", actor_name, e);
-                    }
-
                    match entry {
                        RegistryEntry::CurrentThread(_) => None,
                        RegistryEntry::BackgroundThread(_control_addr, thread_handle) => {
@@ -460,7 +467,7 @@ impl SystemHandle {
                .count()
        };

-        info!("[{}] system finished shutting down.", self.name);
+        info!("[{}] system finished shutting down in {:?}", self.name, shutdown_start.elapsed());

        if let Some(callback) = self.callbacks.postshutdown.as_ref() {
            info!("[{}] calling post-shutdown callback.", self.name);
facetious commented 4 months ago

lgtm