Closed th7nder closed 1 year ago
As we don't want to introduce queuing mechanism (offline talk with @mcches), we run into blocking issues with this PR. Localhost CAN busy loop:
#[test]
fn localhost_communication_does_not_block_simulation() -> Result {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "trace");
}
tracing_subscriber::fmt::init();
let mut sim = Builder::new().build();
sim.client("server", async {
let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, PORT)).await?;
tokio::spawn(async move {
if let Ok((mut stream, _)) = listener.accept().await {
loop {
let _ = stream.write_u8(1).await;
assert_eq!(2, stream.read_u8().await.unwrap());
}
}
});
let mut stream = TcpStream::connect((Ipv4Addr::LOCALHOST, PORT)).await?;
loop {
assert_eq!(1, stream.read_u8().await?);
stream.write_u8(2).await?;
}
});
assert!(!sim.step()?);
The issue is that writing on stream does not yield to
pub(crate) fn tick(&mut self, duration: Duration) -> Result<bool> {
self.tokio.block_on(async {
self.local
.run_until(async {
sleep(duration).await;
})
.await
});
A proposed solution is to implement a simpler channel for the Turmoil use case, example (courtesy of @mcches):
#[test]
fn busy_bee() -> Result {
let mut sim = Builder::new().build();
sim.client("c1", async {
let tx1 = Arc::new(Notify::new());
let rx1 = tx1.clone();
let tx2 = Arc::new(Notify::new());
let rx2 = tx2.clone();
tokio::spawn(async move {
loop {
rx1.notified().await;
tx2.notify_waiters();
}
});
loop {
tx1.notify_waiters();
rx2.notified().await;
}
});
assert!(!sim.step()?);
Ok(())
}
However I don't have time to do that, so closing this PR and giving someone else a chance to implement it. Minor changes will come in separate PRs.
79