riker-rs / riker

Easily build efficient, highly concurrent and resilient applications. An Actor Framework for Rust.
https://riker.rs
MIT License
1.02k stars 69 forks source link

Storing / removing children / parents in container. #168

Closed BratSinot closed 3 years ago

BratSinot commented 3 years ago

Greetings!

First of all, thanks for the great library!

Since I can't check whether a child exists through select(), I need to store each BasicActorRef in some container. However, I can't figure out how to know when I must remove an actor from that container. For example, take this tree:

test
└─ user
   └─ Foo0
      └─ Bar0
         └─ Noo0
      └─ Bar1
         └─ Noo1

After Noo0 is removed from Bar.noos, how can I notify Foo and remove Bar0 from Foo.bars? I can catch the moment when noos becomes empty and send a notification to the parent at #1. But I can get a race at #2 (because someone can send CreateNoo and create a new child in the meantime).

use riker::actors::{
    actor, Actor, ActorFactoryArgs, ActorRefFactory, BasicActorRef, Context, Receive, Sender,
    SystemBuilder, Tell,
};
use std::collections::HashMap;

/// Actor that keeps track of its `Bar` children by id.
///
/// The `#[actor(...)]` attribute lists the message types this actor handles;
/// the handlers in this file refer to the combined message type as `FooMsg`
/// (presumably generated by the attribute — confirm against the riker docs).
#[actor(CreateNoo, DestroyNoo, LastNoo)]
pub struct Foo {
    /// Identifier of this `Foo`; only used in log output here.
    foo_id: String,
    /// `Bar` children created so far, keyed by `bar_id`.
    bars: HashMap<String, BasicActorRef>,
}

impl ActorFactoryArgs<String> for Foo {
    /// Builds a `Foo` carrying the given id and an empty `bars` registry.
    fn create_args(foo_id: String) -> Self {
        let bars = HashMap::new();
        Self { foo_id, bars }
    }
}

/// Actor entry point for `Foo`.
impl Actor for Foo {
    // Message type this actor receives in `recv`.
    type Msg = FooMsg;

    /// Hands every incoming message, together with its context and sender,
    /// to `self.receive`.
    // NOTE(review): `receive` for `FooMsg` is not defined in this file; it is
    // presumably the dispatcher generated by `#[actor(...)]` — confirm.
    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        self.receive(ctx, msg, sender);
    }
}

impl Receive<CreateNoo> for Foo {
    type Msg = FooMsg;

    /// Routes a `CreateNoo` request to the `Bar` named by `msg.bar_id`.
    ///
    /// A known bar gets the request forwarded; an unknown bar is created as a
    /// child first, receives the request, and is then stored in `self.bars`.
    /// Panics if forwarding or actor creation fails (both are unwrapped).
    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: CreateNoo, _sender: Sender) {
        let bar_id = msg.bar_id.clone();
        match self.bars.get(&bar_id) {
            Some(existing) => {
                println!("bar {} already exist in foo {}", bar_id, self.foo_id);
                // The stored ref is a `BasicActorRef`, so the message is
                // wrapped in the bar's enum explicitly and sent via `try_tell`.
                let forwarded = existing.try_tell(BarMsg::CreateNoo(msg), None);
                forwarded.unwrap();
            }
            None => {
                println!("create bar {} for foo {}", bar_id, self.foo_id);
                let created = ctx.actor_of_args::<Bar, _>(&bar_id, bar_id.clone());
                let new_bar = created.unwrap();
                new_bar.tell(msg, None);
                self.bars.insert(bar_id, BasicActorRef::from(new_bar));
            }
        }
    }
}

impl Receive<DestroyNoo> for Foo {
    type Msg = FooMsg;

