Pauan / rust-signals

Zero-cost functional reactive Signals for Rust
MIT License

Fully working examples of how Mutable can be used to notify observers when changed. #57

Open CraigBuilds opened 2 years ago

CraigBuilds commented 2 years ago

The tutorial explains the concept of Mutable, but not how to actually run the code so that the callbacks get called. After some trial and error, and a bit of Stack Overflow, I have come up with the following. Feel free to put this in an examples or tests folder if this is how it is actually intended to be used.

use futures_signals::signal::Mutable;
use futures_signals::signal::SignalExt; // extension trait that provides for_each on signals
use futures::executor::LocalPool;
use std::thread;
use futures::join;

fn main(){

    //create my_state, and a clone that will be moved to the thread
    let my_state = Mutable::new(5);
    let my_state_shared = my_state.clone();

    //increment my_state by 1 in a loop, until it reaches 10
    thread::spawn(move || {
        loop {
            my_state_shared.set(my_state_shared.get() + 1);
            thread::sleep(std::time::Duration::from_secs(1));
            if my_state_shared.get() == 10 {
                break;
            }
        } //my_state_shared dropped here
    });

    //create observers 
    let obs_a_future = my_state.signal().for_each(|value| {
        println!("Observer A {}", value);
        async {}
    });
    let obs_b_future = my_state.signal().for_each(|value| {
        println!("Observer B {}", value);
        async {}
    });

    drop(my_state); //decrement ref count by one. (my_state_shared is still active)

    //run the app until my_state_shared is dropped.
    let mut pool = LocalPool::new();
    pool.run_until( async {
        join!(obs_a_future, obs_b_future);
    });

    println!("!");

}

Pauan commented 2 years ago

There are many different ways of running Futures. You chose to use LocalPool::run_until, which is fine, but there are plenty of other ways:

use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;

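// Keep the pool alive: tasks spawned through the spawner only run while the pool is driven.
let mut pool = LocalPool::new();
let spawner = pool.spawner();

spawner.spawn_local(async move {
    some_signal.for_each(|value| {
        // ...
        async {}
    }).await;
}).unwrap();

// Run the spawned tasks until they have all completed.
pool.run();

use futures::executor::ThreadPool;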

let pool = ThreadPool::new().unwrap();

pool.spawn_ok(async move {
    some_signal.for_each(|value| {
        // ...
        async {}
    }).await;
});

use futures::executor::block_on;

block_on(async move {
    some_signal.for_each(|value| {
        // ...
        async {}
    }).await;
});

smol::block_on(async move {
    some_signal.for_each(|value| {
        // ...
        async {}
    }).await;
});

wasm_bindgen_futures::spawn_local(async move {
    some_signal.for_each(|value| {
        // ...
        async {}
    }).await;
});

use async_std::task;

task::spawn(async move {
    some_signal.for_each(|value| {
        // ...
        async {}
    }).await;
});

smol::spawn(async move {
    some_signal.for_each(|value| {
        // ...
        async {}
    }).await;
}).detach(); // a dropped smol Task is cancelled, so detach it to keep it running

tokio::spawn(async move {
    some_signal.for_each(|value| {
        // ...
        async {}
    }).await;
});

#[wasm_bindgen(start)]
pub async fn main_js() -> Result<(), JsValue> {
    some_signal.for_each(|value| {
        // ...
        async {}
    }).await;

    Ok(())
}

#[async_std::main]
async fn main() {
    some_signal.for_each(|value| {
        // ...
        async {}
    }).await;
}

#[tokio::main]
async fn main() {
    some_signal.for_each(|value| {
        // ...
        async {}
    }).await;
}

futures-signals doesn't handle Future spawning at all. Instead, that's handled by other crates (futures, tokio, async-std, smol, wasm-bindgen-futures, etc.).

Because there are so many different ways to spawn, it's difficult to cover all of them.

CraigBuilds commented 2 years ago

I see. It's impressive how flexible this crate is. I still think it may be useful to have a few examples with common runtimes to demonstrate to newcomers like me how to run the futures.

I understand that "how to run a future" is a topic outside of the scope of this project. The Rust Book or the tutorials for the crates you demonstrated above would be a better place to find that. But the slight nuances of using this library within an executor could be demonstrated somewhere. For example, I didn't know that all references to the mutable had to be dropped before the future it created is marked as complete.

But maybe not. Before trying out this crate I had never used Rust futures (though I have used futures in C++ and Python). If you disagree with the need for examples, please close this issue :)

Pauan commented 2 years ago

I agree that examples are good, I'm just saying that it's hard to do, because of how complex the Rust Futures ecosystem is.

> For example, I didn't know that all references to the mutable had to be dropped before the future it created is marked as complete.

That's quite normal for any sort of push-based stream. For example, mpsc Streams also behave like that.

As long as you have a reference to the Mutable, it's possible to change the value, and so the receiver has to assume that more changes might happen.

But when all references to the Mutable are dropped, then it knows that it's impossible for it to change, and so the receiver will stop listening.
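The same "finished once every sender is gone" behaviour can be seen with a plain channel. The sketch below is only an analogy: it uses the standard library's blocking std::sync::mpsc channel rather than an async mpsc Stream or a Mutable, and the function name is made up for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Collects values until every sender has been dropped.
/// (Hypothetical helper, named only for this illustration.)
fn collect_until_senders_dropped() -> Vec<i32> {
    let (tx, rx) = mpsc::channel();
    let tx_shared = tx.clone();

    let producer = thread::spawn(move || {
        for n in 6..=10 {
            tx_shared.send(n).unwrap();
        }
        // tx_shared is dropped here
    });

    // Drop the original sender, otherwise the receiver would wait forever.
    drop(tx);

    // The iterator ends only when all senders are gone.
    let received: Vec<i32> = rx.iter().collect();
    producer.join().unwrap();
    received
}
```

If the drop(tx) line is removed, rx.iter() never finishes, which mirrors a for_each Future that stays pending while a Mutable is still alive.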

What if you don't want to wait for the Mutable to drop? What if you want to stop listening earlier? In that case you can cancel the Future by using abortable:

use futures::future::abortable;

let (future, handle) = abortable(some_signal.for_each(|value| {
    // ...
    async {}
}));

spawn_future_somehow(future);

Now you can call handle.abort(); to stop the Future, even if the Mutable still exists. The abortable function works with all Futures, because every Future is cancellable.

You can also use something like select, which will run 2 Futures in parallel, and as soon as 1 of the Futures is finished, it will then cancel the other unfinished Future:

spawn_future_somehow(async move {
    let result = select(
        async move {
            some_signal.for_each(|value| {
                // ...
                async {}
            }).await;
        },

        async move {
            sleep(1000.0).await;
        },
    ).await;
});

This will run the some_signal.for_each(...) Future and the sleep(1000.0) Future in parallel. After 1 second, the sleep Future will finish, and then it will cancel the for_each Future.

This is essentially a timeout: it will listen for values on some_signal, but after 1 second it will stop listening.

And just like abortable, this works on any Future, so every Future in Rust supports timeouts. Various crates like async-std have a timeout function which behaves just like the select code.

Also, Signals support a wait_for method, which returns a Future. That Future will wait until the Signal has that specific value, and then it will stop:

spawn_future_somehow(async move {
    some_signal.wait_for(5).await;
    // Now we know that some_signal is 5
});

In the above code, the wait_for will wait for some_signal to be 5. Once it becomes 5, the Future will complete, and it will then run the rest of the code.

Of course you can combine these in various ways... for example you can do something like this:

let running = Mutable::new(true);
let my_state = Mutable::new(...);

spawn_future_somehow({
    let running = running.clone();
    let my_state = my_state.clone();

    async move {
        let _ = select(
            async move {
                my_state.signal_cloned().for_each(...).await;
            },

            async move {
                running.signal().wait_for(false).await;
            },
        ).await;
    }
});

This will listen to changes to my_state, however if you do running.set(false); it will then stop listening.

This happens because select runs the for_each and wait_for Futures in parallel. The wait_for Future will wait until running is false.

After you set running to false, the wait_for Future will complete. And because one of the Futures completed, the select will then cancel the unfinished Future (which is my_state.for_each).

nothingIsSomething commented 2 years ago

How can I group the signals so that I can update Mutable values through the methods of a struct? The examples have helped me a lot, thanks!

async fn render(state: Self) {
    // Do I have to clone the entire struct? How would I do that? Isn't it inefficient?
    // let state_clone_1 = state.clone(); // error!
    let state_clone_2 = state.is_running.clone();
    let state_clone_3 = state.game.score.clone();

    let obs_if_is_running = state_clone_2.signal().for_each(move |value| {
        println!("Observer A {}", value);

        state.start(true);
        async {}
    });

    let obs_game_score = state_clone_3.signal().for_each(move |value| { // <- error: use of moved value `state`
        println!("Observer B {}", value);

        state.set_game_score(222);
        async {}
    });

    tokio::spawn(async move {
        obs_if_is_running.await;
    });
    tokio::spawn(async move {
        obs_game_score.await;
    });

}

Pauan commented 2 years ago

@nothingIsSomething Generally you will have a struct which contains all of your Mutables, and then you wrap that struct in Arc:

use std::sync::Arc;
use futures_signals::signal::Mutable;

struct Foo {
    is_running: Mutable<bool>,
    score: Mutable<usize>,
}

impl Foo {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            is_running: Mutable::new(true),
            score: Mutable::new(0),
        })
    }

    fn start(&self, value: bool) {
        // ...
    }

    fn set_game_score(&self, value: usize) {
        // ...
    }

    fn render(state: Arc<Self>) {
        let state = state.clone();

        // ...
    }
}

Cloning an Arc is very cheap: it just atomically increments a reference count.
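To make the cost concrete, the sketch below uses only the standard library. The Counters struct and its AtomicUsize field are hypothetical stand-ins for a struct full of Mutables. Each clone points at the same allocation and only bumps the reference count.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Stand-in for a state struct; `score` plays the role of a Mutable<usize>.
struct Counters {
    score: AtomicUsize,
}

/// Returns (reference count, score seen through the original, same allocation?).
/// (Hypothetical helper, named only for this illustration.)
fn share_state() -> (usize, usize, bool) {
    let state = Arc::new(Counters { score: AtomicUsize::new(0) });

    // One clone per observer: no field is copied, only the count goes up.
    let for_observer = Arc::clone(&state);

    // A change made through the clone is visible through the original.
    for_observer.score.store(222, Ordering::SeqCst);

    (
        Arc::strong_count(&state),
        state.score.load(Ordering::SeqCst),
        Arc::ptr_eq(&state, &for_observer),
    )
}
```

In the render method above, the same idea applies: make one state.clone() per closure and move that clone into the closure, instead of moving `state` itself into the first one.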