rollno748 / di-kafkameter

JMeter Plugin to load test Apache Kafka topics/brokers

Queue for multi-thread consumers. How to organize consumers for multi-thread access? #21

Open polyakovmyu opened 5 months ago

polyakovmyu commented 5 months ago

Hello! Assume that we have one pool of values shared by multiple threads, and we want to get a fresh value every time we ask for one. An example is VTS (Virtual Table Server), which I used with LoadRunner; HTTP Simple Table Server follows the same idea. When I launch 2 threads, I get the exception: KafkaConsumer is not safe for multi-threaded access. How should I configure the consumers to get fresh values from multiple threads, and is it possible at all?

Attachments: consumer, consumer_config, logs_kafka

rollno748 commented 5 months ago

Hello,

This plugin is designed to support multi-threaded use.

Could you attach a sample test plan? I will have a look at it and get back to you as soon as possible.

polyakovmyu commented 5 months ago

I have made a simple test plan with all the plugins I use. Modified elements are marked with >, except the Kafka elements. JMeter version: 5.6.3. Java version: 17. Attachment: Kafka_playground.zip

rollno748 commented 5 months ago

Thanks for reporting this issue. I was able to reproduce it: the consumer throws an error when it runs with multiple threads.

2024-02-21 16:33:09,684 ERROR o.a.j.t.JMeterThread: Error while processing sampler: 'Thread-2-Read'.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2495) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2479) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.getConsumerRecords(KafkaConsumerSampler.java:90) ~[di-kafkameter-1.2.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.sample(KafkaConsumerSampler.java:73) ~[di-kafkameter-1.2.jar:?]
    at org.apache.jmeter.threads.JMeterThread.doSampling(JMeterThread.java:651) ~[ApacheJMeter_core.jar:5.6.2]
    at org.apache.jmeter.threads.JMeterThread.executeSamplePackage(JMeterThread.java:570) ~[ApacheJMeter_core.jar:5.6.2]
    at org.apache.jmeter.threads.JMeterThread.processSampler(JMeterThread.java:501) [ApacheJMeter_core.jar:5.6.2]
    at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:268) [ApacheJMeter_core.jar:5.6.2]
    at java.lang.Thread.run(Thread.java:829) [?:?]

I will release a patch soon to fix this. Keep supporting the project by starring it, and open issues if you find bugs.
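The stack trace points at KafkaConsumer.acquire(): the client does not lock anything, it only detects that a second thread has entered while another thread is still inside a call such as poll(), and fails fast. The sketch below is a simplified re-creation of that idea in plain Java, not the kafka-clients source; the class SingleThreadGuard and its poll(Runnable) method are hypothetical. It shows why two JMeter threads sharing one consumer instance hit the error:

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, simplified re-creation of a "one thread at a time" guard. */
public class SingleThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;
    // Id of the thread currently inside a guarded call, or NO_CURRENT_THREAD.
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    // Re-entrancy depth of the owning thread.
    private final AtomicInteger refcount = new AtomicInteger(0);

    public void acquire() {
        long threadId = Thread.currentThread().getId();
        // Succeeds only if no thread owns the guard, or this thread already owns it.
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException(
                    "KafkaConsumer is not safe for multi-threaded access");
        }
        refcount.incrementAndGet();
    }

    public void release() {
        // The last exit by the owning thread frees the guard.
        if (refcount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }

    /** Stand-in for poll(): holds the guard while the work runs. */
    public void poll(Runnable work) {
        acquire();
        try {
            work.run();
        } finally {
            release();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SingleThreadGuard consumer = new SingleThreadGuard();
        CountDownLatch insidePoll = new CountDownLatch(1);
        CountDownLatch letGo = new CountDownLatch(1);

        // Thread 1 enters poll() and stays there until letGo is released.
        Thread first = new Thread(() -> consumer.poll(() -> {
            insidePoll.countDown();
            try {
                letGo.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        first.start();
        insidePoll.await();

        // The main thread calls poll() while thread 1 is still inside, and is rejected.
        try {
            consumer.poll(() -> { });
            System.out.println("no error");
        } catch (ConcurrentModificationException e) {
            System.out.println("error: " + e.getMessage());
        }
        letGo.countDown();
        first.join();
    }
}
```

Running main prints "error: KafkaConsumer is not safe for multi-threaded access", which is the same message as in the JMeter log above.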

rollno748 commented 4 months ago

Hey @polyakovmyu

After reading up on consumer groups, I learned that the Kafka consumer is not thread-safe, and multi-threaded access must be properly synchronized, which can be complex.
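One way to synchronize is to put every call to the shared consumer behind a single lock. The sketch below assumes a hypothetical FakeConsumer whose poll() returns the next offset from an unsynchronized counter; the hypothetical SynchronizedPoller serializes all calls so that no offset is lost or handed out twice, even with 8 threads polling:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class SynchronizedPollDemo {

    /** Hypothetical stand-in for a consumer that is NOT thread-safe. */
    static class FakeConsumer {
        private long nextOffset = 0; // plain field, no synchronization

        /** Returns the next "record" offset, like a poll that yields one record. */
        long poll() {
            return nextOffset++;
        }
    }

    /** Serializes every call to the shared consumer behind one lock. */
    static class SynchronizedPoller {
        private final FakeConsumer consumer = new FakeConsumer();
        private final ReentrantLock lock = new ReentrantLock();

        long poll() {
            lock.lock();
            try {
                return consumer.poll();
            } finally {
                lock.unlock();
            }
        }
    }

    /** Starts `threads` threads that each poll `pollsPerThread` times; returns every offset seen. */
    public static Set<Long> run(int threads, int pollsPerThread) throws InterruptedException {
        SynchronizedPoller poller = new SynchronizedPoller();
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread w = new Thread(() -> {
                for (int i = 0; i < pollsPerThread; i++) {
                    seen.add(poller.poll());
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<Long> seen = run(8, 1_000);
        // Every poll got a distinct offset, so this prints 8000.
        System.out.println(seen.size());
    }
}
```

The trade-off is that the lock turns N threads into one effective reader. With a real KafkaConsumer, a thread holding the lock during poll(timeout) also blocks every other thread for up to that timeout, so this removes the exception rather than adding read throughput.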

Typically, a single-threaded model is used: each consumer in a consumer group runs on its own thread and is assigned one or more partitions of the topic, and each partition is read by only one consumer in the group.

If you want to use multiple threads, make sure each thread has its own consumer and reads from a different partition, to avoid conflicts and potential data inconsistencies.
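A minimal sketch of the partition-per-thread idea, assuming each JMeter thread knows its 0-based index (JMeterContext.getThreadNum() is one candidate) and the topic's partition count. The helper partitionsFor is hypothetical; it spreads partitions round-robin so that no partition is read by two threads. Each thread would then create its own consumer and assign those partitions instead of sharing one instance:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionPerThread {

    /**
     * Returns the partitions that thread `threadIndex` (0-based) of `threadCount`
     * should read, spreading `partitionCount` partitions round-robin.
     * No partition is given to more than one thread.
     */
    public static List<Integer> partitionsFor(int threadIndex, int threadCount, int partitionCount) {
        if (threadCount <= 0 || threadIndex < 0 || threadIndex >= threadCount) {
            throw new IllegalArgumentException("threadIndex must be in [0, threadCount)");
        }
        List<Integer> mine = new ArrayList<>();
        for (int p = threadIndex; p < partitionCount; p += threadCount) {
            mine.add(p);
        }
        // With kafka-clients, each thread would then create its OWN KafkaConsumer and call
        // consumer.assign(...) with one TopicPartition per entry, instead of subscribe(...).
        return mine;
    }

    public static void main(String[] args) {
        int threads = 3;
        int partitions = 4;
        for (int t = 0; t < threads; t++) {
            System.out.println("thread " + t + " -> " + partitionsFor(t, threads, partitions));
        }
    }
}
```

With 3 threads and 4 partitions, thread 0 reads partitions 0 and 3, while threads 1 and 2 read one partition each. With more threads than partitions, the extra threads get an empty list and would sit idle, much as surplus consumers in a group receive no partitions.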

https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/

https://stackoverflow.com/questions/44587416/kafka-single-consumer-group-in-multiple-instances

So, you should try mapping each thread to its own partition instead of reading through a shared consumer and relying on the broker to decide which partition each read comes from.

I will look into other options for making it multi-threaded; until then, I will keep this issue open. If you come across any implementation ideas, please let me know.
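One possible direction, sketched here under assumptions and not as the plugin's design: confine the consumer to a single dedicated poller thread and let the JMeter threads take values from a shared BlockingQueue. That gives the "fresh value per request" behavior of VTS or Simple Table Server, since each value is handed to exactly one thread. The class SharedRecordQueue, the drain/fakeSource demo helpers, and the Supplier standing in for KafkaConsumer.poll() are all hypothetical:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class SharedRecordQueue {
    private final BlockingQueue<String> queue;
    private final Thread poller;

    /**
     * source: hypothetical stand-in for a consumer's poll(); returns the next batch
     * of record values, or null once there is nothing more to read.
     * capacity: bound on buffered values, so the poller cannot run far ahead of readers.
     */
    public SharedRecordQueue(Supplier<List<String>> source, int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        // Only this thread ever calls source.get(), so the non-thread-safe consumer is never shared.
        this.poller = new Thread(() -> {
            try {
                List<String> batch;
                while (!Thread.currentThread().isInterrupted() && (batch = source.get()) != null) {
                    for (String value : batch) {
                        queue.put(value); // blocks while the queue is full
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // close() was called; stop polling
            }
        }, "kafka-poller");
        this.poller.setDaemon(true);
        this.poller.start();
    }

    /** Safe to call from any thread; each value goes to exactly one caller. Null on timeout. */
    public String next(long timeoutMillis) throws InterruptedException {
        return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public void close() {
        poller.interrupt();
    }

    /** Demo helper: `workers` threads read until the queue stays empty for `timeoutMillis`. */
    public static List<String> drain(SharedRecordQueue q, int workers, long timeoutMillis)
            throws InterruptedException {
        List<String> taken = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            Thread t = new Thread(() -> {
                try {
                    String v;
                    while ((v = q.next(timeoutMillis)) != null) {
                        taken.add(v);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return taken;
    }

    /** Hypothetical source yielding `batches` batches of 10 values: "msg-0", "msg-1", ... */
    public static Supplier<List<String>> fakeSource(int batches) {
        int[] batchNo = {0}; // only touched by the poller thread
        return () -> {
            if (batchNo[0] == batches) {
                return null;
            }
            List<String> batch = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                batch.add("msg-" + (batchNo[0] * 10 + i));
            }
            batchNo[0]++;
            return batch;
        };
    }

    public static void main(String[] args) throws InterruptedException {
        SharedRecordQueue q = new SharedRecordQueue(fakeSource(10), 16);
        List<String> taken = drain(q, 4, 2_000);
        q.close();
        // Compare the total count with the distinct count to confirm no value went to two threads.
        System.out.println(taken.size() + " taken, " + new HashSet<>(taken).size() + " distinct");
    }
}
```

Because only the poller thread touches the consumer, the ConcurrentModificationException cannot occur. The cost is that the broker sees a single consumer, and if offsets are auto-committed they advance when records are polled, not when a JMeter thread has actually taken them.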

polyakovmyu commented 4 months ago

My solution is the Simple Table Server plugin, which is enough for my tasks. It would be a problem if you wanted to load test Kafka itself.

rollno748 commented 4 months ago

Reading is not always the bottleneck; it's the insert. Btw, I will update here when I implement the logic.

I have a couple of options in mind, but they won't replicate the real-time scenario.

amolsr commented 3 months ago

bump