    /// Forwards a `DestroyNoo` request to the `Bar` named by `msg.bar_id`,
    /// passing this actor as the sender so the bar can answer it.
    /// Requests for unknown bars are logged and otherwise ignored.
    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: DestroyNoo, _sender: Sender) {
        println!(
            "remove noo {} in bar {} in foo {}",
            msg.id, msg.bar_id, msg.foo_id
        );

        let target = match self.bars.get(&msg.bar_id) {
            Some(bar) => bar,
            None => return,
        };

        let reply_to = BasicActorRef::from(ctx.myself());
        target
            .try_tell(BarMsg::DestroyNoo(msg), Some(reply_to))
            .unwrap();
    }
}

impl Receive<LastNoo> for Foo {
    type Msg = FooMsg;

    /// Handles a bar's "my last noo was removed" notification by dropping the
    /// bar named by `msg.bar_id` from `self.bars` and stopping it.
    ///
    /// Unknown bar ids are only logged.
    // NOTE(review): the issue author marks the removal below as race "#2": a
    // `CreateNoo` for the same bar may already have been forwarded before this
    // notification is handled. That race is not addressed here.
    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: LastNoo, _sender: Sender) {
        // Argument order matches the placeholders: bar id first, then foo id.
        println!("remove bar {} in foo {}", msg.bar_id, self.foo_id);

        match self.bars.remove(&msg.bar_id) {
            // <--- #2
            Some(actor) => {
                println!("bar {} removed in foo {}", msg.bar_id, self.foo_id);
                ctx.stop(actor);
            }
            None => println!("bar {} doesn't exist in foo {}", msg.bar_id, self.foo_id),
        }
    }
}

/// Actor that keeps track of its `Noo` children by id.
///
/// Handles `CreateNoo` and `DestroyNoo`; the handlers in this file refer to
/// the combined message type as `BarMsg`.
#[actor(CreateNoo, DestroyNoo)]
pub struct Bar {
    /// Identifier of this `Bar`; only used in log output here.
    pub bar_id: String,
    /// `Noo` children created so far, keyed by noo id.
    pub noos: HashMap<String, BasicActorRef>,
}

/// Actor entry point for `Bar`.
impl Actor for Bar {
    // Message type this actor receives in `recv`.
    type Msg = BarMsg;

    /// Hands every incoming message, together with its context and sender,
    /// to `self.receive`.
    // NOTE(review): `receive` for `BarMsg` is not defined in this file; it is
    // presumably the dispatcher generated by `#[actor(...)]` — confirm.
    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        self.receive(ctx, msg, sender);
    }
}

impl ActorFactoryArgs<String> for Bar {
    /// Builds a `Bar` carrying the given id and an empty `noos` registry.
    fn create_args(bar_id: String) -> Self {
        let noos = HashMap::new();
        Self { bar_id, noos }
    }
}

impl Receive<CreateNoo> for Bar {
    type Msg = BarMsg;

    /// Creates the `Noo` child named by `msg.id` and records it in
    /// `self.noos`, unless a noo with that id is already registered.
    /// Panics if the child cannot be created (the result is unwrapped).
    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: CreateNoo, _sender: Sender) {
        if self.noos.contains_key(&msg.id) {
            println!("noo {} already exists in bar {}", msg.id, self.bar_id);
            return;
        }

        println!("adding noo {} in bar {}", msg.id, self.bar_id);
        let noo = ctx
            .actor_of_args::<Noo, _>(&msg.id, msg.id.clone())
            .unwrap();
        self.noos.insert(msg.id.clone(), BasicActorRef::from(noo));
    }
}

impl Receive<DestroyNoo> for Bar {
    type Msg = BarMsg;

