riker-rs / riker

Easily build efficient, highly concurrent and resilient applications. An Actor Framework for Rust.
https://riker.rs
MIT License

exponential back-off strategy #130

Open igalic opened 4 years ago

igalic commented 4 years ago

When an actor responsible for processing messages for a networked service receives a message that it cannot process because the remote service is down, I would like to retry the same message using an exponential back-off strategy.

hardliner66 commented 4 years ago

Currently riker doesn't handle backpressure in any way. So to get that behaviour, you will have to implement it yourself.

There are two options. The actor could respond to the sender, saying it should try again later; in this case different senders can implement different back-off strategies. Alternatively, it could schedule a message to itself (https://riker.rs/scheduling/): pack the original message and the timeout you used into a separate message, and increase the timeout until the message either succeeds or a maximum timeout is reached.
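As a rough illustration of the first option, here is a minimal sketch that uses plain `std::sync::mpsc` channels instead of riker actors. None of the names in it (`Reply`, `Request`, `spawn_service`, `send_with_backoff`) are riker API; they are hypothetical stand-ins. The service answers `TryAgainLater` while it is unavailable, and the sender picks its own back-off policy.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;
use std::time::Duration;

/// Reply sent back by the service (hypothetical message type).
#[derive(Debug)]
enum Reply {
    Done,
    TryAgainLater,
}

/// A request carries its own reply channel, playing the role of the sender.
struct Request {
    reply_to: Sender<Reply>,
}

/// Spawns a stand-in service that rejects the first `failures` requests.
fn spawn_service(failures: u32) -> Sender<Request> {
    let (tx, rx) = channel::<Request>();
    thread::spawn(move || {
        let mut remaining = failures;
        for req in rx {
            let reply = if remaining > 0 {
                remaining -= 1;
                Reply::TryAgainLater
            } else {
                Reply::Done
            };
            // The requester may have gone away; ignore a failed send.
            let _ = req.reply_to.send(reply);
        }
    });
    tx
}

/// Sends requests until one is accepted, doubling the wait after each
/// rejection. Gives up (returns `None`) once the next wait would exceed
/// `max_ms`. On success, returns the number of attempts that were needed.
fn send_with_backoff(service: &Sender<Request>, min_ms: u64, max_ms: u64) -> Option<u32> {
    let mut delay = min_ms;
    let mut attempts = 0;
    loop {
        attempts += 1;
        let (reply_tx, reply_rx) = channel();
        service.send(Request { reply_to: reply_tx }).ok()?;
        match reply_rx.recv().ok()? {
            Reply::Done => return Some(attempts),
            Reply::TryAgainLater if delay > max_ms => return None,
            Reply::TryAgainLater => {
                thread::sleep(Duration::from_millis(delay));
                delay *= 2;
            }
        }
    }
}

fn main() {
    let service = spawn_service(3);
    match send_with_backoff(&service, 1, 50) {
        Some(n) => println!("accepted after {} attempts", n),
        None => println!("gave up"),
    }
}
```

With `spawn_service(3)` the request is accepted on the fourth attempt, after waits of 1, 2 and 4 ms. Because the policy lives in the sender, another sender could use different limits or a different schedule against the same service.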

Here is an example of the second variant:

use riker::actors::*;
use std::time::Duration;

const MIN_TIMEOUT: u64 = 10;
const MAX_TIMEOUT: u64 = 2000;

#[derive(Clone, Debug)]
pub struct MyMessage {
    should_fail: bool,
}

#[derive(Clone, Debug)]
pub struct Retry {
    original_msg: MyMessage,
    last_timeout: u64,
}

#[actor(MyMessage, Retry)]
#[derive(Default, Debug)]
struct ActorBackOffTest;

impl Actor for ActorBackOffTest {
    type Msg = ActorBackOffTestMsg;

    fn supervisor_strategy(&self) -> Strategy {
        Strategy::Stop
    }

    fn recv(
        &mut self,
        ctx: &Context<ActorBackOffTestMsg>,
        msg: ActorBackOffTestMsg,
        sender: Sender,
    ) {
        self.receive(ctx, msg, sender);
    }
}

impl Receive<MyMessage> for ActorBackOffTest {
    type Msg = ActorBackOffTestMsg;

    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: MyMessage, sender: Sender) {
        if msg.should_fail {
            println!("Failed first time, retrying in {} ms!", MIN_TIMEOUT);
            ctx.system.schedule_once(
                Duration::from_millis(MIN_TIMEOUT),
                ctx.myself(),
                sender,
                Retry {
                    original_msg: msg,
                    last_timeout: MIN_TIMEOUT,
                },
            );
        } else {
            println!("Success! Stopping system.");
            ctx.stop(&ctx.myself);
        }
    }
}

impl Receive<Retry> for ActorBackOffTest {
    type Msg = ActorBackOffTestMsg;

    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: Retry, sender: Sender) {
        if msg.original_msg.should_fail && msg.last_timeout < MAX_TIMEOUT {
            // Double the delay, but never wait longer than MAX_TIMEOUT.
            let timeout = (msg.last_timeout * 2).min(MAX_TIMEOUT);
            println!("Failed again, retrying in {} ms!", timeout);

            ctx.system.schedule_once(
                Duration::from_millis(timeout),
                ctx.myself(),
                sender,
                Retry {
                    original_msg: msg.original_msg,
                    last_timeout: timeout,
                },
            );
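        } else if msg.original_msg.should_fail {
            // The maximum timeout was reached without a successful attempt.
            println!("Giving up after reaching the maximum timeout. Stopping system.");
            ctx.stop(&ctx.myself);
        } else {
            println!("Success! Stopping system.");
            ctx.stop(&ctx.myself);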
        }
    }
}

fn main() {
    riker_bench::logging::init();

    let sys = ActorSystem::new().unwrap();

    let act = sys.actor_of::<ActorBackOffTest>("act").unwrap();

    act.tell(MyMessage { should_fail: true }, None);

    while sys.user_root().has_children() {
        std::thread::sleep(std::time::Duration::from_millis(50));
    }
}
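
The delay schedule itself (start at `MIN_TIMEOUT`, double after every failure, stop at `MAX_TIMEOUT`) can be isolated in a small pure function, which makes it easy to check without running an actor system. This is only a sketch: `next_timeout` is a hypothetical helper, not part of riker, and clamping the last delay to `MAX_TIMEOUT` is a design choice assumed here.

```rust
use std::time::Duration;

const MIN_TIMEOUT: u64 = 10; // milliseconds
const MAX_TIMEOUT: u64 = 2000; // milliseconds

/// Returns the next delay in milliseconds, or `None` once the previous
/// delay has already reached `MAX_TIMEOUT` (time to give up).
/// Hypothetical helper, not part of riker.
fn next_timeout(last_timeout: u64) -> Option<u64> {
    if last_timeout >= MAX_TIMEOUT {
        return None;
    }
    // saturating_mul guards against overflow; min clamps to the cap.
    Some(last_timeout.saturating_mul(2).min(MAX_TIMEOUT))
}

fn main() {
    let mut timeout = MIN_TIMEOUT;
    let mut schedule = vec![timeout];
    while let Some(next) = next_timeout(timeout) {
        schedule.push(next);
        timeout = next;
    }
    // prints [10, 20, 40, 80, 160, 320, 640, 1280, 2000]
    println!("{:?}", schedule);

    let total: u64 = schedule.iter().sum();
    println!("worst-case total wait: {:?}", Duration::from_millis(total));
}
```

A `Retry` handler could call such a helper with `msg.last_timeout` and schedule the next attempt only when it returns `Some`.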

leenozara commented 4 years ago

If you're referring to sending messages outside of the actor system, for example sending webhooks to customers' server URLs, then you'd need something like this:

Create one actor that is responsible for dispatching and a group of worker actors responsible for performing the network IO. You also need persistent storage to record the state of each message.

Your dispatcher is responsible for taking messages from persistent storage and distributing them to the workers. The dispatcher can react by:

When you want to send your network message, you add it to storage and send a message to your dispatcher telling it that there is work to dispatch to a worker. The dispatcher will pull from storage and send to a worker. If the worker fails due to network IO, it can message the dispatcher so that the dispatcher can update storage. The message will be reattempted as per your back-off strategy when the dispatcher receives its own scheduled message to pull the set of messages that need to be dispatched.

If your system restarts, there is no impact, since your messages were stored in the DB.
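
The flow described above can be sketched without any actor framework. In this minimal sketch a `HashMap` stands in for the persistent storage, a closure stands in for a worker doing the network IO, and `dispatch_due` plays the role of the dispatcher handling its scheduled message. All names (`Store`, `Pending`, `dispatch_due`, the delay constants) are hypothetical, and time is passed in as plain milliseconds to keep the example deterministic.

```rust
use std::collections::HashMap;

const MIN_DELAY_MS: u64 = 10;
const MAX_DELAY_MS: u64 = 2000;

/// One stored outbound message (hypothetical record layout).
#[derive(Debug, Clone)]
struct Pending {
    payload: String,
    delay_ms: u64,  // delay to apply after the next failure
    due_at_ms: u64, // earliest time of the next attempt
}

/// In-memory stand-in for the persistent storage.
#[derive(Default)]
struct Store {
    next_id: u64,
    pending: HashMap<u64, Pending>,
}

impl Store {
    /// Records a new message that is due immediately.
    fn add(&mut self, payload: &str, now_ms: u64) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        let record = Pending {
            payload: payload.to_string(),
            delay_ms: MIN_DELAY_MS,
            due_at_ms: now_ms,
        };
        self.pending.insert(id, record);
        id
    }
}

/// One dispatcher run: hand every due message to `worker`, remove it on
/// success, and push its due time back (doubling the delay) on failure.
/// Returns how many messages were delivered in this run.
fn dispatch_due<W>(store: &mut Store, now_ms: u64, mut worker: W) -> usize
where
    W: FnMut(&str) -> Result<(), String>,
{
    let due: Vec<u64> = store
        .pending
        .iter()
        .filter(|(_, p)| p.due_at_ms <= now_ms)
        .map(|(id, _)| *id)
        .collect();

    let mut delivered = 0;
    for id in due {
        let result = worker(&store.pending[&id].payload);
        match result {
            Ok(()) => {
                store.pending.remove(&id);
                delivered += 1;
            }
            Err(_) => {
                let p = store.pending.get_mut(&id).unwrap();
                p.due_at_ms = now_ms + p.delay_ms;
                p.delay_ms = (p.delay_ms * 2).min(MAX_DELAY_MS);
            }
        }
    }
    delivered
}

fn main() {
    let mut store = Store::default();
    store.add("webhook #1", 0);

    // The remote is down for the first two attempts, then comes back.
    let mut calls = 0;
    for now_ms in [0u64, 10, 30] {
        let delivered = dispatch_due(&mut store, now_ms, |_payload| {
            calls += 1;
            if calls <= 2 {
                Err("remote down".to_string())
            } else {
                Ok(())
            }
        });
        println!("t={}ms delivered={} pending={}", now_ms, delivered, store.pending.len());
    }
}
```

In an actor-based version, `dispatch_due` would run when the dispatcher receives its scheduled message, the closure would become a message to a worker, and the worker's failure report would trigger the storage update.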

This is a very basic design. For production, I would separate responsibility into multiple actors so that your dispatcher handles only the distribution of work to the workers on the scheduled message. A separate actor can be responsible for updating the database. You might also want to write to the DB only if a message fails, i.e. the first attempt is in-memory only.