NimonSour / interthread

Apache License 2.0
14 stars 2 forks source link

include methods with `self` receivers #8

Open NimonSour opened 1 month ago

NimonSour commented 1 month ago

Hi @guydols ! Thanks for recently starring this crate!

Just wanted to share this example and I open this issue (consider yours) as a continuation of our earlier conversation this year.

Here is a little example that I made for the model, under certain conditions ( actor option debut is on ) it is safe to take ownership of the actor.

// cargo add oneshot

use std::sync::{mpsc,Arc,Mutex};
use std::time::{Duration,SystemTime};
use std::thread::{spawn,sleep};

pub struct Actor {
    pub value: i8,
}
impl Actor {
    pub fn new(v: i8) -> Self {
        Self { value: v }
    }
    pub fn increment(&mut self) {

        // sleep for a while
        // just to make sure the channel 
        // has messages to process in it 
        // it's important to imitate this condition
        // when we call the non-blocking 'stop',
        // mpsc::Receiver is droped when the last 
        // mpsc::Sender is droped, but before it will receive
        // all the messages  

        sleep(Duration::from_secs(1));

        self.value += 1;
    }
}

pub enum ActorScript {
    Increment {},
    // we add a special variant
    Stop{ debut: Arc<SystemTime>, inter_send: oneshot::Sender<Option<Actor>> }
}

impl ActorScript {

    pub fn debut() -> Arc<SystemTime> {
        static LAST: Mutex<SystemTime> = Mutex::new(
            SystemTime::UNIX_EPOCH,
        );
        let mut last_time = LAST.lock().unwrap();
        let mut next_time = SystemTime::now();
        while !(*last_time < next_time) {
            if *last_time == next_time {
                next_time += Duration::new(0, 1);
            } else {
                next_time = SystemTime::now();
            }
        }
        *last_time = next_time.clone();
        Arc::new(next_time)
    }

    pub fn direct(self, actor: &mut Actor) {
        match self {
            ActorScript::Increment {} => {
                actor.increment();
            }
            _ => (),
        }
    }

    pub fn play(
        receiver: mpsc::Receiver<ActorScript>,
        mut actor: Actor,
        debut: SystemTime,
    ) {
        while let std::result::Result::Ok(msg) = receiver.recv() {
            if let ActorScript::Stop {debut: live_debut, inter_send} = msg {

                // we check the time on debuts
                // this is second check first check 
                // occures in Live::stop 
                if debut == *live_debut{
                    // if there's only one instance of Live 
                    // it's safe to retun the Actor and drop the mpsc::Receiver
                    if !(Arc::strong_count(&live_debut) > 2) {
                        let _ = inter_send.send(Some(actor));
                        return ;
                    }
                }
                let _ = inter_send.send(None);

            } else {
                msg.direct(&mut actor);
            }
        }
    }
}

#[derive(Clone)]
pub struct ActorLive {
    sender: mpsc::Sender<ActorScript>,
    pub debut: Arc<SystemTime>,
    pub name: String,
}
impl ActorLive {
    pub fn new(v: i8) -> Self {
        let actor = Actor::new(v);
        let (sender, receiver) = mpsc::channel();
        let debut = ActorScript::debut();
        let debut_for_play = *Arc::clone(&debut);
        spawn(move || {
            ActorScript::play(receiver, actor, debut_for_play)
        });
        Self {
            debut: Arc::clone(&debut),
            name: format!("{:?}", * debut),
            sender,
        }
    }
    pub fn increment(&mut self) {
        let msg = ActorScript::Increment {};
        let _ = self
            .sender
            .send(msg)
            .expect("'ActorLive::method.send'. Channel is closed!");
    }

    // /*
    // will block untill all the messages 
    // are processed and then return the actor 
    pub fn stop(self) -> Option<Actor> {
        // since the method is self consuming 
        // no instances of mpsc::Sender can be cloned
        // while we block on this nor after the call

        // we check the strong count 
        if !Arc::strong_count(&self.debut) > 1 {
            let (inter_send, inter_recv) = oneshot::channel();
            let debut = Arc::clone(&self.debut);
            let msg = ActorScript::Stop{ debut, inter_send };
            let _ = self.sender.send(msg);

            // actually we could return the oneshot::Receiver itself
            // and it will be safe still as the method consumes
            // the Live instance ( comment this and uncomment non-blocking version)
            return  inter_recv.recv().unwrap();
        } 

        None
    }
    //  */

