nicholassm / disruptor-rs

Low latency inter-thread communication library in Rust inspired by the LMAX Disruptor.
MIT License
599 stars, 17 forks

In MPSC setup, crossbeam is more performant #7

Closed Venkat2811 closed 2 months ago

Venkat2811 commented 2 months ago

Hi @nicholassm,

Thanks for your awesome work on the Rust port of the Disruptor. It was very easy to get started. With a simple 2P1C (two producers, one consumer) setup, I noticed that crossbeam is more performant than this library. Am I missing something?

Results: On my 13-inch, M1, 2020, 16 GB MacBook Pro:

$ make run-disruptor-demo-optimized
cargo run --release --bin disruptor_demo
    Finished release [optimized] target(s) in 0.01s
     Running `target/release/disruptor_demo`
SPSC Sum: 10000000, processed time: 238
MPSC Sum: 20000000, processed time: 709

$ make run-crossbeam-demo-optimized
cargo run --release --bin crossbeam_demo
    Finished release [optimized] target(s) in 0.00s
     Running `target/release/crossbeam_demo`
SPSC Sum: 10000000, processed time: 269
MPSC Sum: 20000000, processed time: 545

Test Script: disruptor_demo.rs

use std::thread;
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use std::time::Instant;
use disruptor::{build_multi_producer, build_single_producer, BusySpin, Producer};

struct Event {
    val: i32
}

//spsc
fn spsc_example() {
    let buf_size = 32_768;
    let producer_msg_no = 10_000_000;

    let factory = || { Event { val: 0 }}; //to initialize disruptor

    let sink = Arc::new(AtomicI32::new(0)); //bcos we read and print value from main thread
    // Consumer
    let processor = {
        let sink = Arc::clone(&sink);
        move |event: &Event, _sequence: i64, _end_of_batch: bool| {
            sink.fetch_add(event.val, Ordering::SeqCst);
        }
    };

    let mut producer = build_single_producer(buf_size, factory, BusySpin)
        .handle_events_with(
            processor
        )
        .build();

    let start_time = Instant::now();

    // Publish into the Disruptor.
    thread::scope(|s| {
        s.spawn(move || {
            for _ in 0..producer_msg_no {
                producer.publish(|e| {
                    e.val = 1 as i32;
                });
            }
        });
    });

    let d = Instant::now().duration_since(start_time);
    let delta = d.as_millis();

    let sum = sink.load(Ordering::SeqCst);
    println!("SPSC Sum: {}, processed time: {}", sum, delta);
}

//mpsc
fn mpsc_example() {
    let buf_size = 32_768;
    let producer_msg_no = 10_000_000;

    let factory = || { Event { val: 0 }}; //to initialize disruptor

    let sink = Arc::new(AtomicI32::new(0)); //bcos we read and print value from main thread
    // Consumer
    let processor = {
        let sink = Arc::clone(&sink);
        move |event: &Event, _sequence: i64, _end_of_batch: bool| {
            sink.fetch_add(event.val, Ordering::SeqCst);
        }
    };

    let mut producer1 = build_multi_producer(buf_size, factory, BusySpin)
        .handle_events_with(
            processor
        )
        .build();

    let mut producer2 = producer1.clone();

    let start_time = Instant::now();

    // Publish into the Disruptor.
    thread::scope(|s| {
        s.spawn(move || {
            for _ in 0..producer_msg_no {
                producer1.publish(|e| {
                    e.val = 1 as i32;
                });
            }
        });

        s.spawn(move || {
            for _ in 0..producer_msg_no {
                producer2.publish(|e| {
                    e.val = 1 as i32;
                });
            }
        });
    });

    let d = Instant::now().duration_since(start_time);
    let delta = d.as_millis();

    let sum = sink.load(Ordering::SeqCst);
    println!("MPSC Sum: {}, processed time: {}", sum, delta);
}

fn main() {
    spsc_example();
    mpsc_example();
}

crossbeam_demo.rs

use crossbeam::channel::*;
use std::thread::{self, JoinHandle};
use std::time::Instant;
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};

fn spsc_example() {
    let buf_size = 32_768;
    let producer_msg_no = 10_000_000;
    let (s, r) = bounded(buf_size);

    let start_time = Instant::now();
    // Producer 
    let t1 = thread::spawn(move || {
        for _ in 0..producer_msg_no {
            s.send(1).unwrap();
        }
    });

    let sink = Arc::new(AtomicI32::new(0)); //bcos we read and print value from main thread
    let sink_clone = Arc::clone(&sink);
    // Consumer
    let c1: JoinHandle<()> = thread::spawn(move || {
        for msg in r {
            let tmp = msg;
            sink_clone.fetch_add(tmp, Ordering::SeqCst);
        }
    });

    let _ = t1.join();
    let _ = c1.join();

    let d = Instant::now().duration_since(start_time);
    let delta = d.as_millis();

    let sum = sink.load(Ordering::SeqCst);
    println!("SPSC Sum: {}, processed time: {}", sum, delta);
}

fn mpsc_example() {
    let buf_size = 32_768;
    let producer_msg_no = 10_000_000;
    let (s, r) = bounded(buf_size);
    let s2 = s.clone();

    let start_time = Instant::now();
    // Producer 1
    let t1 = thread::spawn(move || {
        for _ in 0..producer_msg_no {
            s.send(1).unwrap();
        }
    });

    // Producer 2
    let t2 = thread::spawn(move || {
        for _ in 0..producer_msg_no {
            s2.send(1).unwrap();
        }
    });

    let sink = Arc::new(AtomicI32::new(0)); //bcos we read and print value from main thread
    let sink_clone = Arc::clone(&sink);
    // Consumer
    let c1: JoinHandle<()> = thread::spawn(move || {
        for msg in r {
            let tmp = msg;
            sink_clone.fetch_add(tmp, Ordering::SeqCst);
        }
    });

    let _ = t1.join();
    let _ = t2.join();
    let _ = c1.join();

    let d = Instant::now().duration_since(start_time);
    let delta = d.as_millis();

    let sum = sink.load(Ordering::SeqCst);
    println!("MPSC Sum: {}, processed time: {}", sum, delta);
}

fn main() {
    spsc_example();
    mpsc_example();
}

Repo: https://github.com/Venkat2811/producer-consumer-rust

nicholassm commented 2 months ago

Hi Venkat,

Interesting - thanks for writing the tests.

There are so many aspects that can impact the measurements so I'll get some coffee and read your code + try it on my Intel MacBook. 😀

I never got around to writing benchmarks for the MPSC case - your code suggests that it would be a good idea, in case there are performance regressions in the multi-producer version.

I'll get back to you.

Kind regards Nicholas

nicholassm commented 2 months ago

Hi @Venkat2811,

I've read your code and it looks good: A very precise comparison of the two libraries.

I ran your examples on my Quad-Core Intel Core i7 and these are the results (only minor variance over several runs):

Running `target/release/crossbeam_demo`
SPSC Sum: 10000000, processed time: 565
MPSC Sum: 20000000, processed time: 1189

Running `target/release/disruptor_demo`
SPSC Sum: 10000000, processed time: 317
MPSC Sum: 20000000, processed time: 786

As you can see, the Disruptor is much faster in the 2P1C setup.

I don't know much about the M1 processor other than it's absolutely awesome for laptops due to everything being integrated on a single die and having circuits optimized for user applications. Therefore, I don't know how well it will perform for low latency applications that are not for "the human eye". Traditionally, I believe Intel chips are used for low latency and the above results indicate why. However, I must say that I would very much like to test the disruptor library (and compare with Crossbeam) on a beefy Intel server with Linux and all the P and C-states disabled, interrupts isolated to a single core, cores generally isolated, threads pinned to cores, etc. I think comparisons like yours would also show more realistic results when run in an environment like that.

Kind regards, Nicholas

Venkat2811 commented 2 months ago

Hi Nicholas,

Thank you for taking the time to test the benchmark code. Credit goes to this post.

Hope your coffee tasted much better while benchmarking this :)

I'm new to rust, ChatGPT gave me 2 commands to run the rust bin.

When I use the former, I get the same results as you shared above on my M1 as well. When using the latter command, crossbeam is still more performant. Could you confirm which one you used?

I ran the benchmarks on my Ubuntu workstation (AMD) as well and got a similar perf ratio to the M1 Mac.

My workstation's HW specs:

$ sudo lshw -class processor -class memory
  *-firmware                
       description: BIOS
       vendor: American Megatrends International, LLC.
       physical id: 0
       version: 2.30
       date: 03/06/2023
       size: 64KiB
       capacity: 32MiB
       capabilities: pci upgrade shadowing cdboot bootselect socketedrom edd int13floppynec int13floppytoshiba int13floppy360 int13floppy1200 int13floppy720 int13floppy2880 int5printscreen int9keyboard int14serial int17printer int10video usb biosbootspecification uefi
  *-memory
       description: System Memory
       physical id: 11
       slot: System board or motherboard
       size: 64GiB
     *-bank:0
          description: 3600 MHz (0,3 ns) [empty]
          product: Unknown
          vendor: Unknown
          physical id: 0
          serial: Unknown
          slot: DIMM 0
          clock: 3600MHz (0.3ns)
     *-bank:1
          description: DIMM DDR4 Synchronous Unbuffered (Unregistered) 3600 MHz (0,3 ns)
          product: F4-3600C18-32GTZN
          vendor: Unknown
          physical id: 1
          serial: 00000000
          slot: DIMM 1
          size: 32GiB
          width: 64 bits
          clock: 3600MHz (0.3ns)
     *-bank:2
          description: 3600 MHz (0,3 ns) [empty]
          product: Unknown
          vendor: Unknown
          physical id: 2
          serial: Unknown
          slot: DIMM 0
          clock: 3600MHz (0.3ns)
     *-bank:3
          description: DIMM DDR4 Synchronous Unbuffered (Unregistered) 3600 MHz (0,3 ns)
          product: F4-3600C18-32GTZN
          vendor: Unknown
          physical id: 3
          serial: 00000000
          slot: DIMM 1
          size: 32GiB
          width: 64 bits
          clock: 3600MHz (0.3ns)
  *-cache:0
       description: L1 cache
       physical id: 14
       slot: L1 - Cache
       size: 512KiB
       capacity: 512KiB
       clock: 1GHz (1.0ns)
       capabilities: pipeline-burst internal write-back unified
       configuration: level=1
  *-cache:1
       description: L2 cache
       physical id: 15
       slot: L2 - Cache
       size: 4MiB
       capacity: 4MiB
       clock: 1GHz (1.0ns)
       capabilities: pipeline-burst internal write-back unified
       configuration: level=2
  *-cache:2
       description: L3 cache
       physical id: 16
       slot: L3 - Cache
       size: 32MiB
       capacity: 32MiB
       clock: 1GHz (1.0ns)
       capabilities: pipeline-burst internal write-back unified
       configuration: level=3
  *-cpu
       description: CPU
       product: AMD Ryzen 7 5800X 8-Core Processor
       vendor: Advanced Micro Devices [AMD]
       physical id: 17
       bus info: cpu@0
       version: 25.33.2
       serial: Unknown
       slot: AM4
       size: 2200MHz
       capacity: 4850MHz
       width: 64 bits
       clock: 100MHz
       capabilities: lm fpu fpu_exception wp vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb rdtscp x86-64 constant_tsc rep_good nopl nonstop_tsc cpuid extd_apicid aperfmperf rapl pni pclmulqdq monitor ssse3 fma cx16 sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand lahf_lm cmp_legacy svm extapic cr8_legacy abm sse4a misalignsse 3dnowprefetch osvw ibs skinit wdt tce topoext perfctr_core perfctr_nb bpext perfctr_llc mwaitx cpb cat_l3 cdp_l3 hw_pstate ssbd mba ibrs ibpb stibp vmmcall fsgsbase bmi1 avx2 smep bmi2 erms invpcid cqm rdt_a rdseed adx smap clflushopt clwb sha_ni xsaveopt xsavec xgetbv1 xsaves cqm_llc cqm_occup_llc cqm_mbm_total cqm_mbm_local clzero irperf xsaveerptr rdpru wbnoinvd arat npt lbrv svm_lock nrip_save tsc_scale vmcb_clean flushbyasid decodeassists pausefilter pfthreshold avic v_vmsave_vmload vgif v_spec_ctrl umip pku ospke vaes vpclmulqdq rdpid overflow_recov succor smca fsrm cpufreq
       configuration: cores=8 enabledcores=8 microcode=169873930 threads=16

Yes, I also plan to pin threads to core and do some benchmarks on my workstation. Happy to share the results once done.

Thanks, Venkat

Venkat2811 commented 2 months ago

Running `target/release/crossbeam_demo`
SPSC Sum: 10000000, processed time: 565
MPSC Sum: 20000000, processed time: 1189

Running `target/release/disruptor_demo`
SPSC Sum: 10000000, processed time: 317
MPSC Sum: 20000000, processed time: 786

Ah, you are also running release. Sorry, I just noticed. So it looks like the difference comes down to processor architecture, then.

nicholassm commented 2 months ago

Yes, the coffee was great. :-)

Indeed, I also ran the optimized version (using your Makefile).

I see your desktop sports an AMD processor. I know AMD processors are also great, but I only have low latency experience with Intel, so again I'm uncertain what would make Crossbeam run faster on your desktop. The key difference between Crossbeam and the Disruptor is the busy spinning of the waiting threads. Maybe busy-spinning threads are penalized on M1 and AMD, or interact poorly with the chip design?

So I think it just underlines the need for proper testing on a CPU typically used in low latency settings. :-)
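
To make the busy-spinning difference mentioned above concrete, here is a minimal sketch using only the standard library. It is not the implementation of either Crossbeam or the Disruptor; the function names `busy_spin_handoff` and `blocking_handoff` are made up for illustration. The first consumer polls an atomic flag without ever giving up its core, while the second waits on a channel and may be parked by the OS until a message arrives.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

/// Busy-spin wait (hypothetical helper): the consumer polls a flag
/// in a tight loop until the producer publishes a value.
fn busy_spin_handoff(value: u64) -> u64 {
    let ready = Arc::new(AtomicBool::new(false));
    let slot = Arc::new(AtomicU64::new(0));

    let consumer = {
        let ready = Arc::clone(&ready);
        let slot = Arc::clone(&slot);
        thread::spawn(move || {
            // Spin until the flag flips; `spin_loop` is only a CPU hint,
            // the thread is never put to sleep.
            while !ready.load(Ordering::Acquire) {
                std::hint::spin_loop();
            }
            slot.load(Ordering::Relaxed)
        })
    };

    slot.store(value, Ordering::Relaxed);
    // The Release store pairs with the Acquire load above, so the
    // consumer is guaranteed to see the value written to `slot`.
    ready.store(true, Ordering::Release);
    consumer.join().unwrap()
}

/// Blocking wait (hypothetical helper): the consumer blocks in `recv`
/// and may be descheduled until a message arrives.
fn blocking_handoff(value: u64) -> u64 {
    let (tx, rx) = mpsc::channel();
    let consumer = thread::spawn(move || rx.recv().unwrap());
    tx.send(value).unwrap();
    consumer.join().unwrap()
}
```

Calling `busy_spin_handoff(42)` and `blocking_handoff(42)` both return 42; the difference is only in how the waiting thread spends its time, which is where the wake-up cost after a pause comes from.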

Kind regards, Nicholas

Venkat2811 commented 2 months ago

So I think it just underlines the need for proper testing on a CPU typically used in low latency setting. :-)

I agree. Thanks again !

nicholassm commented 2 months ago

No, thank you for the example/benchmark.

As a closing remark and as a Rustacean, I must say that it is superb that a general purpose library such as Crossbeam has such excellent performance that it outperforms the highly specialized Disruptor on (here) two very different processor architectures. (And so the Disruptor library should only be used in low latency applications, while Crossbeam should generally be your first choice.)

Venkat2811 commented 2 months ago

Yes, I am pleasantly surprised with crossbeam as well. And the optimizations done in the disruptor ~12 years ago are still very relevant and industry leading :)

Do you think the examples I shared would be useful if added to this repo? Under an examples dir, maybe? Let me know, happy to raise a PR.

nicholassm commented 2 months ago

Yes absolutely. But I think it should be in the form of a benchmark to provide the most value. (See the existing SPSC benchmark for inspiration).

The Criterion crate used here for benchmarks is great: it does the warmup, statistics, outlier detection, etc. for you. If you don't know it, I think you would love it.

Venkat2811 commented 2 months ago

Sure, happy to submit it as a benchmark !

nicholassm commented 2 months ago

Oh, a late insight: Why do you use Ordering::SeqCst and not Ordering::Release/Ordering::Acquire for the fetch_add and load operations respectively? Even though it's the same in both examples, it could impose restrictions that hurt performance. Can you please change that and try it out on M1/ARM respectively? It would be interesting to see the difference, if any.
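
As a sketch of the change being asked for here (not the code from the linked repo), the counter from the demos could use `Ordering::Release` for the `fetch_add` and `Ordering::Acquire` for the final `load`. The function name `sum_with_release_acquire` and its parameters are assumptions made for illustration.

```rust
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: each producer thread adds 1 to a shared counter
/// `per_producer` times; the caller reads the total after joining.
fn sum_with_release_acquire(producers: i32, per_producer: i32) -> i32 {
    let sink = Arc::new(AtomicI32::new(0));

    let handles: Vec<_> = (0..producers)
        .map(|_| {
            let sink = Arc::clone(&sink);
            thread::spawn(move || {
                for _ in 0..per_producer {
                    // Release instead of SeqCst: no single global order
                    // of all atomic operations is requested.
                    sink.fetch_add(1, Ordering::Release);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Acquire load pairs with the Release increments above.
    sink.load(Ordering::Acquire)
}
```

For example, `sum_with_release_acquire(2, 10_000)` returns 20000. Whether this is measurably faster than `SeqCst` depends on the target: the weaker orderings give the compiler and CPU more freedom, but they do not guarantee a speedup on every architecture.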

Venkat2811 commented 2 months ago

I was just looking for the closest equivalent to Java's volatile keyword, as it is well known. I agree that Ordering::Release/Ordering::Acquire are better, as they give finer control over memory ordering.

On M1:

There is improvement in SPSC. Earlier with Ordering::SeqCst:

nicholassm commented 2 months ago

After reading the benchmark you submitted, I realized that it is measuring throughput and not latency. So, as we've established, Crossbeam in an MPSC setting can have better throughput on some architectures and worse on others. But what about latency - in particular when events happen after some pause? I will write a benchmark for that and then I would very much like you to try it on your different chipsets. (I expect to find time for this later this week.)
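
The throughput versus latency distinction above can be illustrated with a small standard-library sketch. This is not the benchmark that was later added to the repo; `measure_latencies` and `median` are hypothetical helpers, and `std::sync::mpsc` stands in for either library. Each message carries the `Instant` at which it was sent, so the consumer can record how long that individual message waited, including messages sent right after a pause.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: sends `bursts` bursts of `burst_size` timestamped
/// messages, sleeping for `pause` after each burst, and returns the
/// per-message latencies observed by the consumer.
fn measure_latencies(bursts: usize, burst_size: usize, pause: Duration) -> Vec<Duration> {
    let (tx, rx) = mpsc::sync_channel::<Instant>(1024);

    let consumer = thread::spawn(move || {
        // Latency of one message = time from creating the timestamp
        // on the producer side to receiving it here.
        rx.iter().map(|sent| sent.elapsed()).collect::<Vec<Duration>>()
    });

    for _ in 0..bursts {
        for _ in 0..burst_size {
            tx.send(Instant::now()).unwrap();
        }
        thread::sleep(pause);
    }
    drop(tx); // Closing the channel ends the consumer's iterator.
    consumer.join().unwrap()
}

/// Median of a non-empty set of samples (panics on an empty vector).
fn median(mut samples: Vec<Duration>) -> Duration {
    samples.sort();
    samples[samples.len() / 2]
}
```

A throughput benchmark like the original demos only times the whole run, whereas `median(measure_latencies(100, 1, Duration::from_millis(1)))` looks at how long a single event takes to reach the consumer after the consumer has been idle.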

Venkat2811 commented 2 months ago

From this project's README:

Both libraries greatly improves as the burst size goes up but the Disruptor's performance is more resilient to the pauses between bursts which is one of the design goals.

I was not aware of this.

Sure, would be happy to run your updated benchmarks.

I also wrote script for std::sync::mpsc. Here are the results for M1:

SPSC: Throughput: std > disruptor > crossbeam

Benchmarking std_spsc: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 13.1s, or reduce sample count to 30.
std_spsc                time:   [122.41 ms 124.10 ms 125.63 ms]
Found 15 outliers among 100 measurements (15.00%)
  15 (15.00%) low mild

Benchmarking crossbeam_spsc: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 28.2s, or reduce sample count to 10.
crossbeam_spsc          time:   [263.89 ms 271.59 ms 279.42 ms]
Found 10 outliers among 100 measurements (10.00%)
  7 (7.00%) low mild
  3 (3.00%) high mild

Benchmarking disruptor_spsc: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 36.4s, or reduce sample count to 10.
disruptor_spsc          time:   [236.87 ms 263.13 ms 289.95 ms]

MPSC: Throughput: std > crossbeam > disruptor

Benchmarking std_mpsc: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 28.5s, or reduce sample count to 10.
std_mpsc                time:   [278.84 ms 283.50 ms 288.88 ms]
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high severe

Benchmarking crossbeam_mpsc: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 54.7s, or reduce sample count to 10.
crossbeam_mpsc          time:   [507.17 ms 519.58 ms 532.98 ms]
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) low mild
  1 (1.00%) high mild
  2 (2.00%) high severe

Benchmarking disruptor_mpsc: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 68.0s, or reduce sample count to 10.
disruptor_mpsc          time:   [704.01 ms 718.82 ms 735.05 ms]
Found 5 outliers among 100 measurements (5.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild
  1 (1.00%) high severe

I'm also making changes to use struct Event { pub val: i32 } in crossbeam & std benchmarks instead of using i32. I expect to have same results, but doing this just in case.

For these libraries, I think there should be a continuously running benchmark that measures the performance of each new version across different processor architectures. I see crossbeam and tokio being recommended over the std channel in several places online, but the above shows that std is pretty good on M1. Maybe they are better suited for async scenarios...
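
The std::sync::mpsc script itself is not shown in this thread. A minimal sketch of what a 2P1C version could look like, mirroring the crossbeam demo, follows. It assumes a bounded `sync_channel` (to match `bounded(buf_size)`); the function name `std_mpsc_sum` and its parameters are made up for illustration.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical helper: `producers` threads each send `msgs_per_producer`
/// ones through a bounded std channel; one consumer sums them up.
fn std_mpsc_sum(producers: usize, msgs_per_producer: usize, buf_size: usize) -> i64 {
    let (tx, rx) = sync_channel::<i32>(buf_size);

    let handles: Vec<_> = (0..producers)
        .map(|_| {
            let tx = tx.clone();
            thread::spawn(move || {
                for _ in 0..msgs_per_producer {
                    tx.send(1).unwrap();
                }
            })
        })
        .collect();
    // Drop the original sender so the channel disconnects once all
    // producer clones are gone and the consumer loop can end.
    drop(tx);

    let consumer = thread::spawn(move || rx.iter().map(i64::from).sum::<i64>());

    for handle in handles {
        handle.join().unwrap();
    }
    consumer.join().unwrap()
}
```

With the parameters from the demos, `std_mpsc_sum(2, 10_000_000, 32_768)` would return 20000000; timing that call with `Instant` gives a number comparable to the earlier hand-rolled measurements.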

Venkat2811 commented 2 months ago

The key difference between Crossbeam and the Disruptor is the busy spinning of the waiting threads. Maybe they are penalized on M1 and AMD or interacts poorly with the chip design?

Yeah this might be the reason.

nicholassm commented 2 months ago

Hi Venkat,

Thanks for the interesting results regarding std. The M1 processor is indeed a different processor than Intel and other traditional chip designs...

I just pushed a commit with minor changes of the spsc benchmark which mainly reduces the overhead of the benchmarking code. It's interesting because the Disruptor now shows much more consistent (good) results compared to Crossbeam. I'm still working on the mpsc benchmark so hold tight.

Kind regards, Nicholas

nicholassm commented 2 months ago

Hi Venkat,

I've taken a first stab at an mpsc benchmark. (I'm going to improve the naming and DRY it up later.) Try it out and let me know. :-)

cargo bench --bench mpsc

Kind regards, Nicholas

Venkat2811 commented 2 months ago

Hello Nicholas,

With bursts and pauses, the disruptor is faster in mpsc on M1 as well 🎉. Based on counters.rs, I was expecting crossbeam to be faster, at least for burst: 1, pause: 0 ms. Any thoughts on why this isn't the case?

mpsc/base/1             time:   [198.75 ns 200.94 ns 203.15 ns]
                        thrpt:  [4.9224 Melem/s 4.9766 Melem/s 5.0314 Melem/s]
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe
mpsc/Crossbeam/burst: 1, pause: 0 ms
                        time:   [980.48 ns 988.84 ns 998.24 ns]
                        thrpt:  [1.0018 Melem/s 1.0113 Melem/s 1.0199 Melem/s]
Found 15 outliers among 100 measurements (15.00%)
  1 (1.00%) low severe
  2 (2.00%) low mild
  4 (4.00%) high mild
  8 (8.00%) high severe
mpsc/disruptor/burst: 1, pause: 0 ms
                        time:   [718.18 ns 722.50 ns 727.51 ns]
                        thrpt:  [1.3745 Melem/s 1.3841 Melem/s 1.3924 Melem/s]
Found 15 outliers among 100 measurements (15.00%)
  2 (2.00%) low mild
  8 (8.00%) high mild
  5 (5.00%) high severe
mpsc/Crossbeam/burst: 1, pause: 1 ms
                        time:   [981.82 ns 988.74 ns 995.47 ns]
                        thrpt:  [1.0045 Melem/s 1.0114 Melem/s 1.0185 Melem/s]
Found 20 outliers among 100 measurements (20.00%)
  4 (4.00%) low mild
  6 (6.00%) high mild
  10 (10.00%) high severe
mpsc/disruptor/burst: 1, pause: 1 ms
                        time:   [726.07 ns 733.53 ns 742.81 ns]
                        thrpt:  [1.3462 Melem/s 1.3633 Melem/s 1.3773 Melem/s]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
mpsc/Crossbeam/burst: 1, pause: 10 ms
                        time:   [968.19 ns 974.28 ns 980.57 ns]
                        thrpt:  [1.0198 Melem/s 1.0264 Melem/s 1.0329 Melem/s]
Found 5 outliers among 100 measurements (5.00%)
  2 (2.00%) low mild
  2 (2.00%) high mild
  1 (1.00%) high severe
mpsc/disruptor/burst: 1, pause: 10 ms
                        time:   [781.78 ns 811.59 ns 847.26 ns]
                        thrpt:  [1.1803 Melem/s 1.2322 Melem/s 1.2791 Melem/s]
Found 11 outliers among 100 measurements (11.00%)
  6 (6.00%) high mild
  5 (5.00%) high severe
mpsc/base/10            time:   [927.63 ns 943.98 ns 960.43 ns]
                        thrpt:  [10.412 Melem/s 10.593 Melem/s 10.780 Melem/s]
Found 32 outliers among 100 measurements (32.00%)
  12 (12.00%) low mild
  3 (3.00%) high mild
  17 (17.00%) high severe
mpsc/Crossbeam/burst: 10, pause: 0 ms
                        time:   [4.2289 µs 4.2732 µs 4.3260 µs]
                        thrpt:  [2.3116 Melem/s 2.3401 Melem/s 2.3647 Melem/s]
Found 12 outliers among 100 measurements (12.00%)
  1 (1.00%) low mild
  4 (4.00%) high mild
  7 (7.00%) high severe
mpsc/disruptor/burst: 10, pause: 0 ms
                        time:   [2.6309 µs 2.6709 µs 2.7179 µs]
                        thrpt:  [3.6793 Melem/s 3.7440 Melem/s 3.8009 Melem/s]
Found 8 outliers among 100 measurements (8.00%)
  3 (3.00%) low mild
  3 (3.00%) high mild
  2 (2.00%) high severe
mpsc/Crossbeam/burst: 10, pause: 1 ms
                        time:   [4.1556 µs 4.2850 µs 4.4604 µs]
                        thrpt:  [2.2419 Melem/s 2.3337 Melem/s 2.4064 Melem/s]
Found 11 outliers among 100 measurements (11.00%)
  1 (1.00%) low severe
  3 (3.00%) low mild
  7 (7.00%) high severe
mpsc/disruptor/burst: 10, pause: 1 ms
                        time:   [2.1449 µs 2.1686 µs 2.1944 µs]
                        thrpt:  [4.5570 Melem/s 4.6114 Melem/s 4.6622 Melem/s]
Found 17 outliers among 100 measurements (17.00%)
  6 (6.00%) low severe
  3 (3.00%) low mild
  3 (3.00%) high mild
  5 (5.00%) high severe
mpsc/Crossbeam/burst: 10, pause: 10 ms
                        time:   [4.1243 µs 4.1732 µs 4.2262 µs]
                        thrpt:  [2.3662 Melem/s 2.3962 Melem/s 2.4247 Melem/s]
Found 18 outliers among 100 measurements (18.00%)
  7 (7.00%) low severe
  4 (4.00%) high mild
  7 (7.00%) high severe
mpsc/disruptor/burst: 10, pause: 10 ms
                        time:   [2.1567 µs 2.1850 µs 2.2185 µs]
                        thrpt:  [4.5076 Melem/s 4.5767 Melem/s 4.6367 Melem/s]
Found 19 outliers among 100 measurements (19.00%)
  13 (13.00%) low mild
  5 (5.00%) high mild
  1 (1.00%) high severe
mpsc/base/100           time:   [7.9839 µs 8.0875 µs 8.1880 µs]
                        thrpt:  [12.213 Melem/s 12.365 Melem/s 12.525 Melem/s]
Found 5 outliers among 100 measurements (5.00%)
  4 (4.00%) low mild
  1 (1.00%) high mild
mpsc/Crossbeam/burst: 100, pause: 0 ms
                        time:   [28.204 µs 29.608 µs 31.080 µs]
                        thrpt:  [3.2175 Melem/s 3.3774 Melem/s 3.5456 Melem/s]
Found 6 outliers among 100 measurements (6.00%)
  4 (4.00%) high mild
  2 (2.00%) high severe
mpsc/disruptor/burst: 100, pause: 0 ms
                        time:   [22.172 µs 22.785 µs 23.472 µs]
                        thrpt:  [4.2603 Melem/s 4.3888 Melem/s 4.5101 Melem/s]
Found 5 outliers among 100 measurements (5.00%)
  1 (1.00%) low mild
  4 (4.00%) high mild
mpsc/Crossbeam/burst: 100, pause: 1 ms
                        time:   [39.517 µs 41.556 µs 43.590 µs]
                        thrpt:  [2.2941 Melem/s 2.4064 Melem/s 2.5305 Melem/s]
Found 8 outliers among 100 measurements (8.00%)
  5 (5.00%) high mild
  3 (3.00%) high severe
mpsc/disruptor/burst: 100, pause: 1 ms
                        time:   [20.577 µs 20.871 µs 21.193 µs]
                        thrpt:  [4.7186 Melem/s 4.7914 Melem/s 4.8598 Melem/s]
Found 19 outliers among 100 measurements (19.00%)
  6 (6.00%) low severe
  6 (6.00%) low mild
  3 (3.00%) high mild
  4 (4.00%) high severe
mpsc/Crossbeam/burst: 100, pause: 10 ms
                        time:   [33.662 µs 35.750 µs 37.827 µs]
                        thrpt:  [2.6436 Melem/s 2.7972 Melem/s 2.9707 Melem/s]
Found 3 outliers among 100 measurements (3.00%)
  1 (1.00%) high mild
  2 (2.00%) high severe
mpsc/disruptor/burst: 100, pause: 10 ms
                        time:   [21.044 µs 21.585 µs 22.232 µs]
                        thrpt:  [4.4981 Melem/s 4.6328 Melem/s 4.7519 Melem/s]
Found 3 outliers among 100 measurements (3.00%)
  1 (1.00%) high mild
  2 (2.00%) high severe

Thanks, Venkat

nicholassm commented 2 months ago

Hi Venkat,

Excellent - the world makes sense again because I was expecting the disruptor to have lower latency. :-D

I think the reason the disruptor has lower latency in the burst: 1, pause: 0 ms case is that, first and foremost, the crossbeam channel does the synchronization in a different way. Secondly, due to its "channel nature", crossbeam also has to check if the channel is disconnected, etc. Check out the source code for the crossbeam bounded channel - it's very well written code.
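
The "channel nature" mentioned above means that sends and receives must be able to report a disconnected peer. The sketch below shows those semantics with `std::sync::mpsc` rather than crossbeam, since the standard library channel behaves the same way in this respect; the function names are hypothetical and nothing here reflects crossbeam's internals.

```rust
use std::sync::mpsc;

/// Hypothetical helper: sending after the receiver is gone must fail,
/// so every send has to be able to detect disconnection.
fn send_after_receiver_dropped() -> bool {
    let (tx, rx) = mpsc::channel::<i32>();
    drop(rx);
    tx.send(1).is_err()
}

/// Hypothetical helper: a buffered message is still delivered after all
/// senders are dropped; only then is the disconnect reported.
fn recv_after_senders_dropped() -> bool {
    let (tx, rx) = mpsc::channel::<i32>();
    tx.send(1).unwrap();
    drop(tx);
    rx.recv() == Ok(1) && rx.recv().is_err()
}
```

Both functions return `true`. The disruptor's `publish` call in the demos above has no such error path, which is one plausible source of extra per-message work on the channel side.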

Kind regards, Nicholas

nicholassm commented 2 months ago

Hi Venkat,

Thanks for all the ping-pong. I'm closing this issue, but feel free to open a new one if you have new findings.

Kind regards, Nicholas

Venkat2811 commented 2 months ago

I think the reason the disruptor has lower latency in the burst: 1, pause: 0 ms case is that, first and foremost, the crossbeam channel does the synchronization in a different way. Secondly, due to its "channel nature", crossbeam also has to check if the channel is disconnected, etc.

Hi Nicholas,

Yes, these are the reasons for Disruptor to be faster. But in bench/counters.rs' mpsc, disruptor was slower on M1. I think it's the same as burst: 1, pause: 0 ms in your benchmark where disruptor is faster. I'm not sure what's the cause of this discrepancy.

Yes, thanks to you too. It has been interesting for me too & an excellent project for me to get started with Rust :)

Thanks, Venkat

nicholassm commented 2 months ago

I think it actually makes sense that the disruptor has lower latency on M1 in the mpsc benchmark than in the counters bench. The reason is that in the counters bench, the consumer thread processes many events without interruption in each measurement. In the mpsc bench, many iterations are done for each measurement (by Criterion), and thus there are many pauses for the consumer thread in crossbeam, but not for the consumer thread in the disruptor, because it busy-spins.

Kind regards, Nicholas