conversant / disruptor

Disruptor BlockingQueue
Apache License 2.0
311 stars 47 forks source link

why SpinPolicy.BLOCKING seems to be the fatest? #41

Closed microhardsmith closed 2 years ago

microhardsmith commented 2 years ago

` package com.example.demo;

import com.conversantmedia.util.concurrent.DisruptorBlockingQueue; import com.conversantmedia.util.concurrent.SpinPolicy; import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue;

@Slf4j public class SimpleTest { public static int TASK_SUM = 0; public static int PRODUCER = 0; public static int CONSUMER = 0; public static int QUE_SIZE = 0; public static CountDownLatch countDownLatch;

public void test(final BlockingQueue<String> q) throws InterruptedException {
    //生产者线程
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < TASK_SUM * CONSUMER; i++) {
                try {
                    q.put("");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            countDownLatch.countDown();
        }

    }
    ;
    //消费者线程
    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < TASK_SUM * PRODUCER; i++) {
                try {
                    q.take();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            countDownLatch.countDown();
        }
    }
    countDownLatch = new CountDownLatch(PRODUCER + CONSUMER);
    Thread[] arrProducerThread = new Thread[PRODUCER];
    for (int i = 0; i < PRODUCER; i++) {
        arrProducerThread[i] = new Thread(new Producer());
    }
    Thread[] arrConsumerThread = new Thread[CONSUMER];
    for (int i = 0; i < CONSUMER; i++) {
        arrConsumerThread[i] = new Thread(new Consumer());
    }
    //go!
    long t1 = System.currentTimeMillis();
    for (int i = 0; i < PRODUCER; i++) {
        arrProducerThread[i].start();
    }
    for (int i = 0; i < CONSUMER; i++) {
        arrConsumerThread[i].start();
    }
    countDownLatch.await();
    long t2 = System.currentTimeMillis();
    log.info("task: {}, producer: {}, consumer: {}, queue size: {}, time: {} ms", TASK_SUM, PRODUCER, CONSUMER, QUE_SIZE, t2-t1);
}

public static void main(String[] args) throws InterruptedException {

    SimpleTest.TASK_SUM = 100000;
    SimpleTest.PRODUCER = 5;
    SimpleTest.CONSUMER = 5;
    SimpleTest.QUE_SIZE = 512;
    final BlockingQueue<String> q1 = new LinkedBlockingQueue<>(SimpleTest.QUE_SIZE );
    new SimpleTest().test(q1);

    final BlockingQueue<String> q2 = new ArrayBlockingQueue<>(SimpleTest.QUE_SIZE );
    new SimpleTest().test(q2);

    final BlockingQueue<String> q3 = new DisruptorBlockingQueue<>(SimpleTest.QUE_SIZE, SpinPolicy.BLOCKING);
    new SimpleTest().test(q3);

}

} `

I tested the disruptor blocking queue for its performance, and it seems only SpinPolicy.BLOCKING strategy is faster than ArrayBlockingQueue or LinkedBlockingQueue, SpinPolicy.WAITING is extremely slow! am I wrong or where is the problem?

jac18281828 commented 2 years ago

Thanks @microhardsmith, I think you will find that this example underscores the challenges of developing a benchmark. First, if one is trying to observe a statistic one must develop a measurement that is more precise than the signal presented by the statistic itself (signal to noise ratio). From a software standpoint this means that you can not allow any 'noise' from the operating system itself or from the Hotspot JVM, such as compilations of modules and dynamic recompilation, speculative conditions or garbage collection to interfere with the timing itself. Second, you must take into account processor architecture and processor scheduling as operating system pauses which may be of order milliseconds can be extremely disruptive in this kind of measurement. Third, one must take into account 'reproducibility' and order of operations and how that impacts subsequent operations in Java. Are the conditions of the test identical in Java from the instantiation of q1 through to q2 or q3? They are not. Finally, you may find that a ring buffer style queue offers no benefit to the application you are developing. Perhaps your application is already cpu bound or bound by waiting as in the above case. I hope this helps in your efforts to benchmark blocking queues.

microhardsmith commented 2 years ago

I still don't know how to properly test blocking queue, am I encouraged to use DisruptorBlockingQueue in executors to improve its performance? perhaps you could tell me the best practice of using DisruptorBlockingQueue by your tests