    /*   
    // non-blocking version

    pub fn stop(self) -> oneshot::Receiver<Option<Actor>> {
        let (inter_send, inter_recv) = oneshot::channel();
        let debut = Arc::clone(&self.debut);
        let msg = ActorScript::Stop{ debut, inter_send };
        let _ = self.sender.send(msg);

        inter_recv
    }
    // */

}

fn main() {

    let actor_live = ActorLive::new(0);

    // /*
    // blocking
    if let Some(actor) = actor_live.stop() {
        assert_eq!(1,actor.value)
    }
    //  */

    /*
    // non-blocking version
    for _ in 0..10 {
        actor_live.increment();
    }
    if let Some(actor) = actor_live.stop().recv().expect("Didn't receive") {

        assert_eq!(10,actor.value)

    } else { eprintln!("-> None");}
    //  */

}

there could be a resolution with 'self' receivers (as function signature not channel) ..

Conditions: 1) the model is not Clone ( one Live instance only ) 2) the Live methods return oneshot::Receivers ... It may happen! anyway, just have a look at the example when it's possible, what you think ? 👍

NimonSour commented 1 month ago

Hi again @guydols! I'm working on Generics and Traits within the Model at the moment. At the time you stared the crate I was experimenting with few cases when it's possible to return the actor safely, so I thought to write a little example to share with you, as it is directly concerning the matter we've discussed ( self receivers in function signatures ), but while writting the example I've relised such an Actor Model may be really useful.

If generated methods for ActorLive are blocking (returning the type) the Model is obsolete a much slower and double expensive RwLock'ed object.. but if the methods will return oneshot::Receivers ( as with oplion interact) than that's a different story..

All that has to be done is to make sure that the model is not Clone if self or mut self detected in method receivers.

Oneshot::Receiver has methods:

So a very short calling time, every party will recv() on its own..

So far so good.

But it kind of like ~brakes what I was working on earlier, here are some versions that I've been experimenting with ( as it adds up so quick so much would be nice to share it just to make sure I'm not going into wrong dirrection ):

It does look like it pollutes the code logic in original Actor, but wait untill one gets into scaling, worst enemy for clear logic and clean code.

The signatures don't have to match 100% they could have additional arguments and return types, as long as they include required types and the model can access them (in last two cases).

So to recap, my early guess is that the the macro should be able to watch for at least these three cases in case of self receivers and generate a normal Model, otherwise generate a non-clone version.

All I've stated above it's just a raw idea public speaking, anyone who spoted some logic loop holes PLEASE express them, it may save tones of dev time !

NimonSour commented 1 month ago

We will regenerate for ActorLive all public self-consuming Actor methods, but will recosider their visibility.

ActorLive can be allowed to consume the Actor (and itself) if it is the owner of the Actor. We will decide on its ownership at runtime, on the condition that there is no other potential owner (no other owner or no other).

Two conditions for generating a public self-consuming method for ActorLive:

1) If the model is debut, we can use Arc::strongcount to count the references and decide ownership at runtime. 2) If the method's return type is `Option<>orResult<_>(I will mention some ofResult's limitations below), we can inject ano other owner` check. Otherwise, we'll change the method's visibility to private.

Implementation

The model will generate an additional private method, fn inter_play_stop(self) -> (Actor, Receiver, Sender) or return (Actor, Receiver, Sender, debut) (if debut), with a corresponding ActorScript::InterPlayStop variant that is matched in play, returning (Actor, Receiver). Additionally, (not implemented in this example) there could be an equality check for debut (SystemTime), panicking if false.

// cargo add oneshot 

use std::sync::{mpsc,Arc,Mutex};
use std::time::{Duration,SystemTime};
use std::thread::{spawn,sleep};

pub struct Actor {
    pub value: i8,
}
impl Actor {
    pub fn new(v: i8) -> Self {
        Self { value: v }
    }
    pub fn increment(&mut self) {
        sleep(Duration::from_secs(1));
        self.value += 1;
    }

    pub fn public<T:ToString>(self, arg: T ) -> Option<String> 
    { Some(arg.to_string()) }

    pub fn private<T:ToString>(self, arg: T ) -> String 
    { arg.to_string() }
}

pub enum ActorScript {
    Increment {},
    // we add a special variant
    InterPlayStop {  inter_send: oneshot::Sender<(Actor,mpsc::Receiver<ActorScript>)> }
}

impl ActorScript {

    pub fn debut() -> Arc<SystemTime> {
        static LAST: Mutex<SystemTime> = Mutex::new(
            SystemTime::UNIX_EPOCH,
        );
        let mut last_time = LAST.lock().unwrap();
        let mut next_time = SystemTime::now();
        while !(*last_time < next_time) {
            if *last_time == next_time {
                next_time += Duration::new(0, 1);
            } else {
                next_time = SystemTime::now();
            }
        }
        *last_time = next_time.clone();
        Arc::new(next_time)
    }

    pub fn direct(self, actor: &mut Actor) {
        match self {
            ActorScript::Increment {} => {
                actor.increment();
            }
            _ => (),
        }
    }

    pub fn play(
        receiver: mpsc::Receiver<ActorScript>,
        mut actor: Actor,
        debut: SystemTime,
    )
    {
        while let std::result::Result::Ok(msg) = receiver.recv() {
            if let ActorScript::InterPlayStop { inter_send} = msg {
                    let _ = inter_send.send((actor,receiver));
                    return;
            } else {
                msg.direct(&mut actor);
            }
        }
    }
}

#[derive(Clone)]
pub struct ActorLive {
    sender: mpsc::Sender<ActorScript>,
    pub debut: Arc<SystemTime>,
    pub name: String,
}

impl ActorLive {

    pub fn new(v: i8) -> Self {
        let actor = Actor::new(v);
        let (sender, receiver) = mpsc::channel();
        let debut = ActorScript::debut();
        let debut_for_play = *Arc::clone(&debut);
        spawn(move || {
            ActorScript::play(receiver, actor, debut_for_play)
        });
        Self {
            debut: Arc::clone(&debut),
            name: format!("{:?}", * debut),
            sender,
        }
    }

    pub fn increment(&mut self) {
        let msg = ActorScript::Increment {};
        let _ = self
            .sender
            .send(msg)
            .expect("'ActorLive::method.send'. Channel is closed!");
    }

    fn inter_play_stop(self) -> ( Actor,
                                  mpsc::Receiver<ActorScript>,
                                  mpsc::Sender<ActorScript>,
                                  Arc<SystemTime> )
    {
        let (inter_send, inter_recv) = oneshot::channel();
        let msg = ActorScript::InterPlayStop{ inter_send };
        let _ = self.sender.send(msg);
        let (actor,receiver) = inter_recv.recv().unwrap();

        let sender = self.sender;
        let debut = self.debut;
        (actor,receiver,sender,debut)
    }

    pub fn public<T:ToString>(self, arg: T ) -> Option<String> {

        if Arc::strong_count(&self.debut) <= 1 {
            let ( actor,_,_,_) = self.inter_play_stop();
            return actor.public(arg);
        }
        None
    }

    fn private<T:ToString>(self, arg: T ) -> String {
        let ( actor,_,_,_) = self.inter_play_stop();
        return actor.private(arg);
    }
}

fn main() {
    let act = ActorLive::new(0);
    let act_cl = act.clone();

    assert_eq!(None, act.public(""));
    assert_eq!(Some("Success".to_string()), act_cl.public("Success"));
}

So the methods will be served on the ActorLive end, which is beneficial for the Model:

Objections:

This is not going to happen any time soon, but I'm glad there's some sort of raw resolution on it so I'll make room for it in current dev also I'll possible open two additional parallel issues concerning traits and generic types all three resulting in a new release🤞.

guydols commented 3 weeks ago

Hey @NimonSour,

No problem, I should've starred this repo a year ago. I've been on a trend of using OS threads instead of async runtimes. This create is my go to for the infrastructure between threads.

As for your example from the first post, I still feel under qualified, but I get the gist. I had to think about scenarios where this could be behavior that is wanted. But there are definitely some designs that can benefit from this. I can think of some kind of workers where after they're done can be consumed in this way.

I'm glad that my error last year contributed more than just explanation work for you ;). As for help to think about the logic you've laid out, I see no holes in your theory. Maybe you could share this topic on a few places, that could attract some people more understand of the inner workings of your crate. Either way, to me, it looks like you are on the right track.

As an additional effect, this give programmers using self-consuming method a more common use-after-free error if they unintentionally did not reference self. This would lead to a situation where cargo or rust-analyser gives a clear explanation and a programmer would come to a resolution more quickly.

Anyway, looks like a lot of hard work, I wish I could've been more help to you!