tokio-rs / loom

Concurrency permutation testing tool for Rust.
MIT License
2.08k stars 110 forks source link

Not all cases were run #362

Open feyleth opened 1 month ago

feyleth commented 1 month ago

In this case, not all branches were explored:

#[cfg(loom)]
use loom::sync::{Arc, Mutex};

#[cfg(not(loom))]
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;

/// State held by a `Node`; currently only its name.
#[derive(Debug, Clone, PartialEq)]
pub struct NodeValue {
    /// Current name of the node.
    pub name: String,
}

/// Change notification published on a node's broadcast channel.
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
    /// The node was renamed; carries the new name.
    Name(String),
}

/// Handle to a node: its state behind a mutex plus a broadcast sender for
/// change events.
///
/// Cloning produces another handle that shares the same `Arc`'d state and
/// holds a clone of the sender.
#[derive(Clone)]
pub struct Node {
    /// Shared, mutex-protected node state.
    value: Arc<Mutex<NodeValue>>,
    /// Sender half used to publish `Event`s to subscribers.
    broadcast: broadcast::Sender<Event>,
}

impl Node {
    /// Creates a node named `"start"` with a broadcast channel of capacity 25.
    ///
    /// The receiver returned by `broadcast::channel` is discarded, so the node
    /// starts without any subscriber.
    pub fn new() -> Self {
        let (broadcast, _) = broadcast::channel(25);
        Node {
            value: Arc::new(Mutex::new(NodeValue {
                name: "start".to_owned(),
            })),
            broadcast,
        }
    }
    /// Renames the node and, only if the name actually changed, broadcasts
    /// `Event::Name` with the new name.
    ///
    /// The mutex guard lives until the end of the function, so both the state
    /// update and the `send` happen while the lock is held. Returns `self` so
    /// calls can be chained.
    ///
    /// # Panics
    ///
    /// Panics if `lock()` returns an error (for `std::sync::Mutex` that means
    /// the mutex is poisoned).
    pub fn change_name(&self, name: String) -> &Self {
        let mut node_value = self.value.lock().expect("Faile to get mutex");
        if node_value.name != name {
            node_value.name = name.clone();
            // The send result is deliberately ignored (presumably an error
            // only means there is currently no receiver; confirm against the
            // tokio broadcast docs).
            let _ = self.broadcast.send(Event::Name(name));
        }
        self
    }
    /// Returns a clone of the current state together with a new event
    /// receiver.
    ///
    /// NOTE: the lock guard is a temporary that is dropped at the end of the
    /// first statement, so `subscribe()` runs *without* holding the mutex. A
    /// concurrent `change_name` can run in between, in which case the receiver
    /// is created after the event was sent while the returned state still
    /// shows the old name.
    ///
    /// # Panics
    ///
    /// Panics if `lock()` returns an error (for `std::sync::Mutex` that means
    /// the mutex is poisoned).
    pub fn subcribe(&self) -> (NodeValue, broadcast::Receiver<Event>) {
        // Snapshot the state; the guard is released right after this line.
        let node = self.value.lock().expect("Faile to get mutex").clone();
        let subscribe = self.broadcast.subscribe();
        ((node), subscribe)
    }
}

// Races `subcribe` against `change_name` inside `loom::model` and checks that
// what the subscriber sees (snapshot + events) is consistent.
#[test]
#[cfg(loom)]
fn test() {
    use loom::thread;
    loom::model(move || {
        let node = Node::new();

        let clone_node = node.clone();
        // Subscriber thread: snapshots the state and creates a receiver.
        let subcribe_thread = thread::spawn(move || clone_node.subcribe());
        // Writer thread: renames the node; joined immediately, so the rename
        // is complete before the checks below run.
        thread::spawn(move || {
            node.change_name("change".to_owned());
        })
        .join()
        .unwrap();

        let (new_node, mut events) = subcribe_thread.join().unwrap();

        if new_node.name == "start" {
            // The snapshot was taken before the rename.
            assert_eq!(
                new_node,
                NodeValue {
                    name: "start".to_owned(),
                }
            );

            // NOTE(review): this expects the receiver to have been created
            // before the event was sent. `subcribe` releases the mutex before
            // calling `subscribe()`, so the rename can land in between and
            // this assertion would then fail; that is the interleaving the
            // report says loom does not run.
            loom::future::block_on(async move {
                assert_eq!(events.recv().await, Ok(Event::Name("change".to_owned())));
            });
        } else {
            // The snapshot was taken after the rename; no event is checked.
            assert_eq!(
                new_node,
                NodeValue {
                    name: "change".to_owned(),
                }
            );
        }
    });
}

