crossbeam-rs / crossbeam

Tools for concurrent programming in Rust
Apache License 2.0

Does crossbeam-channel perform better with threads bound to CPUs? #1104

Closed lluckydog closed 7 months ago

lluckydog commented 7 months ago

I'm very impressed by the performance of the crossbeam library. However, when I bind each thread to a CPU core, the channel seems to perform poorly. Am I making a mistake somewhere? The results are listed below (bean.log is from the unbound run, bean_bind.log from the bound run):

$ python analyse.py bean.log
mean_diff(s):  1.144686126e-06
var_diff:  2.2268946826017946e-12
max_diff:  4.2315e-05
min_diff:  3e-07
$ python analyse.py bean_bind.log
mean_diff(s):  0.000460610578848
var_diff:  4.490146877390725e-08
max_diff:  0.000723233
min_diff:  1.79e-07

The code is as follows:

use std::io::BufWriter;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{io::Write, thread};
use std::fs::{File, OpenOptions};
use crossbeam_channel::bounded;
use crossbeam_channel::TryRecvError;
use x86::time::rdtsc;
use core_affinity;

pub fn test_beam() {
    let (tx, rx) = bounded(8192);
    let (logger_tx, logger_rx) = bounded(8192);
    let signal = Arc::new(AtomicBool::new(false));

    let logger1_tx = logger_tx.clone();
    let signal1 = signal.clone();
    let handler = thread::spawn(move || {
        signal1.store(true, Ordering::SeqCst);

        loop {
            match rx.try_recv() {
                Ok(msg) => {
                    let time = unsafe {
                        rdtsc()
                    };

                    logger1_tx.send(format!("rece index {:?} at time {:?}\n", msg, time)).unwrap();

                },
                Err(TryRecvError::Disconnected) => {
                    break;
                },
                Err(TryRecvError::Empty) => {
                    continue;
                },
            }
        }
        println!("handler break!");
    });

    let logger2_tx = logger_tx.clone();
    let signal2 = signal.clone();
    let sender = thread::spawn(move || {
        while !signal2.load(Ordering::Acquire) {}

        for i in 0..1000000 {
            let time: u64 = unsafe {
                rdtsc()
            };

            logger2_tx.send(format!("send index {:?} at time {:?}\n", i, time)).unwrap();
            tx.send(i).unwrap();
        }
        println!("sender break!");
    });

    let writer = thread::spawn(move || {
        let file = OpenOptions::new()
            .append(true)
            .create(true)
            .open("bean.log").expect("create file fail");

        let mut writer: BufWriter<File> = BufWriter::new(file);

        loop {
            match logger_rx.try_recv() {
                Ok(data) => {
                    writer.write_all(data.as_bytes()).expect("write log fail");
                },
                Err(TryRecvError::Disconnected) => {break;},
                Err(TryRecvError::Empty) => {writer.flush().expect("flush log fail");},
            }
        }
        println!("writer break!");
    });

    drop(logger_tx);
    writer.join().unwrap();
    sender.join().unwrap();
    handler.join().unwrap();
}

pub fn test_beam_bind() {
    let (tx, rx) = bounded(8192);
    let (logger_tx, logger_rx) = bounded(8192);
    // One CoreId per logical CPU; with SMT, neighbouring ids may share a physical core.
    let core_ids = core_affinity::get_core_ids().unwrap();
    let core_id_handler = core_ids[0];
    let core_id_writer = core_ids[1];
    let core_id_sender = core_ids[2];
    let signal = Arc::new(AtomicBool::new(false));

    let logger1_tx = logger_tx.clone();
    let signal1 = signal.clone();
    let handler = thread::spawn(move || {

        core_affinity::set_for_current(core_id_handler);
        signal1.store(true, Ordering::SeqCst);

        loop {
            match rx.try_recv() {
                Ok(msg) => {
                    let time = unsafe {
                        rdtsc()
                    };

                    logger1_tx.send(format!("rece index {:?} at time {:?}\n", msg, time)).unwrap();

                },
                Err(TryRecvError::Disconnected) => {
                    break;
                },
                Err(TryRecvError::Empty) => {
                    continue;
                },
            }
        }
        println!("handler break!");
    });

    let logger2_tx = logger_tx.clone();
    let signal2 = signal.clone();
    let sender = thread::spawn(move || {

        core_affinity::set_for_current(core_id_sender);

        while !signal2.load(Ordering::Acquire) {}

        for i in 0..1000000 {
            let time: u64 = unsafe {
                rdtsc()
            };

            logger2_tx.send(format!("send index {:?} at time {:?}\n", i, time)).unwrap();
            tx.send(i).unwrap();
        }
        println!("sender break!");
    });

    let writer = thread::spawn(move || {

        core_affinity::set_for_current(core_id_writer);

        let file = OpenOptions::new()
            .append(true)
            .create(true)
            .open("bean_bind.log").expect("create file fail");

        let mut writer: BufWriter<File> = BufWriter::new(file);

        loop {
            match logger_rx.try_recv() {
                Ok(data) => {
                    writer.write_all(data.as_bytes()).expect("write log fail");
                },
                Err(TryRecvError::Disconnected) => {break;},
                Err(TryRecvError::Empty) => {writer.flush().expect("flush log fail");},
            }
        }
        println!("writer break!");
    });

    drop(logger_tx);
    writer.join().unwrap();
    sender.join().unwrap();
    handler.join().unwrap();
}
caelunshun commented 7 months ago

One possibility is that core_affinity is returning one CoreId for each logical CPU (hardware thread). On a typical CPU with SMT, this means 2 CoreIds per physical core. As a result, by scheduling the three threads on the first three CoreIds, two of them may end up running on the same physical core and contending for its shared resources.

You could try to spread out your core choices (e.g. cores 0, 2, and 4) and see if that fixes the issue.
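
Whether cores 0, 2, and 4 really sit on different physical cores depends on how the OS numbers logical CPUs, so it may be worth checking the topology first. Below is a minimal sketch, assuming Linux and its sysfs file `/sys/devices/system/cpu/cpuN/topology/thread_siblings_list`; the helper names (`parse_cpu_list`, `one_per_physical_core`, `read_sibling_lists`) are hypothetical and not part of crossbeam or core_affinity. It keeps the lowest-numbered logical CPU of each sibling group, which gives one candidate per physical core.

```rust
use std::collections::BTreeSet;
use std::fs;

/// Parse a Linux CPU list such as "0,4" or "0-1" into CPU indices.
/// Malformed parts are skipped.
fn parse_cpu_list(s: &str) -> Vec<usize> {
    let mut cpus = Vec::new();
    for part in s.trim().split(',').filter(|p| !p.is_empty()) {
        match part.split_once('-') {
            Some((lo, hi)) => {
                if let (Ok(lo), Ok(hi)) = (lo.parse::<usize>(), hi.parse::<usize>()) {
                    cpus.extend(lo..=hi);
                }
            }
            None => {
                if let Ok(n) = part.parse::<usize>() {
                    cpus.push(n);
                }
            }
        }
    }
    cpus
}

/// Keep one logical CPU (the lowest-numbered sibling) per physical core.
/// The result is sorted and free of duplicates.
fn one_per_physical_core(sibling_lists: &[Vec<usize>]) -> Vec<usize> {
    let firsts: BTreeSet<usize> = sibling_lists
        .iter()
        .filter_map(|siblings| siblings.iter().min().copied())
        .collect();
    firsts.into_iter().collect()
}

/// Read the sibling list of cpu0, cpu1, ... until a file is missing.
/// Assumption: Linux sysfs layout; returns an empty Vec elsewhere.
fn read_sibling_lists() -> Vec<Vec<usize>> {
    let mut lists = Vec::new();
    for cpu in 0.. {
        let path = format!("/sys/devices/system/cpu/cpu{cpu}/topology/thread_siblings_list");
        match fs::read_to_string(&path) {
            Ok(text) => lists.push(parse_cpu_list(&text)),
            Err(_) => break,
        }
    }
    lists
}

fn main() {
    let picks = one_per_physical_core(&read_sibling_lists());
    println!("one logical CPU per physical core: {:?}", picks);
}
```

The printed indices could then be used to choose which entries of `core_affinity::get_core_ids()` to bind to, assuming the `id` field of each CoreId matches the OS's logical CPU number on your platform.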

caelunshun commented 7 months ago

Also, if you have one of the newer Intel CPUs with the P/E architecture, make sure you're binding to the P cores, not the E cores.

lluckydog commented 7 months ago

Thank you for your kind reply. It turns out my CPU does have SMT, and your first suggestion helped me a lot!