riker-rs / riker

Easily build efficient, highly concurrent and resilient applications. An Actor Framework for Rust.
https://riker.rs
MIT License

How do I get a message response? #35

Closed snnsnn closed 5 years ago

snnsnn commented 5 years ago

Hi, this is more of a question than an issue.

I am new to the Actor model and am trying to figure out how it works while reading some papers on it, so please bear with me.

Here I am trying to get a response to the messages I send.

extern crate riker;
extern crate riker_default;
#[macro_use]
extern crate log;

use riker::actors::*;
use riker_default::DefaultModel;
use std::time::Duration;

struct Counter {
    count: i32,
}

impl Counter {
    fn get_count(&mut self) -> i32 {
        self.count
    }

    fn new() -> BoxActor<Opr> {
        Box::new(Counter{ count: 0 })
    }
}

#[derive(Debug, Clone)]
enum Opr {
    Add(i32),
    Sub(i32)
}

impl Into<ActorMsg<Opr>> for Opr {
    fn into(self) -> ActorMsg<Opr> {
        ActorMsg::User(self)
    }
}

impl Actor for Counter {
    type Msg = Opr;

    fn receive(
        &mut self,
        _ctx: &Context<Self::Msg>,
        msg: Self::Msg,
        _sender: Option<ActorRef<Self::Msg>>,
    ) {
        match msg {
            Opr::Add(n) => {
                self.count += n;
            }
            Opr::Sub(n) => {
                self.count -= n;
            }
        }
    }
}

fn main() {
    let model: DefaultModel<Opr> = DefaultModel::new();
    let sys = ActorSystem::new(&model).unwrap();

    let props = Props::new(Box::new(Counter::new));
    let my_actor = sys.actor_of(props, "my-actor").unwrap();

    my_actor.tell(Opr::Add(10), None);
    my_actor.tell(Opr::Sub(3), None);

    std::thread::sleep(Duration::from_millis(500));
}

From looking at the Actor trait's receive method, currently that should not be possible, right?

pub trait Actor : Send {
    type Msg: Message;

    // ...

    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Option<ActorRef<Self::Msg>>);

    // ...
}

Hewitt writes,

Messages in the Actor Model are decoupled from the sender and are delivered by the system on a best efforts basis.

If an Actor is sent a request, then the continuation must be one of the following two mutually exclusive possibilities:

  1. to process the response resulting from the recipient receiving the request
  2. to throw a Messaging exception

Just sitting there forever after a request has been sent is a silent failure, which is unacceptable. So, in due course, the infrastructure must throw a Messaging exception as governed by the policies in place if a response (return value or exception) to the request has not been received.

Ideally, if the continuation of sending a request is to throw a Messaging exception, then the sender of a response to the request also receives a Messaging exception saying that the response could not be processed.

If desired, things can be arranged so that Messaging exceptions are distinguished from all other exceptions.

Runtime failures are always a possibility in Actor systems and are dealt with by runtime infrastructures. Message acknowledgement, reception, and response cannot be guaranteed although best efforts are made.

So, does the current implementation fit this description? How can I get a response to a message I sent?

Also, let's say I'd like to have two different message types rather than an enum, so that each has its own impl block:

struct Add(i32);

impl Add {
   // ...
}

struct Sub(i32);

impl Sub {
   // ...
}

So, to handle them, I still have to use pattern matching in the receive function, right? Wouldn't it be better to have a separate handler for each message?

leenozara commented 5 years ago

Hi @snnsnn,

Happy to help you. Messaging actors using .tell is essentially fire-and-forget. There is no concept of request/response from that perspective. However, it is a common actor pattern for Actor A to send a message to Actor B and for Actor B to send a message back to Actor A. This is easy to implement:

Your message protocol would look like:

#[derive(Debug, Clone)]
enum Opr {
    Request(u32),
    Result(u32)
}

fn receive(&mut self,
                ctx: &Context<Self::Msg>,
                msg: Self::Msg,
                sender: Option<ActorRef<Self::Msg>>) {

                if let Opr::Request(m) = msg {
                    self.count += m;
                    sender.try_tell(Opr::Result(self.count)).unwrap();
                }
}

This models a basic request/response. You would likely add two other parts to this:

  1. Using Result<u32, OprError> as the type to send back, e.g. sender.try_tell(Opr::Result(Ok(self.count)));.
  2. Using a timeout pattern that schedules a message to be sent to myself after a certain delay. Then the calling request actor waits to receive either the Result from the Counter actor or the timeout message from itself after the delay.

Implementing 1 is easy and allows you to manage any error that occurs in Counter.receive. If there's an error you return Err.

Implementing 2 is more involved because it is best used with session-based or temporary actors that exist for a specific request. The actor needs to know which request has timed out when it receives a timeout message.
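As an illustration of points 1 and 2 together, here is a minimal sketch that uses only the Rust standard library (threads and std::sync::mpsc channels), not Riker's API. The names OprError::Overflow, Request, spawn_counter and request are hypothetical, and the timeout is approximated with recv_timeout on a per-request reply channel rather than with a scheduled message to self, so treat it as a model of the idea rather than the Riker way of doing it.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

/// Hypothetical error type for failures inside the counter.
#[derive(Debug, PartialEq)]
enum OprError {
    Overflow,
}

/// A request carries the amount to add plus a channel for the reply.
/// The reply channel exists for this one request only, so a timeout
/// always refers to exactly this request.
struct Request {
    amount: u32,
    reply_to: Sender<Result<u32, OprError>>,
}

/// Spawns a counter that owns its state and answers every request.
fn spawn_counter() -> Sender<Request> {
    let (tx, rx) = mpsc::channel::<Request>();
    thread::spawn(move || {
        let mut count: u32 = 0;
        for req in rx {
            let result = match count.checked_add(req.amount) {
                Some(n) => {
                    count = n;
                    Ok(n)
                }
                None => Err(OprError::Overflow),
            };
            // The requester may have given up already; ignore a failed send.
            let _ = req.reply_to.send(result);
        }
    });
    tx
}

/// Sends one request and waits for the reply or the timeout,
/// whichever comes first.
fn request(
    counter: &Sender<Request>,
    amount: u32,
    timeout: Duration,
) -> Result<Result<u32, OprError>, RecvTimeoutError> {
    let (reply_to, reply_rx) = mpsc::channel();
    counter
        .send(Request { amount, reply_to })
        .expect("counter stopped");
    reply_rx.recv_timeout(timeout)
}
```

The outer Result separates "no reply arrived in time" from the inner Result, which is the counter's own success or failure. On a fresh counter, request(&counter, 10, Duration::from_secs(1)) yields Ok(Ok(10)).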

I'm in transit right now, so please leave this issue open and I'll reply to your second question or any follow up to the above in about 24 hours.

Thanks.

kitsuneninetails commented 5 years ago

@snnsnn

@leenozara already answered about request-response, but I can go a little more into detail on another perspective of the "how to get a pass/fail response back from a tell to an actor" issue if you don't mind.

Basically, actor systems are built around asynchronous messaging. In that sense, waiting around for a response (synchronous processing, as in normal functional/imperative programming where you call a function and wait for an answer) isn't a pattern that fits into that model. Instead, there are two patterns that handle passing a message back to a caller:

  1. Double "tells", where a tell to one actor is "answered" by a tell from that actor back to the caller.
  2. The "ask" pattern. This pattern essentially wraps the first by using a Future to represent the wait until the far side sends back its message (this Future can be passed to other functions, mapped over, or blocked on until an answer arrives).

A simple example of a double tell and an ask would look like:

(Cargo deps):

[dependencies]
futures-preview = "*"
riker = "0.1.9"
riker-default = "0.1.9"
riker-patterns = "0.1.9"

Working Rust code:

extern crate futures;
extern crate riker;
extern crate riker_default;
extern crate riker_patterns;

use futures::executor::block_on;
use riker::actors::*;
use riker_default::DefaultModel;
use riker_patterns::ask::ask;

#[derive(Clone, Debug)]
enum Operation {
    Add(u32, u32),
    Subtract(u32, u32),
    Divide(u32, u32),
}

#[derive(Clone, Debug)]
enum Protocol {
    Answer(u32),
    Error(String),
    Run(Operation)
}

impl Into<ActorMsg<Protocol>> for Protocol {
    fn into(self) -> ActorMsg<Protocol> {
        ActorMsg::User(self)
    }
}

struct Program {
    calc_actor: Option<ActorRef<Protocol>>,
    asker: Option<ActorRef<Protocol>>,
}

impl Program {
    pub fn actor() -> BoxActor<Protocol> {
        Box::new(Program { calc_actor: None, asker: None })
    }
}

impl Actor for Program {
    type Msg = Protocol;
    fn pre_start(&mut self,
                 ctx: &Context<Self::Msg>) {
        self.calc_actor = Some(ctx.actor_of(Props::new(Box::new(Calculator::actor)),
                                            "calc_actor").unwrap());
    }

    fn receive(&mut self,
               ctx: &Context<Self::Msg>,
               msg: Self::Msg,
               sender: Option<ActorRef<Protocol>>) {
        match msg {
            Protocol::Run(op) => {
                println!("Running operation: {:?}", op);
                self.asker = sender;
                self.calc_actor.as_ref().unwrap().tell(Protocol::Run(op), Some(ctx.myself()));
            },
            m => {
                println!("Received response: {:?}", m);
                self.asker.take().unwrap().tell(m, Some(ctx.myself()));
            },
        }
    }
}

struct Calculator;
impl Calculator {
    pub fn actor() -> BoxActor<Protocol> {
        Box::new(Calculator)
    }
}

impl Actor for Calculator {
    type Msg = Protocol;
    fn receive(&mut self,
               ctx: &Context<Self::Msg>,
               msg: Self::Msg,
               sender: Option<ActorRef<Protocol>>) {
        match msg {
            Protocol::Run(op) => {
                match op {
                    Operation::Add(a, b) => {
                        println!("Calculating {} + {}", a, b);
                        sender.unwrap().tell(Protocol::Answer(a + b), Some(ctx.myself()));
                    },
                    Operation::Subtract(a, b) => {
                        sender.unwrap().tell(Protocol::Answer(a - b), Some(ctx.myself()));
                    },
                    Operation::Divide(a, b) => {
                        if b == 0 {
                            sender.unwrap().tell(Protocol::Error("Cannot divide by 0".to_string()), Some(ctx.myself()));
                        } else {
                            sender.unwrap().tell(Protocol::Answer(a / b), Some(ctx.myself()));
                        }
                    }
                }
            },
            m => {
                sender.unwrap().tell(Protocol::Error(format!("Unknown command: {:?}", m)),
                                     Some(ctx.myself()));
            }
        }
    }
}

fn main() {

    let asys = ActorSystem::new(&DefaultModel::new()).unwrap();

    let prog_props = Props::new(Box::new(Program::actor));
    let prog_actor = asys.actor_of(prog_props, "prog_actor").unwrap();

    let fut = ask(&asys, &prog_actor, Protocol::Run(Operation::Add(2, 3)));
    match block_on(fut) {
        Err(e) => {
            panic!("System error: {:?}", e);
        },
        Ok(Protocol::Answer(answer)) => {
            println!("Answer is: {}", answer)
        },
        Ok(Protocol::Error(e)) => {
            println!("Error occurred: {}", e)
        },
        _ => panic!("Unknown message response!")
    }

    let fut = ask(&asys, &prog_actor, Protocol::Run(Operation::Divide(5, 0)));
    match block_on(fut) {
        Err(e) => {
            panic!("System error: {:?}", e);
        },
        Ok(Protocol::Answer(answer)) => {
            println!("Answer is: {}", answer)
        },
        Ok(Protocol::Error(e)) => {
            println!("Error occurred: {}", e)
        },
        _ => panic!("Unknown message response!")
    }
}

I am still using Riker 0.1.9, so please keep that in mind (especially as it deals with futures).

So, this uses both an ask and a double tell model to show how they work. The ask model is used from main, because main itself is not an actor and cannot receive messages. This pattern is useful in the context of an HTTP server handler, where you need to wait for a response from the actor system before you can send back the response to the caller. It also works well when you are using any kind of function which can map on that future without having to explicitly block on the response.

The double tell is also shown here between the Program and the Calculator, where the Calculator sends back an Answer message with the valid "response" or an Error message with an error response.


Note: Combining double tell and ask can cause issues with the sender parameter, as that parameter is always set to whatever is passed into the tell. Normally, if you use an ask pattern, you should forward the sender along like so:

    fn receive(&mut self,
               ctx: &Context<Self::Msg>,
               msg: Self::Msg,
               sender: Option<ActorRef<Protocol>>) {
        match msg {
            Protocol::Run(op) => {
                println!("Running operation: {:?}", op);
                // Pass the original "asker" to the Calculator actor as if it were the original "sender"
                self.calc_actor.as_ref().unwrap().tell(Protocol::Run(op), sender);
            },
            ...

and then have the called actor simply respond directly to the original sender (the asker) as such:

    fn receive(&mut self,
               ctx: &Context<Self::Msg>,
               msg: Self::Msg,
               sender: Option<ActorRef<Protocol>>) {
        match msg {
            Protocol::Run(op) => {
                match op {
                    Operation::Add(a, b) => {
                        println!("Calculating {} + {}", a, b);
                        // "sender" here is the original asker, NOT the "Program" actor!
                        sender.unwrap().tell(Protocol::Answer(a + b), Some(ctx.myself()));
                    },
                    ...

However, in order to show the double tell here, I had to make sure the sender in the Calculator actor was actually the Program actor and not the original "asker", which is why I had to store the original asker parameter off for later. This is not recommended normally unless it is absolutely necessary, as it can become complicated to make sure the correct asker gets the correct response from the Program actor.

Either way, I'd choose one: either use double tells and handle the "answers" within the actor system (recommended if possible), or use the ask pattern and forward the original asker in the sender parameter of each tell so that the actor which produces the answer can give it directly to the original "asker" (useful if you are using an HTTP server handler to handle the response, or if you have to process the result in a main function outside the actor system).
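To make the second option concrete, here is a rough model of "ask plus forwarded sender" built only on std::sync::mpsc and threads. It is not Riker code: Envelope, spawn_calculator, spawn_program and this ask function are hypothetical stand-ins, and a plain channel Sender plays the role of the sender parameter. The point it tries to show is that the middle actor keeps no per-request state when it passes the original asker along.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

#[derive(Debug, PartialEq)]
enum Protocol {
    Answer(u32),
    Error(String),
    Divide(u32, u32),
}

/// A message plus the mailbox that should receive the reply (the "sender").
type Envelope = (Protocol, Sender<Protocol>);

/// Calculator: replies to whatever sender arrives with the message.
fn spawn_calculator() -> Sender<Envelope> {
    let (tx, rx) = mpsc::channel::<Envelope>();
    thread::spawn(move || {
        for (msg, sender) in rx {
            let reply = match msg {
                Protocol::Divide(_, 0) => Protocol::Error("Cannot divide by 0".to_string()),
                Protocol::Divide(a, b) => Protocol::Answer(a / b),
                other => Protocol::Error(format!("Unknown command: {:?}", other)),
            };
            // The asker may have gone away; ignore a failed send.
            let _ = sender.send(reply);
        }
    });
    tx
}

/// Program: forwards each message together with the ORIGINAL sender,
/// so it never has to remember which asker belongs to which request.
fn spawn_program(calculator: Sender<Envelope>) -> Sender<Envelope> {
    let (tx, rx) = mpsc::channel::<Envelope>();
    thread::spawn(move || {
        for (msg, sender) in rx {
            let _ = calculator.send((msg, sender));
        }
    });
    tx
}

/// "Ask": create a temporary mailbox, pass it along as the sender,
/// and hand the receiving end back to the caller.
fn ask(target: &Sender<Envelope>, msg: Protocol) -> Receiver<Protocol> {
    let (asker, answer) = mpsc::channel();
    target.send((msg, asker)).expect("target stopped");
    answer
}
```

A caller outside the "system" would write something like ask(&program, Protocol::Divide(6, 3)).recv() and receive the Calculator's reply directly, even though the request went through the Program.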


Finally, note that I used an explicit message to handle error conditions (in this case my Error type was a String). This makes it easier to differentiate between a good response and an error response. In addition, using try_tell and similar calls can help you handle the actor system's failure to deliver a message.

I suggest keeping in mind the clear boundary between "problems with the actor system in accomplishing its mission (i.e. delivering messages and keeping actors alive)" and "problems within the actors themselves in dealing with errors in user space, i.e. errors of protocol and errors processing the message contents themselves". The former should be handled with actor system calls (like try_tell, etc.) while the latter I feel are best handled through the Protocol (as in my Error message type above) and inside the actor code itself (by panicking an actor and invoking supervision strategy, by recovery code blocks, etc.). I used a separate message type with a string to embody my concept of "error", but there is no reason you can't use Result types in your Protocol as well to embody user-level errors (in this case the Answer message type would carry a Result<u32, String> instead of passing the error as a separate message).
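The Result-in-Protocol variant mentioned above might look roughly like this. It is a standalone sketch with no Riker types: handle is a hypothetical free function standing in for the Calculator's receive, and the checked arithmetic (which turns overflow and underflow into user-level errors) is an addition for the sake of the example, not something the code earlier in this comment does.

```rust
#[derive(Clone, Debug, PartialEq)]
enum Operation {
    Add(u32, u32),
    Subtract(u32, u32),
    Divide(u32, u32),
}

#[derive(Clone, Debug, PartialEq)]
enum Protocol {
    Run(Operation),
    // One reply variant; success and user-level failure share it.
    Answer(Result<u32, String>),
}

/// Computes the reply for a message, or None if the message needs no reply.
fn handle(msg: Protocol) -> Option<Protocol> {
    match msg {
        Protocol::Run(op) => {
            let result = match op {
                Operation::Add(a, b) => {
                    a.checked_add(b).ok_or_else(|| "Overflow".to_string())
                }
                Operation::Subtract(a, b) => {
                    a.checked_sub(b).ok_or_else(|| "Underflow".to_string())
                }
                // checked_div returns None when the divisor is 0.
                Operation::Divide(a, b) => {
                    a.checked_div(b).ok_or_else(|| "Cannot divide by 0".to_string())
                }
            };
            Some(Protocol::Answer(result))
        }
        // An Answer is a reply, not a command, so there is nothing to send back.
        Protocol::Answer(_) => None,
    }
}
```

With this shape the receiving side matches on Answer(Ok(n)) and Answer(Err(e)) instead of on two separate message variants.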

To answer your last question:

If you want to use different structs for messages, I suggest simply using an enum whose variants wrap your structs:

#[derive(Clone, Debug)]
struct A;

#[derive(Clone, Debug)]
struct B;

#[derive(Clone, Debug)]
enum Protocol {
    AMsg(A),
    BMsg(B),
}

Then, in your receive, you just pattern match on "Protocol" to differentiate between Protocol::AMsg and Protocol::BMsg, ignoring the other. I do this regularly in my usage of riker in all of my actor systems.

If all you are looking for is to handle two messages in the same actor system, this is the way I recommend. Don't think of the type parameter to Actor<> and ActorSystem<> as a "Message Type", but rather as a "Protocol Type" that the entire system has to agree on and follow, and that might make it clearer why one Protocol type can hold many different message types.
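Applied to the Add and Sub structs from the original question, the idea could be sketched as follows. This is plain Rust without any Riker types: the receive method here is a simplified stand-in for Actor::receive (no context or sender), and handle_add and handle_sub are hypothetical names. It shows that a single match in receive can still dispatch to one handler per message struct.

```rust
#[derive(Clone, Debug)]
struct Add(i32);

#[derive(Clone, Debug)]
struct Sub(i32);

/// The "Protocol Type": one enum whose variants wrap the message structs.
#[derive(Clone, Debug)]
enum Protocol {
    Add(Add),
    Sub(Sub),
}

// From impls let callers write `Add(10).into()` instead of naming the variant.
impl From<Add> for Protocol {
    fn from(msg: Add) -> Self {
        Protocol::Add(msg)
    }
}

impl From<Sub> for Protocol {
    fn from(msg: Sub) -> Self {
        Protocol::Sub(msg)
    }
}

struct Counter {
    count: i32,
}

impl Counter {
    // One handler per message type.
    fn handle_add(&mut self, msg: Add) {
        self.count += msg.0;
    }

    fn handle_sub(&mut self, msg: Sub) {
        self.count -= msg.0;
    }

    /// Simplified stand-in for Actor::receive: the match only dispatches.
    fn receive(&mut self, msg: Protocol) {
        match msg {
            Protocol::Add(m) => self.handle_add(m),
            Protocol::Sub(m) => self.handle_sub(m),
        }
    }
}
```

Because the match is exhaustive, adding a new variant to Protocol makes the compiler point at every receive that does not yet handle it.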

If you are looking to have a single actor change its accepted message types based off of a state change, then we should look at transform! (basically the become semantics from Akka). This is a slightly different question than what I think you are asking about, however.

Hope that all helps you out! Please reply with questions or clarifications if needed, and I'll be happy to help further!

snnsnn commented 5 years ago

Thank you for the detailed answer. I will send a pull request for the documentation once I digest it. A few more questions may follow. Then, I will close the issue.

leenozara commented 5 years ago

To add to @kitsuneninetails's excellent answer, I would note that ask is great for bridging between the inside and outside of the actor system, like in his example. This works well for things such as HTTP requests from main to sys and back out to main again.

If you're working between two or more actors within the system then you don't need ask - just .tell between the two actors is fine.

leenozara commented 5 years ago

Regarding your question about different message types, it is difficult since the actors in Riker are Actor trait objects and in Rust trait objects can't have generic functions. This means that fn receive has to accept the same msg type for all actors.

We're looking at ways to improve this in the future. I agree that from a type and function perspective it is less optimal. From a messaging perspective you might find as I do that the match makes it easy to reason about, however.

snnsnn commented 5 years ago

Thank you for the detailed answer. I will send a pull request for the documentation once I digest it.

I was still planning on sending that PR but I see someone already did it (https://riker.rs/messaging/). Please forgive my extreme procrastination, life is very hectic and demanding these days.

Thank you for the great work.