But if we run the same test with shuttle, it fails as expected:

#[cfg(all(feature = "shuttle", test))]
use shuttle::sync::{Arc, Mutex};

#[cfg(not(all(feature = "shuttle", test)))]
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;

/// State held by a `Node`; currently only its name.
#[derive(Debug, Clone, PartialEq)]
pub struct NodeValue {
    /// Current name of the node.
    pub name: String,
}

/// Change notification published on a node's broadcast channel.
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
    /// The node was renamed; carries the new name.
    Name(String),
}

/// Handle to a node: its state behind a mutex plus a broadcast sender for
/// change events.
///
/// Cloning produces another handle that shares the same `Arc`'d state and
/// holds a clone of the sender.
#[derive(Clone)]
pub struct Node {
    /// Shared, mutex-protected node state.
    value: Arc<Mutex<NodeValue>>,
    /// Sender half used to publish `Event`s to subscribers.
    broadcast: broadcast::Sender<Event>,
}

impl Node {
    /// Creates a node named `"start"` with a broadcast channel of capacity 25.
    ///
    /// The receiver returned by `broadcast::channel` is discarded, so the node
    /// starts without any subscriber.
    pub fn new() -> Self {
        let (broadcast, _) = broadcast::channel(25);
        Node {
            value: Arc::new(Mutex::new(NodeValue {
                name: "start".to_owned(),
            })),
            broadcast,
        }
    }
    /// Renames the node and, only if the name actually changed, broadcasts
    /// `Event::Name` with the new name.
    ///
    /// The mutex guard lives until the end of the function, so both the state
    /// update and the `send` happen while the lock is held. Returns `self` so
    /// calls can be chained.
    ///
    /// # Panics
    ///
    /// Panics if `lock()` returns an error (for `std::sync::Mutex` that means
    /// the mutex is poisoned).
    pub fn change_name(&self, name: String) -> &Self {
        let mut node_value = self.value.lock().expect("Faile to get mutex");
        if node_value.name != name {
            node_value.name = name.clone();
            // The send result is deliberately ignored (presumably an error
            // only means there is currently no receiver; confirm against the
            // tokio broadcast docs).
            let _ = self.broadcast.send(Event::Name(name));
        }
        self
    }
    /// Returns a clone of the current state together with a new event
    /// receiver.
    ///
    /// NOTE: the lock guard is a temporary that is dropped at the end of the
    /// first statement, so `subscribe()` runs *without* holding the mutex. A
    /// concurrent `change_name` can run in between, in which case the receiver
    /// is created after the event was sent while the returned state still
    /// shows the old name.
    ///
    /// # Panics
    ///
    /// Panics if `lock()` returns an error (for `std::sync::Mutex` that means
    /// the mutex is poisoned).
    pub fn subcribe(&self) -> (NodeValue, broadcast::Receiver<Event>) {
        // Snapshot the state; the guard is released right after this line.
        let node = self.value.lock().expect("Faile to get mutex").clone();
        let subscribe = self.broadcast.subscribe();
        ((node), subscribe)
    }
}

// Same scenario as the loom test, driven by shuttle's randomized scheduler:
// races `subcribe` against `change_name` and checks the subscriber's view.
#[test]
#[cfg(feature = "shuttle")]
fn test() {
    use shuttle::thread;
    shuttle::check_random(
        move || {
            let node = Node::new();

            let clone_node = node.clone();
            // Subscriber thread: snapshots the state and creates a receiver.
            let subcribe_thread = thread::spawn(move || clone_node.subcribe());
            // Writer thread: renames the node; joined immediately, so the
            // rename is complete before the checks below run.
            thread::spawn(move || {
                node.change_name("change".to_owned());
            })
            .join()
            .unwrap();

            let (new_node, mut events) = subcribe_thread.join().unwrap();

            if new_node.name == "start" {
                // The snapshot was taken before the rename.
                assert_eq!(
                    new_node,
                    NodeValue {
                        name: "start".to_owned(),
                    }
                );

                // NOTE(review): this expects the receiver to have been
                // created before the event was sent. `subcribe` releases the
                // mutex before calling `subscribe()`, so the rename can land
                // in between; the report states shuttle hits that
                // interleaving and this assertion fails.
                shuttle::future::block_on(async move {
                    assert_eq!(events.recv().await, Ok(Event::Name("change".to_owned())));
                });
            } else {
                // The snapshot was taken after the rename; no event is checked.
                assert_eq!(
                    new_node,
                    NodeValue {
                        name: "change".to_owned(),
                    }
                );
            }
        },
        // Presumably the number of random schedules to try (confirm against
        // the shuttle `check_random` docs).
        100,
    );
}
mox692 commented 1 month ago