    /// Removes and stops the `Noo` named by `msg.id`.
    ///
    /// When that was the last registered noo and the request carried a
    /// sender, the sender is told `LastNoo` so it can clean up this bar.
    /// Unknown noo ids are only logged.
    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: DestroyNoo, sender: Sender) {
        let removed = match self.noos.remove(&msg.id) {
            Some(actor) => actor,
            None => {
                println!("noo {} doesn't exist in bar {}", msg.id, self.bar_id);
                return;
            }
        };

        println!("noo {} removed in bar {}..", msg.id, self.bar_id);
        ctx.stop(removed);

        if !self.noos.is_empty() {
            return;
        }

        // <---- #1
        println!("last noo {} in bar {} was removed", msg.id, self.bar_id);
        if let Some(parent) = sender {
            let notice = LastNoo {
                bar_id: msg.bar_id,
                foo_id: msg.foo_id,
            };
            parent.try_tell(FooMsg::LastNoo(notice), None).unwrap();
        }
    }
}

/// Leaf actor of the example tree.
///
/// The `#[actor()]` attribute lists no message types; the `Actor` impl in this
/// file uses `NooMsg` as the message type.
#[actor()]
#[derive(Clone, Debug)]
pub struct Noo {
    /// Identifier handed to `create_args`; not read anywhere in this file.
    id: String,
}

/// Actor entry point for `Noo`.
impl Actor for Noo {
    // Message type this actor receives in `recv`.
    type Msg = NooMsg;

    /// Hands every incoming message, together with its context and sender,
    /// to `self.receive`.
    // NOTE(review): `receive` for `NooMsg` is not defined in this file; it is
    // presumably generated by `#[actor()]` — confirm.
    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        self.receive(ctx, msg, sender);
    }
}

impl ActorFactoryArgs<String> for Noo {
    /// Builds a `Noo` that stores the given id.
    fn create_args(id: String) -> Self {
        Self { id }
    }
}

/// Request to create a `Noo` under a given `Bar`.
#[derive(Clone, Debug)]
pub struct CreateNoo {
    /// Id of the noo to create; `Bar` uses it as the child's name and map key.
    pub id: String,
    /// Id of the bar that should own the noo; `Foo` uses it to find or create the bar.
    pub bar_id: String,
    /// Id of the owning foo; not read by the handlers in this file.
    pub foo_id: String,
}

/// Request to remove a `Noo` from a given `Bar`.
#[derive(Clone, Debug)]
pub struct DestroyNoo {
    /// Id of the noo to remove from the bar's `noos` map.
    pub id: String,
    /// Id of the bar that owns the noo; `Foo` uses it to look the bar up.
    pub bar_id: String,
    /// Id of the owning foo; logged by `Foo` and copied into `LastNoo` by `Bar`.
    pub foo_id: String,
}

/// Notification sent by a `Bar` after its last `Noo` was removed.
#[derive(Clone, Debug)]
pub struct LastNoo {
    /// Id of the bar that became empty; `Foo` removes this key from its `bars` map.
    pub bar_id: String,
    /// Id of the foo, copied from the triggering `DestroyNoo`.
    pub foo_id: String,
}

/// Demo driver: creates `Foo0`, asks it to create two noos under two bars,
/// prints the actor tree, asks it to destroy both noos, and prints the tree
/// again. Each round is followed by a 500 ms sleep before the tree is printed.
fn main() {
    let sys = SystemBuilder::new().name("test").create().unwrap();

    let foo = sys
        .actor_of_args::<Foo, _>("Foo0", "Foo0".to_owned())
        .unwrap();

    // (noo id, bar id) pairs exercised by the demo, in send order.
    let pairs = [("Noo0", "Bar0"), ("Noo1", "Bar1")];
    let settle = std::time::Duration::from_millis(500);

    for &(noo_id, bar_id) in &pairs {
        foo.tell(
            CreateNoo {
                id: noo_id.to_owned(),
                bar_id: bar_id.to_owned(),
                foo_id: "Foo0".to_owned(),
            },
            None,
        );
    }

    std::thread::sleep(settle);

    sys.print_tree();

    for &(noo_id, bar_id) in &pairs {
        foo.tell(
            DestroyNoo {
                id: noo_id.to_owned(),
                bar_id: bar_id.to_owned(),
                foo_id: "Foo0".to_owned(),
            },
            None,
        );
    }

    std::thread::sleep(settle);

    sys.print_tree();
}
BratSinot commented 3 years ago

