mpilone / hazelcastmq

Messaging, STOMP server, Camel component, and JMS provider for Hazelcast
Apache License 2.0

Pooled Queue Listener #8

Open nhoughto opened 10 years ago

nhoughto commented 10 years ago

Another question: I'm looking at using hazelcastmq with Camel to listen on a potentially large set of endpoints, say 1000+. As I understand the code at the moment, it creates 1 (or 2?) new threads to listen on each queue. I don't think this approach will scale for me with 1000+ queues to listen on.

Is it possible to take more of a pooled listener approach? Say, have X concurrent consumers that farm off requests from all the 1000+ queues, rather than 1 thread per queue? Or is this a restriction of Hazelcast and the BlockingQueue implementation?
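
To make the question concrete, here is a minimal sketch of what a pooled listener could look like. It is not HzMq or Camel code: `PooledQueueListener`, its `demo()` method, and the 10 ms idle back-off are all hypothetical, and plain in-memory `LinkedBlockingQueue` instances stand in for Hazelcast queues (Hazelcast's `IQueue` extends `BlockingQueue`, so the same loop shape should apply). A fixed number of workers each sweep a slice of the queues with a non-blocking `poll()` instead of parking one thread per queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

/** Hypothetical pooled listener: a fixed number of workers sweep many queues. */
final class PooledQueueListener<T> {
    private final List<BlockingQueue<T>> queues;
    private final BiConsumer<Integer, T> handler; // receives (queue index, message)
    private final int workerCount;
    private final ExecutorService workers;
    private final AtomicBoolean running = new AtomicBoolean(false);

    PooledQueueListener(List<BlockingQueue<T>> queues, int workerCount,
                        BiConsumer<Integer, T> handler) {
        this.queues = queues;
        this.workerCount = workerCount;
        this.handler = handler;
        this.workers = Executors.newFixedThreadPool(workerCount);
    }

    void start() {
        if (!running.compareAndSet(false, true)) {
            return; // already started
        }
        for (int w = 0; w < workerCount; w++) {
            final int offset = w;
            workers.execute(() -> sweep(offset));
        }
    }

    /** Worker loop: owns the queues at indices offset, offset + workerCount, ... */
    private void sweep(int offset) {
        try {
            while (running.get()) {
                boolean idle = true;
                for (int i = offset; i < queues.size(); i += workerCount) {
                    T msg = queues.get(i).poll(); // non-blocking poll
                    if (msg != null) {
                        idle = false;
                        handler.accept(i, msg);
                    }
                }
                if (idle) {
                    // Assumed back-off so an empty sweep does not spin the CPU.
                    TimeUnit.MILLISECONDS.sleep(10);
                }
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    void stop() throws InterruptedException {
        running.set(false);
        workers.shutdownNow(); // interrupts workers that are sleeping
        workers.awaitTermination(1, TimeUnit.SECONDS);
    }

    /** Demo: 100 in-memory queues, 4 workers; returns the number of messages handled. */
    static int demo() {
        List<BlockingQueue<String>> queues = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            queues.add(new LinkedBlockingQueue<>());
        }
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(100);
        PooledQueueListener<String> listener = new PooledQueueListener<String>(
                queues, 4, (index, msg) -> {
                    handled.incrementAndGet();
                    done.countDown();
                });
        listener.start();
        for (int i = 0; i < 100; i++) {
            queues.get(i).offer("msg-" + i);
        }
        try {
            done.await(5, TimeUnit.SECONDS);
            listener.stop();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("handled=" + demo());
    }
}
```

The trade-off is latency and polling cost. Against a distributed queue, each `poll()` in the sweep would likely be a remote call, so sweeping 1000 mostly empty queues may be expensive in its own way.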

thanks

bwzhang2011 commented 10 years ago

Hi nhoughto. I have a similar question: once we create that many queues for offer and poll, how do we keep things scalable and performing as expected? Please take a look at https://github.com/hazelcast/hazelcast/issues/3661. For your scenario, I don't think it's a good idea to have so many queues to transfer messages, since each queue consumes IO and relies on the network. Instead, we could use a message type to group similar messages onto the same queue. As that issue suggests, we should use multiple threads to speed up offering and use at least 2 threads for polling. There has been some thought about improving how hzmq offers and polls by adding more threads, but that depends on your design. Below is my polling code, which is similar to your idea. I don't think it's a good idea to rely on the Camel API, though, because whenever we need to adjust the behavior we have to write another component or modify the current one:

    protected void startQueueBack() {
        // Start a fixed number of polling threads for the response queue.
        for (int i = 0; i < DEFAULT_DISPACH_SIZE; i++) {
            Thread hzThread = getHzExecutorThread();

            hzThreadList.add(hzThread);

            hzThread.start();
        }
    }

    private Thread getHzExecutorThread() {
        ThreadFactory hzThreadFactory = new ThreadFactoryBuilder().setNameFormat(respName).setDaemon(true).build();

        final IQueue<ChannelTransferMsg> transferMsgQueue = getClientHazelcastInstance(this.appType).getQueue(respName);

        final LocalQueueStats localQueueStats = transferMsgQueue.getLocalQueueStats();

        return hzThreadFactory.newThread(new Runnable() {
            @Override
            public void run() {
                // Keep polling until the control flag is cleared.
                while (ctrlFlag.get()) {
                    try {
                        ChannelTransferMsg pollMsg = transferMsgQueue.poll(pollTimeout, TimeUnit.MILLISECONDS);

                        if (pollMsg != null) {
                            handleBack(pollMsg);

                            if (enableMonitor) {
                                LOGGER.debug("===== Response queue <{}>: total offers <{}>, total polls <{}>, total rejected offers <{}> =====", respName,
                                        localQueueStats.getOfferOperationCount(), localQueueStats.getPollOperationCount(),
                                        localQueueStats.getRejectedOfferOperationCount());
                            }
                        }
                    } catch (InterruptedException ex) {
                        // Restore the interrupt status and stop this polling thread.
                        Thread.currentThread().interrupt();
                        return;
                    } catch (Exception ex) {
                        handleException(ex, HazelOperType.QUEUE);
                    }
                }
            }
        });
    }

By the way, in my application I ran into the Hazelcast "not active" exception even though I set hazelcast.shutdownhook.enabled = false. Since you have a lot of queues in your project, how would you handle the exceptions thrown once Hazelcast is no longer active?

mpilone commented 10 years ago

The current design dedicates a thread per HzMq context. So you can think of each context as a simple reactor that could have any number of consumers. So if you have a hundred consumers created from a single context, you'll still only be using one thread. However, following the JMS 2 specification (which HzMq attempts to do), each context is not thread-safe so only a single consumer can be polled at a time or all consumers will push messages using the single context thread.

When you combine this with Camel, things get complicated. Camel has both pull (polling) and push endpoint consumers, and each Camel consumer can be started or stopped independently. Because of this, the underlying HzMq consumers have to be managed carefully in order to not violate any thread-safety rules with HzMq and to make sure that stopping one Camel consumer doesn't affect any other Camel consumer. To do this, each Camel consumer creates its own HzMq context so it can start and stop the context along with the Camel consumer lifecycle. This means (at least) one thread per Camel endpoint consumer.

I have been trying to think of ways to improve this and I might have some ideas that I'd need to think through some more and I'd welcome suggestions:

  1. Change HzMqContext to put a dispatch task in the thread pool whenever it is ready for dispatch rather than having a dedicated thread per context. This would allow the number of threads (i.e. the number of concurrent contexts) to be controlled by the thread pool configuration. So if you had a single-threaded thread pool, you would basically have a single-thread reactor across all contexts, so only one consumer across all contexts could be dispatching. As with all reactor patterns, you trade concurrent performance for low thread count and simplicity. You could also just limit the number of threads in the pool to allow some concurrency among contexts but know that there is an upper bound.
  2. Modify the Camel consumer implementation to share a context across all Camel consumers on an endpoint. The HzMq consumers would need to be created/closed with the lifecycle of the Camel consumer rather than simply starting or stopping the context. Because the shared context is single threaded, this would mean that there would be no concurrency on the Camel consumers on an endpoint which doesn't seem like a good idea.
  3. Leave the push model the same (i.e. one thread per context) but only initialize the thread when a message listener is installed on a consumer. This would allow a context that is only using polling consumers to not require a thread. If a Camel polling consumer was used, there would be no thread use by HzMq; however I don't know how Camel implements a polling consumer and it may internally be allocating a thread which means we just shifted the thread count problem from HzMq to Camel.

I'm leaning toward approach 1 as it would apply to all uses of HzMq, not just in Camel. The downside is that there might be a little more overhead putting a dispatch task in the executor thread pool than there is now, where the thread is already dedicated and is just waiting on a lock. Maybe there is a way to make this configurable in a strategy.
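
As a rough illustration of approach 1 (not the HzMq implementation), the sketch below shows one common way to dispatch many single-threaded contexts on a shared pool. `PooledDispatchContext`, `onMessage`, and `demo()` are hypothetical names. Each context queues incoming messages and submits a dispatch task only when none is already scheduled, so a context never dispatches on two threads at once and the pool size caps the total thread count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical stand-in for a context that dispatches on a shared pool. */
final class PooledDispatchContext<T> {
    private final Executor pool;
    private final Consumer<T> listener;
    private final Queue<T> pending = new ConcurrentLinkedQueue<>();
    // True while a dispatch task for this context is queued or running.
    private final AtomicBoolean scheduled = new AtomicBoolean(false);

    PooledDispatchContext(Executor pool, Consumer<T> listener) {
        this.pool = pool;
        this.listener = listener;
    }

    /** Called by a producer when a message is ready for this context. */
    void onMessage(T msg) {
        pending.add(msg);
        scheduleIfNeeded();
    }

    private void scheduleIfNeeded() {
        // The compare-and-set guarantees at most one dispatch task per context.
        if (!pending.isEmpty() && scheduled.compareAndSet(false, true)) {
            pool.execute(this::dispatch);
        }
    }

    private void dispatch() {
        try {
            T msg;
            while ((msg = pending.poll()) != null) {
                listener.accept(msg);
            }
        } finally {
            scheduled.set(false);
            // A message may have arrived after the last poll but before the
            // flag was cleared, so check again.
            scheduleIfNeeded();
        }
    }

    /** Demo: 1000 contexts share a 4-thread pool; returns the number of deliveries. */
    static int demo() {
        final int contextCount = 1000;
        final int perContext = 10;
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger delivered = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(contextCount * perContext);
        List<PooledDispatchContext<String>> contexts = new ArrayList<>();
        for (int i = 0; i < contextCount; i++) {
            contexts.add(new PooledDispatchContext<String>(pool, msg -> {
                delivered.incrementAndGet();
                done.countDown();
            }));
        }
        for (int round = 0; round < perContext; round++) {
            for (PooledDispatchContext<String> ctx : contexts) {
                ctx.onMessage("msg-" + round);
            }
        }
        try {
            // Wait for delivery before shutting down so no reschedule is rejected.
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return delivered.get();
    }

    public static void main(String[] args) {
        System.out.println("delivered=" + demo());
    }
}
```

With a single-threaded pool this behaves like one reactor across all contexts. One caveat of this particular sketch is that a busy context drains its whole backlog before releasing the thread, so a real strategy might cap the batch size per dispatch.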

All that being said, 1000 queues is a lot for any application. I would make sure you test this with Camel and Hazelcast because they may also be introducing a lot of thread overhead with this many queues but I don't know the internal architecture of either well enough to say.

nhoughto commented 10 years ago

I like the sound of option 1. The 1000 queue scenario is a workaround for Hazelcast's lack of message selectors: a queue can only really be used for one purpose, can't be filtered, etc. So fewer threads at the cost of slightly higher latency would work for me.

I might investigate in further detail how hazelcast and camel would handle such a scenario too.

thanks!

bwzhang2011 commented 10 years ago

Agreed with the above; looking forward to the improvement along the lines of option 1.

mpilone commented 10 years ago

I pushed code to support a configurable dispatch strategy into the develop branch. I haven't done too much testing on it but you could give it a try to see if it meets your needs.

nhoughto commented 10 years ago

Looks like it's just implemented in core at the moment? Not the Camel component? Or am I missing something?

Thanks for the quick attention!

mpilone commented 10 years ago

The HzMq Camel component uses HzMq core for all the underlying messaging, so if you configure your HzMq instance to use the new reactor dispatch strategy and then set that instance on your HzMq Camel config, the Camel component will create contexts that use the reactor pattern. As far as I can remember without reviewing the code, the HzMq Camel component doesn't create any of its own threads (aside from those created by Camel itself) and just relies on HzMq core for message dispatch.