Closed haguettaz closed 1 week ago
Available on crate feature sync only
Another option is to use `broadcast` instead of `mpsc`.
This is used to send many values from many producers to many consumers.
use tokio::sync::broadcast;
/// Demonstrates a `broadcast` channel delivering the same two values to two
/// independent receivers.
///
/// Both receivers exist before anything is sent, and each spawned task
/// asserts that it observes `10` followed by `20`.
#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe();

    let first = tokio::spawn(async move {
        assert_eq!(rx1.recv().await.unwrap(), 10);
        assert_eq!(rx1.recv().await.unwrap(), 20);
    });

    let second = tokio::spawn(async move {
        assert_eq!(rx2.recv().await.unwrap(), 10);
        assert_eq!(rx2.recv().await.unwrap(), 20);
    });

    tx.send(10).unwrap();
    tx.send(20).unwrap();

    // Join both tasks before returning: otherwise `main` may finish while the
    // receivers are still pending, and their assertions would never be
    // checked. A panic inside a task surfaces here as a `JoinError`.
    first.await.unwrap();
    second.await.unwrap();
}
Minimal working example of computational units broadcasting messages.
use tokio::sync::broadcast;
use tokio::task;
use tokio::time::{sleep, Duration};
use std::sync::Arc;
#[tokio::main]
async fn main() {
// Create a broadcast channel with a buffer size of 16
let (tx, _) = broadcast::channel(16);
// Create a barrier to synchronize the units
let barrier = Arc::new(tokio::sync::Barrier::new(3));
// Spawn computational units
let mut handles = vec![];
for i in [2,3,5].iter() {
let tx = tx.clone();
let mut rx = tx.subscribe();
let barrier = barrier.clone();
let handle = task::spawn(async move {
let mut result = 0;
loop {
// Check for new messages
match rx.try_recv() {
Ok(msg) => println!("Unit {} received new message: {}", i, msg),
Err(broadcast::error::TryRecvError::Empty) => {
// No message available, continue with computation
println!("Unit {} is doing computation", i);
sleep(Duration::from_millis(100)).await;
// Perform some computation
result += 1; // Example computation
// If computation yields a specific value, send a message
if result % i == 0 {
println!("Unit {} sends new message from result {}", i, result);
tx.send(1).unwrap();
}
}
Err(broadcast::error::TryRecvError::Closed) => break,
Err(broadcast::error::TryRecvError::Lagged(_)) => {
println!("Unit {} lagged", i);
}
}
// Synchronize with other units
barrier.wait().await;
}
});
handles.push(handle);
}
// Wait for all tasks to complete
for handle in handles {
handle.await.unwrap();
}
}
Mode of Operation
min_delay
tau0
. The message buffer should be able to contain `num_neurons` spikes. The received spikes are stored in the local memory (= `inputs`).