OK, I figured it out. Instead of sending a response to the parent, I'm just using a counter in the parent:

use riker::actors::{
    actor, Actor, ActorFactory, ActorRef, ActorRefFactory, ActorReference, BasicActorRef, Context,
    Receive, Sender, SystemBuilder, Tell,
};
use std::{
    collections::HashMap,
    sync::atomic::{AtomicUsize, Ordering},
};

/// Actor that owns the `Bar` children and counts the requests routed to each.
///
/// Handles `CreateNoo` and `DestroyNoo`; the handlers in this file refer to
/// the combined message type as `FooMsg`.
#[actor(CreateNoo, DestroyNoo)]
pub struct Foo {
    /// Per-bar state keyed by `bar_id`: a counter that is incremented for each
    /// forwarded `CreateNoo` and decremented for each forwarded `DestroyNoo`,
    /// plus the typed reference to the bar actor.
    bars: HashMap<String, (AtomicUsize, ActorRef<BarMsg>)>,
}

impl ActorFactory for Foo {
    fn create() -> Self {
        Self {
            bars: HashMap::new(),
        }
    }
}

/// Actor entry point for `Foo`.
impl Actor for Foo {
    // Message type this actor receives in `recv`.
    type Msg = FooMsg;

    /// Hands every incoming message, together with its context and sender,
    /// to `self.receive`.
    // NOTE(review): `receive` for `FooMsg` is not defined in this file; it is
    // presumably the dispatcher generated by `#[actor(...)]` — confirm.
    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        self.receive(ctx, msg, sender);
    }
}

impl Receive<CreateNoo> for Foo {
    type Msg = FooMsg;

    /// Finds the bar named by `msg.bar_id` (creating it with a zeroed counter
    /// if needed), increments that bar's counter and forwards the request.
    /// Panics if the bar cannot be created (the result is unwrapped).
    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: CreateNoo, _sender: Sender) {
        let key = msg.bar_id.clone();
        let (counter, bar) = match self.bars.entry(key) {
            std::collections::hash_map::Entry::Occupied(slot) => slot.into_mut(),
            std::collections::hash_map::Entry::Vacant(slot) => {
                println!("create bar {} for foo {}", msg.bar_id, ctx.myself.name());
                let new_bar = ctx.actor_of::<Bar>(&msg.bar_id).unwrap();
                slot.insert((AtomicUsize::new(0), new_bar))
            }
        };

        counter.fetch_add(1, Ordering::SeqCst);
        bar.tell(msg, None);
    }
}

impl Receive<DestroyNoo> for Foo {
    type Msg = FooMsg;

    /// Forwards the request to the bar named by `msg.bar_id`, decrements that
    /// bar's counter, and removes and stops the bar once the counter reaches
    /// zero. Requests for unknown bars are logged and otherwise ignored.
    // NOTE(review): the counter is decremented for every request routed to a
    // known bar, whether or not that bar actually holds the noo.
    // NOTE(review): the bar is stopped right after the request was sent to it;
    // the author reports a dead-letter problem with this version — presumably
    // the stop can overtake the queued request. Confirm against riker.
    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: DestroyNoo, _: Sender) {
        println!(
            "remove noo {} in bar {} in foo {}",
            msg.id, msg.bar_id, msg.foo_id
        );

        let bar_id = msg.bar_id.clone();
        let was_last = match self.bars.get(&bar_id) {
            Some((counter, bar)) => {
                bar.tell(msg, None);
                // `fetch_sub` returns the previous value: 1 means now zero.
                counter.fetch_sub(1, Ordering::SeqCst) == 1
            }
            None => return,
        };

        if !was_last {
            return;
        }

        if let Some((_, bar)) = self.bars.remove(&bar_id) {
            ctx.stop(bar);
            println!("Bar {} removed from foo {}", bar_id, ctx.myself.name());
        } else {
            println!(
                "Bar {} does not exist for foo {}",
                bar_id,
                ctx.myself.name()
            );
        }
    }
}

