logstash-plugins / logstash-integration-rabbitmq

Logstash Integration Plugin for RabbitMQ, including Logstash Input and Output Plugins
Apache License 2.0

Use a shared ExecutorService for all AMQP connections / plugin instances #19

Open micoq opened 7 years ago

micoq commented 7 years ago

Currently, for each AMQP connection, an ExecutorService is created with a number of threads equal to twice the number of CPUs/cores. These threads always seem to be in a sleeping/blocking state (checked with VisualVM). On a system with many cores and many AMQP connections, this can waste a large amount of memory and sometimes slow down the whole pipeline.

Example on my environment:

Total sleeping threads: 7680 (160 connections × 24 cores × 2), for about 2 GB of thread stacks.
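The arithmetic behind that figure can be checked with a small sketch. The class and method names below are made up for illustration. The only inputs taken from the report are 160 connections, 24 cores, and the factor of 2; the shared-pool figure assumes the pool size of availableProcessors() suggested later in this thread.

```java
// Back-of-the-envelope thread accounting (illustrative names, not part of any library).
final class ThreadEstimate {
    // Threads created when every connection gets its own fixed pool.
    static long perConnectionPools(int connections, int cores, int threadsPerCore) {
        return (long) connections * cores * threadsPerCore;
    }

    // Threads created when all connections share one pool sized to the core count.
    static long sharedPool(int cores) {
        return cores;
    }

    public static void main(String[] args) {
        System.out.println("per-connection pools: " + perConnectionPools(160, 24, 2)); // 7680
        System.out.println("one shared pool:      " + sharedPool(24));                 // 24
    }
}
```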

If we look at the code, the RabbitMQ client library creates a FixedThreadPool with 2*core threads in the constructor of com.rabbitmq.client.impl.ConsumerWorkService if no pool is provided:

private static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;

this.executor = (executor == null ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory) : executor);
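To make the consequence of that fallback concrete, here is a minimal standalone sketch using only java.util.concurrent. WorkServiceSketch is a hypothetical stand-in, not the RabbitMQ class; it only mirrors the "create a private pool when none is supplied" pattern quoted above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a per-connection work service (not the RabbitMQ class).
final class WorkServiceSketch {
    static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;

    final ExecutorService executor;
    final boolean ownsExecutor;

    WorkServiceSketch(ExecutorService shared) {
        // Same fallback as the quoted line: a private fixed pool if nothing is supplied.
        this.ownsExecutor = (shared == null);
        this.executor = ownsExecutor ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS) : shared;
    }

    // Only shut down a pool this instance created itself.
    void shutdown() {
        if (ownsExecutor) {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        WorkServiceSketch a = new WorkServiceSketch(null);
        WorkServiceSketch b = new WorkServiceSketch(null);
        System.out.println("private pools distinct: " + (a.executor != b.executor));

        ExecutorService shared = Executors.newFixedThreadPool(2);
        WorkServiceSketch c = new WorkServiceSketch(shared);
        WorkServiceSketch d = new WorkServiceSketch(shared);
        System.out.println("shared pool reused: " + (c.executor == d.executor));

        a.shutdown();
        b.shutdown();
        shared.shutdown();
    }
}
```

With one such service per connection and no executor supplied, the number of pools grows with the number of connections, which is the behavior described in this report.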

Actually, the March Hare library (which uses the RabbitMQ client) should provide this pool in the connection factory (com.rabbitmq.client.ConnectionFactory).

Here is a quick&dirty workaround to create a pool with only one thread for each connection by patching March Hare (/usr/share/logstash/vendor/bundle/jruby/1.9/gems/march_hare-2.20.0-java/lib/march_hare/session.rb). Add this line in self.connect: cf.setSharedExecutor(Executors.newSingleThreadExecutor)

and the import line at the beginning: java_import java.util.concurrent.Executors

On my setup:

We can even use a single thread for all the connections without a loss of speed (@4000 events/s with filters). In this case, the thread is running but not heavily used. It seems to handle the consumer delivery in the RabbitMQ client library.

michaelklishin commented 7 years ago

March Hare allows you to use any executor service you want.

michaelklishin commented 7 years ago

I -1 the idea of using the single threaded executor by default. It can be Runtime.getRuntime().availableProcessors() instead of Runtime.getRuntime().availableProcessors() * 2, for example.

micoq commented 7 years ago

Hello,

Yes, it was a very dirty workaround :) How can I do that properly? By using the ":thread_pool_size" and/or the ":executor_factory" parameter in the self.connect method?

michaelklishin commented 7 years ago

@micoq correct. The former is for cases when you only want to specify pool size and nothing else (the executor is of a fixed size). I'd probably recommend calculating pool size as Runtime.getRuntime().availableProcessors() over using a dynamically growing and shrinking service for Logstash specifically.

michaelklishin commented 7 years ago

Also worth pointing out that the setting is per ConnectionFactory and most projects only ever have one (or few). March Hare tries to provide an API that hides it from the user (some Ruby users experience a cultural shock when they see a class with a Factory in the name), effectively equating the number of factories to the number of connections.

I'd consider an alternative API for March Hare that would e.g. accept a connection factory. The next release will be 3.0.0, so it's a good chance to consider it.

micoq commented 7 years ago

So, we could create a single ConnectionFactory for the whole Logstash process with only one thread pool for the consumer work service executor (with something like this: Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())).
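A minimal sketch of such a process-wide pool, using only the JDK. The holder class SharedConsumerExecutor is a hypothetical name; how the pool would actually be handed to March Hare or to a ConnectionFactory is not shown here and is left as an assumption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical holder for one consumer pool shared by the whole process.
final class SharedConsumerExecutor {
    // One fixed pool sized to the core count, created once when the class is loaded.
    private static final ExecutorService INSTANCE =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private SharedConsumerExecutor() {
    }

    // Every caller receives the same executor instance.
    static ExecutorService get() {
        return INSTANCE;
    }

    public static void main(String[] args) {
        ExecutorService first = SharedConsumerExecutor.get();
        ExecutorService second = SharedConsumerExecutor.get();
        System.out.println("same pool: " + (first == second));
        first.shutdown(); // release the pool so the JVM can exit
    }
}
```

Because the pool is shared, no individual connection should shut it down; shutdown belongs to whatever owns the process lifecycle.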

Just one more question: I don't really understand the exact purpose of the consumer work service executor, does it handle the consumer I/O, serialization of AMQP frames, acks... ? In VisualVM, the threads from this pool (pool-XXX-thread-X) are almost always in Park state and very rarely in Running state (for about 4000 messages/s).

michaelklishin commented 7 years ago

@micoq correct.

ConsumerWorkService handles dispatch of deliveries and other consumer methods (e.g. a server-sent basic.cancel) to consumers. There are two major requirements that shape its design:

I think you can find enough comments in the Java client code, as well as test examples, to learn more.

michaelklishin commented 7 years ago

@micoq well, change your workload to do something more intensive/time consuming and you will see a different pool thread utilization picture.

micoq commented 7 years ago

@michaelklishin Thank you for the explanation. I could use the Java client directly in a custom application to load the consumer threads. However, in Logstash, these threads never do a CPU-intensive job: they just put the payload in a queue. The queue may block, but in that case the bottleneck is the worker threads that handle the filters (and the outputs), i.e. all the CPU-intensive tasks (groks, field manipulations...), or the input threads themselves (payload deserialization, codec...) just after the queue, if I'm not mistaken.

Anyway, the availableProcessors() pool size is a good compromise. (I forgot to mention that the thread stack is pretty heavy on JRuby, with a default size of 2048 KB.)
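The hand-off described here can be sketched with plain JDK classes. This is an illustration only, not Logstash or RabbitMQ code: HandoffSketch, its queue, and the upper-casing "filter" are all made up. A pool thread does nothing but put payloads into a bounded queue, while a separate worker thread does the actual processing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative hand-off: consumer pool thread -> bounded queue -> worker thread.
final class HandoffSketch {
    // Pushes `count` payloads through a bounded queue and returns what the worker processed.
    static List<String> run(int count, int queueCapacity) {
        BlockingQueue<String> internalQueue = new ArrayBlockingQueue<>(queueCapacity);
        ExecutorService consumerPool = Executors.newSingleThreadExecutor();
        List<String> processed = new ArrayList<>();

        // Worker: stands in for the filter stage, where the real work would happen.
        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    processed.add(internalQueue.take().toUpperCase());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        // Consumer callbacks: each one only enqueues, blocking while the queue is full.
        for (int i = 0; i < count; i++) {
            final String payload = "event-" + i;
            consumerPool.execute(() -> {
                try {
                    internalQueue.put(payload);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            worker.join(); // join() makes the worker's writes to `processed` visible here
            consumerPool.shutdown();
            consumerPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the hand-off", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 2));
    }
}
```

In this shape the pool thread spends its time either idle or blocked on the queue, which is consistent with the mostly parked threads observed in VisualVM.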

michaelklishin commented 7 years ago

@micoq I didn't say "CPU intensive", I said "time consuming."

I'll see what a suitable connection method would look like in March Hare soon.

micoq commented 7 years ago

@michaelklishin Yes, I just said "CPU intensive" compared to the job done by the worker threads in Logstash.

andrewvc commented 7 years ago

@micoq thanks for the excellent sleuthing. We definitely should fix this issue.

@michaelklishin I would love to see that option added. We'd use a single global executor for all of logstash, with perhaps #cpu_cores slots allocated across all rabbitmq input instances. WDYT of that idea?

micoq commented 7 years ago

Without touching the march_hare code, would it be possible to share a single connection among multiple consumers (if all the rabbitmq input instances have the same server/port/credentials), e.g. by caching the connection across the input instances?

In the ideal case, the input thread and the consumer thread could also be merged into a single thread (so the internal_queue between the two threads could be removed). However, in practice, the consumer thread is declared as a callback and the run() method can't return before Logstash stops.

Meanwhile, I discovered an issue in Logstash 5.1.1 where too many threads can severely slow down the whole pipeline: see here. If the monitoring can't be disabled, I think it would be good to reuse/share threads as much as possible across the JVM.

andrewvc commented 7 years ago

@micoq I think that kind of connection sharing makes a ton of sense. In the long run I'd like to make it explicit via new additions to the logstash grammar, but in the meantime I think it makes sense for plugins to detect this.

I would say the best way to do this would be for the rabbitmq_connection mixin to set class variables in a special namespace as singletons and manage that pool.

I'm glad to tackle that, but given some other priorities that change would be at least a month away. Do you have any interest in tackling that patch? I'm glad to give speedy PR reviews.

michaelklishin commented 7 years ago

In case it would be necessary (or more convenient) to extend March Hare to expose the new Java client settings, let me know. I plan to work on a March Hare release based on the 4.x Java client quite soon.

micoq commented 7 years ago

@andrewvc I made a small patch on the mixin rabbitmq_connection. I don't know if it's the best solution but I hope it can help a bit :)

There's a new parameter, connection_pool_name, which provides a custom name to group all the rabbitmq input/output instances with the same name into a single connection (when possible). By default, this parameter is nil, so there is no grouping (the legacy behavior). Each thread has its own channel (following the recommendation of the March Hare documentation).

After some tests, the best solution seems to be a single connection for the inputs and another connection for the outputs (on a single RabbitMQ). I think the consumers can be slowed down by the whole connection (in flow state).
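The grouping idea can be illustrated with a small name-keyed registry. This is a Java sketch of the concept only, not the Ruby patch itself: NamedConnectionPool, acquire, and the opener callback are hypothetical names, and a null pool name stands in for the nil default.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical registry that shares one connection per pool name.
final class NamedConnectionPool<C> {
    private final Map<String, C> byName = new ConcurrentHashMap<>();

    // A null name mimics the default: no grouping, so each caller opens its own connection.
    C acquire(String poolName, Supplier<C> opener) {
        if (poolName == null) {
            return opener.get();
        }
        // Open the connection once per name; later callers reuse it.
        return byName.computeIfAbsent(poolName, name -> opener.get());
    }

    // Number of distinct named connections currently held.
    int sharedCount() {
        return byName.size();
    }

    public static void main(String[] args) {
        NamedConnectionPool<Object> pool = new NamedConnectionPool<>();
        Object in1 = pool.acquire("inputs", Object::new);
        Object in2 = pool.acquire("inputs", Object::new);
        Object out = pool.acquire("outputs", Object::new);
        System.out.println("inputs shared: " + (in1 == in2));
        System.out.println("outputs separate: " + (in1 != out));
    }
}
```

Using distinct names for inputs and outputs, as in the main method, gives one connection per direction.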

@michaelklishin I didn't have to touch march hare for this but for our earlier discussion it could be a good idea to let the user pass an already initialized executor to the self.connect method (and by default, to use an executor with Runtime.getRuntime().availableProcessors() threads).

jakauppila commented 7 years ago

I was curious where this was left off. We also connect to a number of RabbitMQ instances from our indexers and would likely benefit from having this in place.

michaelklishin commented 7 years ago

@Jakauppila I haven't looked at a new March Hare version but would definitely find some time to review/guide a PR that adds support for the new Java client options. Integrating them into the Logstash plugins then should be pretty straightforward.