riker-rs / riker

Easily build efficient, highly concurrent and resilient applications. An Actor Framework for Rust.
https://riker.rs
MIT License
1.02k stars 69 forks source link

terminate actor with no more messages guarantee #104

Open aav opened 4 years ago

aav commented 4 years ago

Currently, there is no way to terminate the actor from recv callback in a way, that no messages will arrive (recv will not be called) after termination. Clearly ctx.stop(ctx.myself()); does not satisfy this criteria.

Here is the typical use case:

impl Actor for MyActor {
    type Msg = MyEvent;

    fn post_start(&mut self, ctx: &Context<Self::Msg>) {
        // do request wait for MyEvent::Response
        issue_request_to_someont(ctx.myself());

        // or MyEvent::Timeout 
        ctx.schedule_once(ctx.myself(), Duration::from_sec(1), MyEvent::Timeout)
    }

    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, _sender: Sender) {
        match msg {
            MyEvent::Response(_) => {
                // handle response
            },

            MyEvent::Timeout => {
                // handle timeout
            }
        }

        // missing :(
        ctx.terminate();
   }
}

In Erlang, I would simply stop receiving messages and leave the process function. Since here we are based on callback, I believe that such a function is really important.

leenozara commented 4 years ago

@aav can you confirm that this is the case? ctx.stop is a system message. Any system messages in an actor’s mailbox will be handled before user messages. A check for system messages is done after every user message is handled.

aav commented 4 years ago

@leenozara I will double-check (almost sure, I observed this behavior) and create an example. In case I will not be able to make it (my error) - I will close the issue.

aav commented 4 years ago

Here is the code example:

use std::time::Duration;
use riker::actors::*;

#[derive(Default)]
struct MyActor;

impl Actor for MyActor {
    type Msg = i32;

    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, _sender: Sender) {
        println!("message: {:?}", msg);

        if msg == 5 {
            println!("stop");
            ctx.stop(ctx.myself());
        }
    }
}

fn main() {
    let sys = ActorSystem::new().unwrap();

    let actor = sys.actor_of::<MyActor>("actor").unwrap();

    for n in 0..10 {
        actor.tell(n, None);
    }

    std::thread::sleep(Duration::from_millis(5000));
}

and here is the output:

2020-06-20 15:09:15+00:00 DEBG [riker::system] Starting actor system: System[riker]
2020-06-20 15:09:15+00:00 DEBG [riker::system] Actor system [bfd38e4c-1949-4a8a-b814-e9bd83397ff4] [riker] started
message: 0
message: 1
message: 2
message: 3
message: 4
message: 5
stop
message: 6
message: 7
message: 8
message: 9

I would expect that after ctx.stop(ctx.myself()) recv callback will never be called again.

leenozara commented 4 years ago

Thanks for checking and confirming that. I’ll take a look to see why this is the case.

milgner commented 2 years ago

@aav did you happen to find a solution to this? I'm currently struggling with a similar situation where even though we're calling context.stop(actor_ref) the post_stop method isn't invoked and the actor is kept alive.

Update: for some reason everything works as expected when using tmp_actor_of but not with regular child actors...