LMAX-Exchange / disruptor

High Performance Inter-Thread Messaging Library
http://lmax-exchange.github.io/disruptor/
Apache License 2.0

Disruptor silently fails to consume events in case of threadsNumber < handlersCount #121

Closed Spikhalskiy closed 9 years ago

Spikhalskiy commented 9 years ago

If we initialize a disruptor with a 2-thread executor and set up a workflow of 2 parallel handlers followed by 1 more handler, the disruptor does not throw any exception or even log a warning, but it fails to consume the published events. Actually, this flow could be executed with 2 threads. In the example below, only handlers 0 and 1 run; handler 2 never executes, so events never reach the consumed state and the ring buffer eventually overflows.

        // Only two threads are available to the disruptor.
        Executor executor = Executors.newFixedThreadPool(2);
        Disruptor<AtomicLong> disruptor = new Disruptor<AtomicLong>(
                new AtomicLongEventFactory(), 8, executor, ProducerType.SINGLE, new BlockingWaitStrategy());

        disruptor.handleExceptionsWith(new FatalExceptionHandler());
        // Three handlers in total: 0 and 1 run in parallel, 2 runs after both of them.
        disruptor
                .handleEventsWith(new AtomicLongWorkHandler(0), new AtomicLongWorkHandler(1))
                .then(new AtomicLongWorkHandler(2));
        disruptor.start();
        disruptor.publishEvent(...);

You can find a failing unit test here

I can spend some time investigating and prepare a pull request if you confirm that this is actually a bug.
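The starvation can be reproduced without the Disruptor at all. The following is a minimal sketch using only `java.util.concurrent`; the class `FixedPoolStarvation` and its method `startedCount` are hypothetical names introduced for illustration, and the blocking task merely stands in for a handler loop that never returns its thread to the pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the Disruptor library.
class FixedPoolStarvation {

    /** Submits `tasks` never-returning tasks to a fixed pool and reports how many started. */
    static int startedCount(int threads, int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        AtomicInteger started = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch expectedStarts = new CountDownLatch(Math.min(threads, tasks));
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(() -> {
                    started.incrementAndGet();
                    expectedStarts.countDown();
                    try {
                        release.await(); // stands in for a handler loop that never ends
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Wait until every pool thread is occupied, then give queued tasks a chance.
            expectedStarts.await(5, TimeUnit.SECONDS);
            Thread.sleep(200);
            return started.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow(); // discards queued tasks, interrupts running ones
            release.countDown();
        }
    }

    public static void main(String[] args) {
        System.out.println("2 threads, 3 tasks started: " + startedCount(2, 3));
        System.out.println("3 threads, 3 tasks started: " + startedCount(3, 3));
    }
}
```

With two threads the third task stays in the executor's queue indefinitely, and `execute` reports nothing to the caller, which matches the silent behaviour described above.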

mikeb01 commented 9 years ago

I think the problem is that we've used the wrong interface for the DSL. We should have used a ThreadFactory instead of an Executor, as the handler owns its thread for the lifetime of the disruptor instance (which is generally similar to the lifetime of the application hosting it). The Executor interface does not have the right set of information to let the caller know whether the Runnable has been executed or not.
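To illustrate the difference, here is a rough sketch of starting long-lived tasks through a `ThreadFactory`. `DedicatedThreadStarter` and `allStarted` are hypothetical names made up for this example and are not the Disruptor's API; the point is only that each task gets its own thread, so the thread count always matches the handler count and a failure to obtain a thread is visible immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the Disruptor library.
class DedicatedThreadStarter {
    private final ThreadFactory threadFactory;
    private final List<Thread> threads = new ArrayList<>();

    DedicatedThreadStarter(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    /** Starts the task on its own thread and fails loudly if no thread can be created. */
    Thread start(Runnable task) {
        Thread thread = threadFactory.newThread(task);
        if (thread == null) {
            throw new IllegalStateException("ThreadFactory did not create a thread");
        }
        thread.start();
        threads.add(thread);
        return thread;
    }

    int threadCount() {
        return threads.size();
    }

    /** Starts `count` blocking tasks and reports whether every one of them began running. */
    static boolean allStarted(int count) {
        DedicatedThreadStarter starter = new DedicatedThreadStarter(Thread::new);
        CountDownLatch started = new CountDownLatch(count);
        CountDownLatch stop = new CountDownLatch(1);
        try {
            for (int i = 0; i < count; i++) {
                starter.start(() -> {
                    started.countDown();
                    try {
                        stop.await(); // stands in for a handler loop that owns its thread
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return started.await(5, TimeUnit.SECONDS) && starter.threadCount() == count;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            stop.countDown(); // let the demo threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println("all 3 tasks started: " + allStarted(3));
    }
}
```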

Spikhalskiy commented 9 years ago

Hi Michael,

Right, we have exactly one thread per handler and we never return these threads to the pool, so there is no need for an Executor and it is the wrong interface here. I think it would be better to provide an API that takes a ThreadFactory instead of an ExecutorService, plus one that takes neither and defaults to "new Thread()".

> The Executor interface does not have the right set of information to let the caller know if the Runnable has been executed or not.

We could check the "running" marker of whatever we submitted: if we pushed it to the Executor and it is not running, something went wrong, and we could at least log it. But there is no need for such quick fixes if we can fix the design.
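For reference, the quick fix mentioned above might look roughly like the sketch below. `StartupCheck`, `TrackedTask` and `countNotRunning` are hypothetical names used only here, the 2-second timeout is an arbitrary assumption, and the "running" marker is modelled with a latch rather than any flag the Disruptor itself exposes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical diagnostic, not part of the Disruptor library.
class StartupCheck {

    /** Wraps a long-running task and exposes a "running" marker set on entry to run(). */
    static final class TrackedTask implements Runnable {
        private final Runnable delegate;
        private final CountDownLatch running = new CountDownLatch(1);

        TrackedTask(Runnable delegate) {
            this.delegate = delegate;
        }

        @Override
        public void run() {
            running.countDown();
            delegate.run();
        }

        boolean awaitRunning(long timeout, TimeUnit unit) {
            try {
                return running.await(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Submits blocking tasks to a fixed pool and warns about each one that never starts. */
    static int countNotRunning(int poolThreads, int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(poolThreads);
        CountDownLatch stop = new CountDownLatch(1);
        List<TrackedTask> tasks = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                TrackedTask task = new TrackedTask(() -> {
                    try {
                        stop.await(); // stands in for a handler loop
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                tasks.add(task);
                executor.execute(task);
            }
            int notRunning = 0;
            for (int i = 0; i < tasks.size(); i++) {
                // Assumed timeout: 2 seconds is arbitrary, chosen only for the demo.
                if (!tasks.get(i).awaitRunning(2, TimeUnit.SECONDS)) {
                    notRunning++;
                    System.err.println("WARN: task " + i + " was submitted but never started");
                }
            }
            return notRunning;
        } finally {
            executor.shutdownNow(); // discards queued tasks, interrupts running ones
            stop.countDown();
        }
    }

    public static void main(String[] args) {
        System.out.println("not running (2 threads, 3 tasks): " + countNotRunning(2, 3));
    }
}
```

A timeout-based check like this can only guess that a task is stuck in the queue, which is one reason to prefer fixing the design instead.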

Thank you, Dmitry

Spikhalskiy commented 9 years ago

Please find the pull request attached. Feel free to take it and rework it.