/// Actor that keeps track of its `Noo` children by id.
///
/// Handles `CreateNoo` and `DestroyNoo`; the handlers in this file refer to
/// the combined message type as `BarMsg`.
#[actor(CreateNoo, DestroyNoo)]
pub struct Bar {
    /// `Noo` children created so far, keyed by noo id.
    pub noos: HashMap<String, ActorRef<NooMsg>>,
}

/// Actor entry point for `Bar`.
impl Actor for Bar {
    // Message type this actor receives in `recv`.
    type Msg = BarMsg;

    /// Hands every incoming message, together with its context and sender,
    /// to `self.receive`.
    // NOTE(review): `receive` for `BarMsg` is not defined in this file; it is
    // presumably the dispatcher generated by `#[actor(...)]` — confirm.
    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        self.receive(ctx, msg, sender);
    }
}

impl ActorFactory for Bar {
    fn create() -> Self {
        Self {
            noos: HashMap::new(),
        }
    }
}

impl Receive<CreateNoo> for Bar {
    type Msg = BarMsg;

    /// Creates the `Noo` child named by `msg.id` and records it in
    /// `self.noos`, unless a noo with that id is already registered.
    /// Panics if the child cannot be created (the result is unwrapped).
    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: CreateNoo, _sender: Sender) {
        if self.noos.contains_key(&msg.id) {
            println!("noo {} already exists in bar {}", msg.id, ctx.myself.name());
        } else {
            println!("adding noo {} in bar {}", msg.id, ctx.myself.name());
            let noo = ctx.actor_of::<Noo>(&msg.id).unwrap();
            self.noos.insert(msg.id.clone(), noo);
        }
    }
}

impl Receive<DestroyNoo> for Bar {
    type Msg = BarMsg;

    /// Removes the `Noo` named by `msg.id` from `self.noos` and stops it.
    /// Unknown noo ids are only logged.
    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: DestroyNoo, _: Sender) {
        if let Some(noo) = self.noos.remove(&msg.id) {
            println!("noo {} removed in bar {}..", msg.id, ctx.myself.name());
            ctx.stop(noo);
        } else {
            println!("noo {} doesn't exist in bar {}", msg.id, ctx.myself.name());
        }
    }
}

/// Leaf actor of the example tree; it carries no state.
///
/// The `#[actor()]` attribute lists no message types; the `Actor` impl in this
/// file uses `NooMsg` as the message type.
#[actor()]
#[derive(Clone, Debug)]
pub struct Noo {}

/// Actor entry point for `Noo`.
impl Actor for Noo {
    // Message type this actor receives in `recv`.
    type Msg = NooMsg;

    /// Hands every incoming message, together with its context and sender,
    /// to `self.receive`.
    // NOTE(review): `receive` for `NooMsg` is not defined in this file; it is
    // presumably generated by `#[actor()]` — confirm.
    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        self.receive(ctx, msg, sender);
    }
}

impl ActorFactory for Noo {
    /// Builds the stateless `Noo` actor.
    fn create() -> Self {
        Self {}
    }
}

/// Request to create a `Noo` under a given `Bar`.
#[derive(Clone, Debug)]
pub struct CreateNoo {
    /// Id of the noo to create; `Bar` uses it as the child's name and map key.
    pub id: String,
    /// Id of the bar that should own the noo; `Foo` uses it to find or create the bar.
    pub bar_id: String,
    /// Id of the owning foo; not read by the handlers in this file.
    pub foo_id: String,
}

