compio-rs / compio

A thread-per-core Rust runtime with IOCP/io_uring/polling.
MIT License
420 stars 37 forks source link

[Performance] Replace flume-based dispatcher with crossbeam-channel #281

Closed TapGhoul closed 3 months ago

TapGhoul commented 3 months ago

A while back, flume claimed the performance crown for throughput. However, that has not been true for some time: in my own benchmarks, crossbeam consistently outperforms flume. The benchmark here is taken from crossbeam's own benchmarks - https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel/benchmarks - and modified slightly so it can run under criterion (a variable message count, plus a fix so that message counts not evenly divisible by the thread count still work).

This change reverts to crossbeam-channel, which is already a dependency, and removes flume from the dependency tree entirely.

Tested with criterion: the only case in which flume is competitive is `seq`. That is not the pattern the Dispatcher uses; its pattern is closer to `spmc`, as highlighted here: [image]

Dependencies for the benchmark:

crossbeam-channel = "0.5.13"
flume = "0.11.0"
quanta = "0.12.3"
criterion = { version = "0.5.1", features = ["async_tokio"] }

`quanta` can be replaced with `std::time::Instant`; it is used here because it reads the performance counter.

Benchmark source code ```rs use criterion::{criterion_group, criterion_main, Criterion}; mod message { use std::fmt; const LEN: usize = 1; #[derive(Clone, Copy)] pub(crate) struct Message(#[allow(dead_code)] [usize; LEN]); impl fmt::Debug for Message { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("Message") } } #[inline] pub(crate) fn new(num: usize) -> Message { Message([num; LEN]) } } const THREADS: usize = 4; mod flume { use crate::{message, THREADS}; use quanta::Instant; use std::sync::{Arc, Barrier}; use std::time::Duration; pub fn seq_unbounded(messages: usize) -> Duration { let (tx, rx) = flume::unbounded(); let start = Instant::now(); for i in 0..messages { tx.send(message::new(i)).unwrap(); } for _ in 0..messages { rx.recv().unwrap(); } start.elapsed() } pub fn spsc_unbounded(messages: usize) -> Duration { let (tx, rx) = flume::unbounded(); std::thread::scope(|scope| { let b = Arc::new(Barrier::new(2)); { let b = b.clone(); scope.spawn(move || { b.wait(); for i in 0..messages { tx.send(message::new(i)).unwrap(); } }); } b.wait(); let start = Instant::now(); for _ in 0..messages { rx.recv().unwrap(); } start }).elapsed() } pub fn mpsc_unbounded(messages: usize) -> Duration { let (tx, rx) = flume::unbounded(); std::thread::scope(|scope| { let b = Arc::new(Barrier::new(THREADS + 1)); for _ in 0..THREADS { let b = b.clone(); let tx = tx.clone(); scope.spawn(move || { b.wait(); // Hack to allow criterion to work as intended for i in 0..(messages / THREADS + 1) { tx.send(message::new(i)).unwrap(); } }); } b.wait(); let start = Instant::now(); for _ in 0..messages { rx.recv().unwrap(); } start }).elapsed() } pub fn mpmc_unbounded(messages: usize) -> Duration { let (tx, rx) = flume::unbounded(); std::thread::scope(|scope| { let b = Arc::new(Barrier::new(THREADS * 2 + 1)); for _ in 0..THREADS { let b = b.clone(); let tx = tx.clone(); scope.spawn(move || { b.wait(); // Hack to allow criterion to work as intended for i in 0..(messages / THREADS + 
1) { tx.send(message::new(i)).unwrap(); } }); } for _ in 0..THREADS { let b = b.clone(); let rx = rx.clone(); scope.spawn(move || { b.wait(); // Hack to allow criterion to work as intended for _ in 0..(messages / THREADS + 1) { rx.recv().unwrap(); } }); } b.wait(); Instant::now() }).elapsed() } pub fn spmc_unbounded(messages: usize) -> Duration { let (tx, rx) = flume::unbounded(); std::thread::scope(|scope| { let b = Arc::new(Barrier::new(THREADS + 1)); for _ in 0..THREADS { let b = b.clone(); let rx = rx.clone(); scope.spawn(move || { b.wait(); // Hack to allow criterion to work as intended for _ in 0..messages / THREADS { rx.recv().unwrap(); } }); } b.wait(); let start = Instant::now(); for i in 0..messages { tx.send(message::new(i)).unwrap(); } start }).elapsed() } } mod crossbeam { use crate::{message, THREADS}; use quanta::Instant; use std::sync::{Arc, Barrier}; use std::time::Duration; pub fn seq_unbounded(messages: usize) -> Duration { let (tx, rx) = crossbeam_channel::unbounded(); let start = Instant::now(); for i in 0..messages { tx.send(message::new(i)).unwrap(); } for _ in 0..messages { rx.recv().unwrap(); } start.elapsed() } pub fn spsc_unbounded(messages: usize) -> Duration { let (tx, rx) = crossbeam_channel::unbounded(); std::thread::scope(|scope| { let b = Arc::new(Barrier::new(2)); { let b = b.clone(); scope.spawn(move || { b.wait(); for i in 0..messages { tx.send(message::new(i)).unwrap(); } }); } b.wait(); let start = Instant::now(); for _ in 0..messages { rx.recv().unwrap(); } start }).elapsed() } pub fn mpsc_unbounded(messages: usize) -> Duration { let (tx, rx) = crossbeam_channel::unbounded(); std::thread::scope(|scope| { let b = Arc::new(Barrier::new(THREADS + 1)); for _ in 0..THREADS { let b = b.clone(); let tx = tx.clone(); scope.spawn(move || { b.wait(); // Hack to allow criterion to work as intended for i in 0..(messages / THREADS + 1) { tx.send(message::new(i)).unwrap(); } }); } b.wait(); let start = Instant::now(); for _ in 0..messages { 
rx.recv().unwrap(); } start }).elapsed() } pub fn mpmc_unbounded(messages: usize) -> Duration { let (tx, rx) = crossbeam_channel::unbounded(); std::thread::scope(|scope| { let b = Arc::new(Barrier::new(THREADS * 2 + 1)); for _ in 0..THREADS { let b = b.clone(); let tx = tx.clone(); scope.spawn(move || { b.wait(); // Hack to allow criterion to work as intended for i in 0..(messages / THREADS + 1) { tx.send(message::new(i)).unwrap(); } }); } for _ in 0..THREADS { let b = b.clone(); let rx = rx.clone(); scope.spawn(move || { b.wait(); // Hack to allow criterion to work as intended for _ in 0..(messages / THREADS + 1) { rx.recv().unwrap(); } }); } b.wait(); Instant::now() }).elapsed() } pub fn spmc_unbounded(messages: usize) -> Duration { let (tx, rx) = crossbeam_channel::unbounded(); std::thread::scope(|scope| { let b = Arc::new(Barrier::new(THREADS + 1)); for _ in 0..THREADS { let b = b.clone(); let rx = rx.clone(); scope.spawn(move || { b.wait(); // Hack to allow criterion to work as intended for _ in 0..messages / THREADS { rx.recv().unwrap(); } }); } b.wait(); let start = Instant::now(); for i in 0..messages { tx.send(message::new(i)).unwrap(); } start }).elapsed() } } fn bench(c: &mut Criterion) { let mut g = c.benchmark_group("channel-impls"); g.bench_function("flume seq", |b| { b.iter_custom(|c| flume::seq_unbounded(c as usize)) }); g.bench_function("flume spsc", |b| { b.iter_custom(|c| flume::spsc_unbounded(c as usize)) }); g.bench_function("flume mpsc", |b| { b.iter_custom(|c| flume::mpsc_unbounded(c as usize)) }); g.bench_function("flume mpmc", |b| { b.iter_custom(|c| flume::mpmc_unbounded(c as usize)) }); g.bench_function("flume spmc", |b| { b.iter_custom(|c| flume::spmc_unbounded(c as usize)) }); g.bench_function("crossbeam seq", |b| { b.iter_custom(|c| crossbeam::seq_unbounded(c as usize)) }); g.bench_function("crossbeam spsc", |b| { b.iter_custom(|c| crossbeam::spsc_unbounded(c as usize)) }); g.bench_function("crossbeam mpsc", |b| { b.iter_custom(|c| 
crossbeam::mpsc_unbounded(c as usize)) }); g.bench_function("crossbeam mpmc", |b| { b.iter_custom(|c| crossbeam::mpmc_unbounded(c as usize)) }); g.bench_function("crossbeam spmc", |b| { b.iter_custom(|c| crossbeam::spmc_unbounded(c as usize)) }); } criterion_group!(benches, bench); criterion_main!(benches); ```
TapGhoul commented 3 months ago

Marking this as a draft while I improve the benchmark's accuracy.

George-Miao commented 3 months ago

We need the async support that flume provides. Performance is not the concern here.

TapGhoul commented 3 months ago

... I can't believe I missed that. Still, it was good benchmarking practice on my end. I'll close this!