nicholassm / disruptor-rs

Low latency inter-thread communication library in Rust inspired by the LMAX Disruptor.
MIT License
618 stars 18 forks source link

When the number of producers is greater than or equal to the number of cpu cores, the performance will be greatly reduced #12

Closed bigKoki closed 2 months ago

bigKoki commented 2 months ago

Hello,thank for your work。When the number of producers is bigger than or equal to the number of cpu cores, the performance will be greatly reduced。 My test machine cpu has 4 cores. If I set buffSize = 64, msgNum = 10_000_000, producerNum = 2, it consumes 972 ms. But if I set producerNum = 4, it consumes over 20000ms, much worse than crossbeam. Could you please tell me what cause this? Thansk very much!!! My code is as below:

use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicI64, Ordering};
use std::{env, thread};
use std::time::Instant;
use disruptor::*;

// The event on the ring buffer.
struct Event {
    price: i64,
}

impl Event {
    fn set(&mut self, price: i64) {
        self.price = price;
    }
    fn get(&mut self) -> i64 {
        self.price
    }
}

fn main() {
    let mut size = 64;
    let mut count = 10_000_000;
    let mut num_threads = 8;

    let args: Vec<String> = env::args().collect();
    if args.len()>1 {
        size = args[1].parse::<i64>().unwrap();
        num_threads = args[2].parse::<i64>().unwrap();
        if args.len() > 3 {
            count = args[3].parse::<i64>().unwrap();
        }
    }
    println!("size: {}, count: {}, num_threads: {}", size, count, num_threads);
    let factory = || { Event { price: 0 }};

    // let sink = Arc::new(AtomicI64::new(0)); //bcos we read and print value from main thread
    // Consumer
    let processor = {
        // let sink = Arc::clone(&sink);
        move |event: &Event, _sequence: i64, _end_of_batch: bool| {
            // sink.fetch_add(event.price, Ordering::Release);
            // println!("thread id: {:?}", thread::current().id());

        }
    };

    let mut producer1 =
        disruptor::build_multi_producer(size as usize, factory, BusySpin)
            .handle_events_with(processor.clone())
            .build();

    // Create another producer.

    let mut producers: Vec<_> = (0..num_threads-1).map(|_| producer1.clone()).collect();

    let start_time = Instant::now();
    // Publish into the Disruptor.
    thread::scope(|s| {
        s.spawn(move || {
            for i in 0..count {
                producer1.publish(|e| {
                    // println!("thread id: {:?}", thread::current().id());
                    e.set(1);
                });
            }
        });
        for producer in producers.iter_mut() {
            s.spawn(move || {
                for i in 0..count {
                    producer.publish(|e| {
                        // println!("thread id: {:?}", thread::current().id());
                        e.set(1);
                    });
                }
            });
        }
    });

    // Wait for the producers to finish.

    // Get the final sum.
    // let final_sum = sum.lock().unwrap();
    let d = Instant::now().duration_since(start_time);
    let delta = d.as_millis();
    // let res = sink.load(Ordering::Acquire);
    println!("Sum: {}, processed time: {}", 0, delta);
}
nicholassm commented 2 months ago

The problem is that your machine has too few CPU cores compared to how may threads you spawn. In your example, you spawn 5 producer threads and one consumer thread (the ladder is done by the Disruptor).

When you have more threads than cores, the threads have to be context-switched during execution to allow all threads to make progress. That generates a lot of overhead. That's why you're seeing much slower execution in your example.

You haven't posted your Crossbeam code so I'm not sure if you also spawn 6 threads in that code? How are you comparing the two libraries?

nicholassm commented 2 months ago

I should add that the difference is also that the consumer thread in the Disruptor is busy-spinning while it waits for events. I.e. it fully occupies a core which a receiver in Crossbeam does not. So Crossbeam performs better in the case where you have more threads than cores - but then it's not a low latency scenario which is what the Disruptor is designed for.

Does it make sense?

Kind regards Nicholas

bigKoki commented 2 months ago

The problem is that your machine has too few CPU cores compared to how may threads you spawn. In your example, you spawn 5 producer threads and one consumer thread (the ladder is done by the Disruptor).

When you have more threads than cores, the threads have to be context-switched during execution to allow all threads to make progress. That generates a lot of overhead. That's why you're seeing much slower execution in your example.

You haven't posted your Crossbeam code so I'm not sure if you also spawn 6 threads in that code? How are you comparing the two libraries?

First of all, thanks very much for your answer. According to what you said, when using rust-disruptor, the number of threads needs to be controlled to be less than the number of cpu cores. When I use rust-crossbeam and java-disruptor(with BusySpin), there is no such limitation. They will not cause exponential performance decay because the number of threads exceeds the number of cpu cores. Here are my comparison results:

Setting 1 (buff_size=256, producer_num=2, msg_num_per_producer=10_000_000): java-disruptor: 1620ms rust-crossbeam: 788ms rust-disruptor: 761ms

Setting 2 (buff_size=256, producer_num=4, msg_num_per_producer=10_000_000): java-disruptor: 3875ms rust-crossbeam: 1820ms rust-disruptor: It took more than 2 minutes so I stopped the program.

I wonder if your code design requires the number of threads to be less than the number of cpu cores to achieve ideal performance.

Here are my other two codes:

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class Event {
    private int value;
    public int get() {
        return value;
    }
    public void set(int value) {
        this.value = value;
    }
}