/// Request to remove a `Noo` from a given `Bar`.
#[derive(Clone, Debug)]
pub struct DestroyNoo {
    /// Id of the noo to remove from the bar's `noos` map.
    pub id: String,
    /// Id of the bar that owns the noo; `Foo` uses it to look the bar up.
    pub bar_id: String,
    /// Id of the owning foo; only logged by `Foo`.
    pub foo_id: String,
}

/// Demo driver: creates `Foo0`, asks it to create two noos under two bars,
/// waits 500 ms, asks it to destroy both noos, and waits another 500 ms.
fn main() {
    let sys = SystemBuilder::new().name("test").create().unwrap();

    let foo = sys.actor_of::<Foo>("Foo0").unwrap();

    // (noo id, bar id) pairs exercised by the demo, in send order.
    let pairs = [("Noo0", "Bar0"), ("Noo1", "Bar1")];
    let settle = std::time::Duration::from_millis(500);

    for &(noo_id, bar_id) in &pairs {
        foo.tell(
            CreateNoo {
                id: noo_id.to_owned(),
                bar_id: bar_id.to_owned(),
                foo_id: "Foo0".to_owned(),
            },
            None,
        );
    }

    std::thread::sleep(settle);

    for &(noo_id, bar_id) in &pairs {
        foo.tell(
            DestroyNoo {
                id: noo_id.to_owned(),
                bar_id: bar_id.to_owned(),
                foo_id: "Foo0".to_owned(),
            },
            None,
        );
    }

    std::thread::sleep(settle);
}
BratSinot commented 3 years ago

Okay, I just ran into another problem, this time with dead letters -_-"

BratSinot commented 3 years ago

OK, I can store the Foo parent in Bar, have Bar stop itself after the last Noo is removed, send a message to the parent in post_stop, and then remove Bar from the Foo cache.

use riker::actors::{
    actor, Actor, ActorFactory, ActorFactoryArgs, ActorRef, ActorRefFactory, ActorReference,
    BasicActorRef, Context, Receive, Sender, SystemBuilder, Tell,
};
use std::collections::HashMap;

/// Actor that keeps track of its `Bar` children by id.
///
/// Handles `CreateNoo`, `DestroyNoo` and `LastNoo`; the handlers in this file
/// refer to the combined message type as `FooMsg`.
#[actor(CreateNoo, DestroyNoo, LastNoo)]
pub struct Foo {
    /// `Bar` children created so far, keyed by `bar_id`.
    bars: HashMap<String, ActorRef<BarMsg>>,
}

impl ActorFactory for Foo {
    fn create() -> Self {
        Self {
            bars: HashMap::new(),
        }
    }
}

/// Actor entry point for `Foo`.
impl Actor for Foo {
    // Message type this actor receives in `recv`.
    type Msg = FooMsg;

    /// Hands every incoming message, together with its context and sender,
    /// to `self.receive`.
    // NOTE(review): `receive` for `FooMsg` is not defined in this file; it is
    // presumably the dispatcher generated by `#[actor(...)]` — confirm.
    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        self.receive(ctx, msg, sender);
    }
}

impl Receive<CreateNoo> for Foo {
    type Msg = FooMsg;

    /// Finds the bar named by `msg.bar_id`, creating it with a reference to
    /// this actor as its parent if needed, and forwards the request to it.
    /// Panics if the bar cannot be created (the result is unwrapped).
    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: CreateNoo, _sender: Sender) {
        let key = msg.bar_id.clone();
        let bar = match self.bars.entry(key) {
            std::collections::hash_map::Entry::Occupied(slot) => slot.into_mut(),
            std::collections::hash_map::Entry::Vacant(slot) => {
                println!("create bar {} for foo {}", msg.bar_id, ctx.myself.name());
                let new_bar = ctx
                    .actor_of_args::<Bar, _>(&msg.bar_id, (ctx.myself(),))
                    .unwrap();
                slot.insert(new_bar)
            }
        };

        bar.tell(msg, None);
    }
}

impl Receive<DestroyNoo> for Foo {
    type Msg = FooMsg;

