elfo-rs / elfo

An asynchronous distributed actor framework in Rust with robust observability
217 stars 12 forks source link

Forwarding the `ResponseToken` associated with a network request to a different actor group results in a remote actor becoming stuck. #117

Open sargarass opened 10 months ago

sargarass commented 10 months ago

Consider a modified version of the Alice-Bob example. In this scenario, the consumer, Bob, forwards his token to another actor in a separate actor group known as the advocate group. Within this group, an actor calls the respond function using the forwarded token and conveys their greetings. The issue here is that the producer, Alice, located on a different node, will never receive this response, leading to been stucked.

This occurs due to the way ResponseToken interacts with network requests. An improvement we can make is to directly include the corresponding network handle address in the token, ensuring that it always responds in its drop method.

Additionally, we can implement a panic in the respond function when the token is used outside of the intended actor group. The remote actor will be notified when an unwind occurs and drop is called.

#[message]
struct TalkToMyAdvocate {
    token: Local<MoveOwnership<ResponseToken<AskName>>>
}

fn consumer() -> Blueprint {
    ActorGroup::new()
        .router(MapRouter::new(|e| {
            msg!(match e {
                Hello => Outcome::GentleUnicast(Singleton),
                AskName => Outcome::Unicast(Singleton),
                _ => Outcome::Discard,
            })
        }))
        .exec(|mut ctx| async move {
            while let Some(envelope) = ctx.recv().await {
                let sender = envelope.sender();

                msg!(match envelope {
                    Hello(i) => {
                        info!("received Hello({})", i);

                        if let Err(err) = ctx.send_to(sender, Hello(i)).await {
                            warn!("cannot say Hello({}) back: {}", i, err);
                        }
                    }
                    (AskName(i), token) => {
                        if let Err(err) = ctx.send(TalkToMyAdvocate { token: Local::from(MoveOwnership::from(token)) }).await {
                            warn!("cannot forward AskName({}) to 2nd consumer: {}", i, err);
                        }
                    }
                });
            }
        })
}

fn advocate() -> Blueprint {
    ActorGroup::new()
        .router(MapRouter::new(|e| {
            msg!(match e {
                TalkToMyAdvocate => Outcome::Unicast(Singleton),
                _ => Outcome::Discard,
            })
        }))
        .exec(|mut ctx| async move {
            while let Some(envelope) = ctx.recv().await {
                let sender = envelope.sender();

                msg!(match envelope {
                    m @ TalkToMyAdvocate => {
                        ctx.respond(m.token.into_inner().take().unwrap(), "Im Bob's advocate".to_string());
                    }
                });
            }
        })
}