/// Send a message to the event sender.
/// If the message queue is full, the function will return the message.
/// If the receiver is dropped, the function will return an error.
///
/// # Return
///
/// - [Ok(None)] if the message is sent.
/// - [Ok(Some(T))] if the message queue is full.
/// - [Err(T)] if the receiver is dropped.
pub fn send_non_blocking<T>(
sender: &tokio::sync::mpsc::Sender<T>,
message: T,
) -> Result<Option<T>, T> {
match sender.try_send(message) {
Ok(()) => Ok(None),
Err(tokio::sync::mpsc::error::TrySendError::Full(message)) => Ok(Some(message)),
Err(tokio::sync::mpsc::error::TrySendError::Closed(message)) => Err(message),
}
}
#[cfg(test)]
mod test_tokio_send {
use super::*;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_send_non_blocking() {
let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
let message = "Hello world";
const N: usize = 1024;
tokio::spawn(async move {
for _ in 0..N {
let mut temp_message = Some(message);
loop {
tokio::task::yield_now().await;
match send_non_blocking(&sender, temp_message.take().unwrap()).unwrap() {
None => break,
Some(message) => {
temp_message = Some(message);
}
}
}
}
});
// Check that the receiver received all the messages
for _ in 0..N {
assert_eq!(receiver.recv().await.unwrap(), message);
}
assert!(receiver.recv().await.is_none(), "sender should drop");
}
}
Cargo.toml:
[package]
name = "test_tokio_crash"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.27", features = ["full"] }
Code
src/lib.rs
:Cargo.toml
:Meta
rustc --version --verbose
:Error output
Backtrace
```
```