My guess is that if the scheduling of the async task differs between loom::future::block_on and shuttle::future::block_on, it could produce different results.

feyleth commented 1 month ago

The `block_on` might not have an impact in this case.

If we replace `loom::future::block_on` with a manually created runtime, for example with loom:

// Variant of the loom test that drives the receiver with a manually built
// Tokio runtime instead of `loom::future::block_on`.
#[test]
#[cfg(loom)]
fn test() {
    use loom::thread;
    // Multi-threaded Tokio runtime, built once outside the model closure and
    // captured by the `move` closure below.
    let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
    loom::model(move || {
        let node = Node::new();

        let clone_node = node.clone();
        // Subscriber thread: snapshots the state and creates a receiver.
        let subcribe_thread = thread::spawn(move || clone_node.subcribe());
        // Writer thread: renames the node; joined immediately, so the rename
        // is complete before the checks below run.
        thread::spawn(move || {
            node.change_name("change".to_owned());
        })
        .join()
        .unwrap();

        let (new_node, mut events) = subcribe_thread.join().unwrap();

        if new_node.name == "start" {
            // The snapshot was taken before the rename.
            assert_eq!(
                new_node,
                NodeValue {
                    name: "start".to_owned(),
                }
            );

            // NOTE(review): this expects the receiver to have been created
            // before the event was sent. `subcribe` releases the mutex before
            // calling `subscribe()`, so the rename can land in between and
            // this assertion would then fail; that is the interleaving the
            // report says loom does not run.
            runtime.block_on(async move {
                assert_eq!(events.recv().await, Ok(Event::Name("change".to_owned())));
            });
        } else {
            // The snapshot was taken after the rename; no event is checked.
            assert_eq!(
                new_node,
                NodeValue {
                    name: "change".to_owned(),
                }
            );
        }
    });
}

shuttle:

// Variant of the shuttle test that drives the receiver with a manually built
// Tokio runtime instead of `shuttle::future::block_on`.
#[test]
#[cfg(feature = "shuttle")]
fn test() {
    use shuttle::thread;
    // Multi-threaded Tokio runtime, built once outside the checked closure
    // and captured by the `move` closure below.
    let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
    shuttle::check_random(
        move || {
            let node = Node::new();

            let clone_node = node.clone();
            // Subscriber thread: snapshots the state and creates a receiver.
            let subcribe_thread = thread::spawn(move || clone_node.subcribe());
            // Writer thread: renames the node; joined immediately, so the
            // rename is complete before the checks below run.
            thread::spawn(move || {
                node.change_name("change".to_owned());
            })
            .join()
            .unwrap();

            let (new_node, mut events) = subcribe_thread.join().unwrap();

            if new_node.name == "start" {
                // The snapshot was taken before the rename.
                assert_eq!(
                    new_node,
                    NodeValue {
                        name: "start".to_owned(),
                    }
                );

                // NOTE(review): this expects the receiver to have been
                // created before the event was sent. `subcribe` releases the
                // mutex before calling `subscribe()`, so the rename can land
                // in between; the report states shuttle hits that
                // interleaving and this assertion fails.
                runtime.block_on(async move {
                    assert_eq!(events.recv().await, Ok(Event::Name("change".to_owned())));
                });
            } else {
                // The snapshot was taken after the rename; no event is checked.
                assert_eq!(
                    new_node,
                    NodeValue {
                        name: "change".to_owned(),
                    }
                );
            }
        },
        // Presumably the number of random schedules to try (confirm against
        // the shuttle `check_random` docs).
        100,
    );
}

We still get the same result.

loom doesn't run this case:

    /// Returns a clone of the current state together with a new event
    /// receiver. The mutex guard is a temporary released at the end of the
    /// first statement, so `subscribe()` runs without the lock held.
    pub fn subcribe(&self) -> (NodeValue, broadcast::Receiver<Event>) {
        let node = self.value.lock().expect("Faile to get mutex").clone();
        // The interleaving in question: the thread calling `change_name` runs
        // here, after the lock is released and before the receiver exists.
        let subscribe = self.broadcast.subscribe();
        ((node), subscribe)
    }

This means the receiver is created after the message was sent, so the receiver doesn't have that message (this is expected behavior, but loom doesn't run this interleaving).