    /// Logs the request and forwards it to the bar named by `msg.bar_id`.
    /// Requests for unknown bars are dropped after logging.
    fn receive(&mut self, _: &Context<Self::Msg>, msg: DestroyNoo, _: Sender) {
        println!(
            "remove noo {} in bar {} in foo {}",
            msg.id, msg.bar_id, msg.foo_id
        );

        let target = match self.bars.get(&msg.bar_id) {
            Some(bar) => bar,
            None => return,
        };
        target.tell(msg, None);
    }
}

impl Receive<LastNoo> for Foo {
    type Msg = FooMsg;

    /// Handles a bar's `LastNoo` notification: if the bar named by
    /// `msg.bar_id` is registered and reports no children, it is removed from
    /// `self.bars` and stopped. Unknown bar ids are ignored silently.
    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: LastNoo, _: Sender) {
        let still_populated = match self.bars.get(&msg.bar_id) {
            Some(bar) => bar.has_children(),
            None => return,
        };

        if still_populated {
            println!("bar {} still have children!", msg.bar_id);
            return;
        }

        if let Some(bar) = self.bars.remove(&msg.bar_id) {
            println!("removing bar {}", msg.bar_id);
            ctx.stop(bar);
        } else {
            println!("can't find bar {}", msg.bar_id);
        }
    }
}

/// Actor that keeps track of its `Noo` children and reports back to its
/// parent `Foo` when it stops.
///
/// Handles `CreateNoo` and `DestroyNoo`; the handlers in this file refer to
/// the combined message type as `BarMsg`.
#[actor(CreateNoo, DestroyNoo)]
pub struct Bar {
    /// This actor's own name; `None` until `pre_start` fills it in, and taken
    /// out again in `post_stop`.
    id: Option<String>,
    /// Reference to the `Foo` that created this bar; `post_stop` sends
    /// `LastNoo` to it.
    pub parent: ActorRef<FooMsg>,
    /// `Noo` children created so far, keyed by noo id.
    pub noos: HashMap<String, ActorRef<NooMsg>>,
}

impl Actor for Bar {
    type Msg = BarMsg;

    /// Remembers this actor's own name so `post_stop` can report it.
    fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
        let own_name = ctx.myself.name().to_string();
        self.id = Some(own_name);
    }

    /// Hands every incoming message to `self.receive`.
    // NOTE(review): `receive` for `BarMsg` is presumably the dispatcher
    // generated by `#[actor(...)]` — confirm.
    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        self.receive(ctx, msg, sender);
    }

    /// Tells the parent `Foo` that this bar has stopped by sending `LastNoo`
    /// with the name stored in `pre_start`.
    ///
    /// Panics if no name was stored (the `Option` is unwrapped).
    fn post_stop(&mut self) {
        let bar_id = self.id.take().unwrap();
        let notice = LastNoo { bar_id };
        self.parent.tell(notice, None);
    }
}

impl ActorFactoryArgs<(ActorRef<FooMsg>,)> for Bar {
    fn create_args((parent,): (ActorRef<FooMsg>,)) -> Self {
        Self {
            id: None,
            parent,
            noos: HashMap::new(),
        }
    }
}

impl Receive<CreateNoo> for Bar {
    type Msg = BarMsg;

    /// Creates the `Noo` child named by `msg.id` and records it in
    /// `self.noos`, unless a noo with that id is already registered.
    /// Panics if the child cannot be created (the result is unwrapped).
    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: CreateNoo, _sender: Sender) {
        if self.noos.contains_key(&msg.id) {
            println!("noo {} already exists in bar {}", msg.id, ctx.myself.name());
            return;
        }

        println!("adding noo {} in bar {}", msg.id, ctx.myself.name());
        let noo = ctx.actor_of::<Noo>(&msg.id).unwrap();
        self.noos.insert(msg.id.clone(), noo);
    }
}