class Task implements EventHandler<Event> {
    AtomicLong consume_num = new AtomicLong(0);
    private Disruptor<Event> disruptor;
    private RingBuffer<Event> buffer;

    public Task(int bufSize) {
        this.disruptor = new Disruptor<>(Event::new, bufSize, DaemonThreadFactory.INSTANCE,
                ProducerType.MULTI, new BusySpinWaitStrategy());
        this.buffer = disruptor.getRingBuffer();
        this.disruptor.handleEventsWith(this);
        this.disruptor.start();
    }

    public boolean emit() {
        final long sequence = buffer.next();
        final Event event = buffer.get(sequence);
        event.set(1);
        buffer.publish(sequence);
        return true;
    }

    @Override
    public void onEvent(Event event, long l, boolean b) throws Exception {
//        this.consume_num.getAndAdd(event.get());
    }
}

class ProducerThread extends Thread {
    long msg_num = 0;
    Task task;
    public ProducerThread(Task task, long msg_num) {
        this.task = task;
        this.msg_num = msg_num;
    }
    @Override
    public void run() {
        for (long i = 0; i < msg_num; i++){
            task.emit();
        }
    }
}

public class Disruptor3P1CDemo {
    public static void main(String[] args) throws Exception
    {
        int bufSize = 512;
        int num_threads = 2;
        long num_msg = 10_000_000;

        if (args.length > 0) {
            bufSize = Integer.parseInt(args[0]);
            num_threads = Integer.parseInt(args[1]);
        }

        Task task = new Task(bufSize);
        List<ProducerThread> producerThreads = new ArrayList<>();
        for (int i = 0; i < num_threads; i++) {
            ProducerThread producerThread = new ProducerThread(task, num_msg);
            producerThread.start();
            producerThreads.add(producerThread);
        }
        for( ProducerThread producerThread : producerThreads){
            producerThread.join();
        }
        producerThreads.clear();
        long st = System.currentTimeMillis();
//        List<ProducerThread> producerThreads = new ArrayList<>();
        for (int i = 0; i < num_threads; i++) {
            ProducerThread producerThread = new ProducerThread(task, num_msg);
            producerThread.start();
            producerThreads.add(producerThread);
        }
        for( ProducerThread producerThread : producerThreads){
            producerThread.join();
        }
        long et = System.currentTimeMillis();
        System.out.println(String.format("Sum: %s, Process time: %s", task.consume_num, (et - st)));
    }
}
use crossbeam::channel::*;
use std::{env, thread};
use std::time::Instant;

fn main() {
    let mut buf_size = 8192;
    let mut num_threads = 8;
    let mut producer_msg_no = 10_000_000;

    let args: Vec<String> = env::args().collect();
    if args.len() > 1 {
        buf_size = args[1].parse::<i64>().unwrap();
        num_threads = args[2].parse::<i64>().unwrap();
        if args.len() > 3 {
            producer_msg_no = args[3].parse::<i64>().unwrap();
        }
    }

    let (s, r) = bounded(buf_size as usize);

    let mut threads = Vec::new();
    let mut producers: Vec<_> = (0..num_threads - 1).map(|_| s.clone()).collect();

    let start_time = Instant::now();

    let t1 = thread::spawn(move || {
        for _ in 0..producer_msg_no {
            s.send(1).unwrap();
        }
    });

    for s_clone in producers {
        threads.push(thread::spawn(move || {
            for _ in 0..producer_msg_no {
                s_clone.send(1).unwrap();
            }
        }));
    }

    // 消费者线程,负责接收和处理生产者发送的消息
    let mut sum = 0;
    for msg in r {
        let tmp = msg;
        // sum += tmp;
    }

    let _ = t1.join();
    for thread in threads {
        let _ = thread.join();
    }

    let d = Instant::now().duration_since(start_time);
    let delta = d.as_millis();
    println!("Sum: {}, processed time: {}", sum, delta);
}
nicholassm commented 2 months ago

I can see that in both the Disruptor (Rust version) and Crossbeam examples, you have commented out code inside the receiving/consuming end. That can cause the compiler to perform dead-code removal so I'm not sure you're measuring what you intended. I would recommend you try out the benchmark in benches/mpsc.rs with a the number of producers you're trying here. That should measure everything in a comparative way so you can compare the two.

Also, to answer your question on the number of threads and cores for optimal performance of the Disruptor: Yes, there need to be enough available cores. If you need to use more producer threads than cores, you should use Crossbeam as you are trying to maximize throughput and not minimize latency.

Kind regards, Nicholas

bigKoki commented 2 months ago

I can see that in both the Disruptor (Rust version) and Crossbeam examples, you have commented out code inside the receiving/consuming end. That can cause the compiler to perform dead-code removal so I'm not sure you're measuring what you intended. I would recommend you try out the benchmark in benches/mpsc.rs with a the number of producers you're trying here. That should measure everything in a comparative way so you can compare the two.

Also, to answer your question on the number of threads and cores for optimal performance of the Disruptor: Yes, there need to be enough available cores. If you need to use more producer threads than cores, you should use Crossbeam as you are trying to maximize throughput and not minimize latency.

Kind regards, Nicholas

Thanks for your reply!!! After our testing, your tool outperforms others in most cases. We will do more testing to decide whether to use rust-disruptor or crossbeam. Best wishes!!!