impl Receive<DestroyNoo> for Bar {
    type Msg = BarMsg;

    /// Removes and stops the `Noo` named by `msg.id`; when no noos remain
    /// afterwards, this bar asks to be stopped as well.
    /// Unknown noo ids are only logged.
    fn receive(&mut self, ctx: &Context<Self::Msg>, msg: DestroyNoo, _: Sender) {
        let removed = match self.noos.remove(&msg.id) {
            Some(noo) => noo,
            None => {
                println!("noo {} doesn't exist in bar {}", msg.id, ctx.myself.name());
                return;
            }
        };

        ctx.stop(removed);
        println!("noo {} removed in bar {}..", msg.id, ctx.myself.name());

        if !self.noos.is_empty() {
            return;
        }

        println!("last noos for bar was removed");
        ctx.stop(ctx.myself());
    }
}

/// Leaf actor of the example tree; it carries no state.
///
/// The `#[actor()]` attribute lists no message types; the `Actor` impl in this
/// file uses `NooMsg` as the message type.
#[actor()]
#[derive(Clone, Debug)]
pub struct Noo {}

/// Actor entry point for `Noo`.
impl Actor for Noo {
    // Message type this actor receives in `recv`.
    type Msg = NooMsg;

    /// Hands every incoming message, together with its context and sender,
    /// to `self.receive`.
    // NOTE(review): `receive` for `NooMsg` is not defined in this file; it is
    // presumably generated by `#[actor()]` — confirm.
    fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender) {
        self.receive(ctx, msg, sender);
    }
}

impl ActorFactory for Noo {
    /// Builds the stateless `Noo` actor.
    fn create() -> Self {
        Self {}
    }
}

/// Request to create a `Noo` under a given `Bar`.
#[derive(Clone, Debug)]
pub struct CreateNoo {
    /// Id of the noo to create; `Bar` uses it as the child's name and map key.
    pub id: String,
    /// Id of the bar that should own the noo; `Foo` uses it to find or create the bar.
    pub bar_id: String,
    /// Id of the owning foo; not read by the handlers in this file.
    pub foo_id: String,
}

/// Request to remove a `Noo` from a given `Bar`.
#[derive(Clone, Debug)]
pub struct DestroyNoo {
    /// Id of the noo to remove from the bar's `noos` map.
    pub id: String,
    /// Id of the bar that owns the noo; `Foo` uses it to look the bar up.
    pub bar_id: String,
    /// Id of the owning foo; only logged by `Foo`.
    pub foo_id: String,
}

/// Notification a `Bar` sends to its parent from `post_stop`.
#[derive(Clone, Debug)]
pub struct LastNoo {
    /// Name of the bar that stopped; `Foo` uses it as the key into its `bars` map.
    pub bar_id: String,
}

/// Demo driver: creates `Foo0`, asks it to create two noos under two bars,
/// waits 500 ms, asks it to destroy both noos, and waits another 500 ms.
fn main() {
    let sys = SystemBuilder::new().name("test").create().unwrap();

    let foo = sys.actor_of::<Foo>("Foo0").unwrap();

    // (noo id, bar id) pairs exercised by the demo, in send order.
    let pairs = [("Noo0", "Bar0"), ("Noo1", "Bar1")];
    let settle = std::time::Duration::from_millis(500);

    for &(noo_id, bar_id) in &pairs {
        foo.tell(
            CreateNoo {
                id: noo_id.to_owned(),
                bar_id: bar_id.to_owned(),
                foo_id: "Foo0".to_owned(),
            },
            None,
        );
    }

    std::thread::sleep(settle);

    for &(noo_id, bar_id) in &pairs {
        foo.tell(
            DestroyNoo {
                id: noo_id.to_owned(),
                bar_id: bar_id.to_owned(),
                foo_id: "Foo0".to_owned(),
            },
            None,
        );
    }

    std::thread::sleep(settle);
}