rollno748 / di-kafkameter

JMeter Plugin to load test Apache Kafka topics/brokers

Problems running more than one consumer #10

Closed: HaddadJoe closed this issue 1 year ago

HaddadJoe commented 1 year ago

I have a simple setup trying to run a Kafka consumer. I tried to follow the wiki for the consumer, but it's not well documented. Please see https://github.com/rollno748/di-kafkameter/issues/9. The wiki mentions that the Variable Name should be unique, so I'm trying to generate a unique one per thread. Everything works fine with 1 thread, but scaling to more than one thread raises the following error:

2023-01-29 19:10:25,503 INFO c.d.j.k.s.KafkaConsumerSampler: Kafka Consumer config not initialized properly.. Check the config element
2023-01-29 19:10:25,502 ERROR o.a.j.t.JMeterThread: Error while processing sampler: 'Kafka Consumer Samplerclient__StandardJMeterEngine'.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2495) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2479) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.getConsumerRecords(KafkaConsumerSampler.java:73) ~[di-kafkameter-1.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.sample(KafkaConsumerSampler.java:56) ~[di-kafkameter-1.1.jar:?]
    at org.apache.jmeter.threads.JMeterThread.doSampling(JMeterThread.java:651) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.executeSamplePackage(JMeterThread.java:570) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.processSampler(JMeterThread.java:501) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:268) ~[ApacheJMeter_core.jar:5.5]
    at java.lang.Thread.run(Thread.java:1589) ~[?:?]
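
The stack trace above shows the exception being raised from `KafkaConsumer.acquire` while a sampler thread calls `poll`, which suggests that two JMeter threads reached the same consumer instance. As a rough illustration of that kind of check, here is a minimal sketch of a single-owner guard. `SingleThreadGuard` and its methods are hypothetical names invented for this example and are not the Kafka client's actual code; the assumption is only that the client rejects a second thread while another one is inside a call.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a client that allows only one thread inside it at a time. */
public class SingleThreadGuard {
    // Thread currently inside the client, or null when it is free.
    private final AtomicReference<Thread> owner = new AtomicReference<>(null);
    // Re-entrancy depth; only ever touched by the owning thread.
    private int depth = 0;

    public void acquire() {
        Thread me = Thread.currentThread();
        // Fail fast instead of blocking when another thread already owns the guard.
        if (owner.get() != me && !owner.compareAndSet(null, me)) {
            throw new ConcurrentModificationException("client is not safe for multi-threaded access");
        }
        depth++;
    }

    public void release() {
        if (--depth == 0) {
            owner.set(null);
        }
    }

    /** Returns true if a second thread is rejected while the first one holds the guard. */
    public static boolean secondThreadRejected() {
        SingleThreadGuard guard = new SingleThreadGuard();
        guard.acquire(); // the calling thread is now "inside poll()"
        boolean[] rejected = {false};
        Thread other = new Thread(() -> {
            try {
                guard.acquire();
                guard.release();
            } catch (ConcurrentModificationException e) {
                rejected[0] = true;
            }
        });
        other.start();
        try {
            other.join(); // join makes the write to rejected[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            guard.release();
        }
        return rejected[0];
    }

    public static void main(String[] args) {
        System.out.println("second thread rejected: " + secondThreadRejected());
        // prints "second thread rejected: true"
    }
}
```

If the threads in the test plan really do share one consumer, a guard of this kind would fail on the second thread regardless of how many partitions the topic has.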

rollno748 commented 1 year ago

Hello, thanks for letting me know about the issue. The variable name actually needs to be unique per topic, not per thread. You can define a variable name bound to the topic. Can you please try that?

Also, can you please share your jmeter.log so I can better understand the scenario?

HaddadJoe commented 1 year ago

I'm running a large benchmark with 200 partitions per topic. The goal is to scale up to 200 consumers, but with the current limitation it doesn't look like the consumer will scale if it can't be run on multiple threads.
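
For context on what "one consumer per thread" could look like in general, here is a minimal sketch of the per-thread instance pattern using `ThreadLocal`. It is not how the plugin is implemented, and `FakeClient` and `PerThreadClientSketch` are hypothetical names standing in for a client object that must not be shared between threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class PerThreadClientSketch {
    /** Hypothetical stand-in for a non-thread-safe client such as a consumer. */
    static final class FakeClient {
        final String createdBy = Thread.currentThread().getName();
    }

    // Each thread lazily creates its own instance on first use.
    private static final ThreadLocal<FakeClient> CLIENT =
            ThreadLocal.withInitial(FakeClient::new);

    /** Starts the given number of threads and counts how many distinct clients they used. */
    public static int distinctClients(int threads) {
        Set<FakeClient> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                seen.add(CLIENT.get());
                seen.add(CLIENT.get()); // same thread, so the same instance again
            }, "worker-" + i);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctClients(10)); // prints 10
    }
}
```

With this pattern no instance is ever touched by two threads, so a single-thread check inside the client would never trip.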

Producer

<?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version="1.2" properties="5.0" jmeter="5.5">
  <hashTree>
    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="Local Plan" enabled="true">
      <stringProp name="TestPlan.comments"></stringProp>
      <boolProp name="TestPlan.functional_mode">false</boolProp>
      <boolProp name="TestPlan.tearDown_on_shutdown">true</boolProp>
      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
        <collectionProp name="Arguments.arguments"/>
      </elementProp>
      <stringProp name="TestPlan.user_define_classpath"></stringProp>
    </TestPlan>
    <hashTree>
      <com.di.jmeter.kafka.config.KafkaProducerConfig guiclass="TestBeanGUI" testclass="com.di.jmeter.kafka.config.KafkaProducerConfig" testname="KafkaProducerConfig" enabled="true">
        <stringProp name="batchSize">16384</stringProp>
        <stringProp name="clientId">Jmeter-Producer-1</stringProp>
        <collectionProp name="extraConfigs"/>
        <boolProp name="isSsl">false</boolProp>
        <stringProp name="kafkaBrokers">localhost:9092</stringProp>
        <stringProp name="kafkaSslKeystore"></stringProp>
        <stringProp name="kafkaSslKeystorePassword">Chang3M3</stringProp>
        <stringProp name="kafkaSslTruststore"></stringProp>
        <stringProp name="kafkaSslTruststorePassword">Chang3M3</stringProp>
        <stringProp name="serializerKey">org.apache.kafka.common.serialization.StringSerializer</stringProp>
        <stringProp name="serializerValue">org.apache.kafka.common.serialization.StringSerializer</stringProp>
        <stringProp name="kafkaProducerClientVariableName">KafkaProducerClient</stringProp>
        <stringProp name="securityType">securityType.plaintext</stringProp>
        <stringProp name="kafkaSslPrivateKeyPass">Chang3M3</stringProp>
      </com.di.jmeter.kafka.config.KafkaProducerConfig>
      <hashTree/>
      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Thread Group Broker 1" enabled="true">
        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
          <boolProp name="LoopController.continue_forever">false</boolProp>
          <intProp name="LoopController.loops">-1</intProp>
        </elementProp>
        <stringProp name="ThreadGroup.num_threads">10</stringProp>
        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
        <boolProp name="ThreadGroup.scheduler">true</boolProp>
        <stringProp name="ThreadGroup.duration">600</stringProp>
        <stringProp name="ThreadGroup.delay"></stringProp>
        <boolProp name="ThreadGroup.same_user_on_next_iteration">false</boolProp>
      </ThreadGroup>
      <hashTree>
        <com.di.jmeter.kafka.sampler.KafkaProducerSampler guiclass="TestBeanGUI" testclass="com.di.jmeter.kafka.sampler.KafkaProducerSampler" testname="KafkaProducerSampler" enabled="true">
          <stringProp name="kafkaMessage">\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07\x02\x03\x05\x07</stringProp>
          <stringProp name="kafkaMessageKey"></stringProp>
          <stringProp name="kafkaTopic">partition-topic</stringProp>
          <collectionProp name="messageHeaders"/>
          <stringProp name="partitionString"></stringProp>
          <stringProp name="kafkaProducerClientVariableName">KafkaProducerClient</stringProp>
        </com.di.jmeter.kafka.sampler.KafkaProducerSampler>
        <hashTree/>
      </hashTree>
      <ResultCollector guiclass="StatVisualizer" testclass="ResultCollector" testname="Aggregate Report" enabled="true">
        <boolProp name="ResultCollector.error_logging">false</boolProp>
        <objProp>
          <name>saveConfig</name>
          <value class="SampleSaveConfiguration">
            <time>true</time>
            <latency>true</latency>
            <timestamp>true</timestamp>
            <success>true</success>
            <label>true</label>
            <code>true</code>
            <message>true</message>
            <threadName>true</threadName>
            <dataType>true</dataType>
            <encoding>false</encoding>
            <assertions>true</assertions>
            <subresults>true</subresults>
            <responseData>false</responseData>
            <samplerData>false</samplerData>
            <xml>false</xml>
            <fieldNames>true</fieldNames>
            <responseHeaders>false</responseHeaders>
            <requestHeaders>false</requestHeaders>
            <responseDataOnError>false</responseDataOnError>
            <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
            <assertionsResultsToSave>0</assertionsResultsToSave>
            <bytes>true</bytes>
            <sentBytes>true</sentBytes>
            <url>true</url>
            <threadCounts>true</threadCounts>
            <idleTime>true</idleTime>
            <connectTime>true</connectTime>
          </value>
        </objProp>
        <stringProp name="filename"></stringProp>
      </ResultCollector>
      <hashTree/>
      <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="View Results Tree" enabled="true">
        <boolProp name="ResultCollector.error_logging">false</boolProp>
        <objProp>
          <name>saveConfig</name>
          <value class="SampleSaveConfiguration">
            <time>true</time>
            <latency>true</latency>
            <timestamp>true</timestamp>
            <success>true</success>
            <label>true</label>
            <code>true</code>
            <message>true</message>
            <threadName>true</threadName>
            <dataType>true</dataType>
            <encoding>false</encoding>
            <assertions>true</assertions>
            <subresults>true</subresults>
            <responseData>false</responseData>
            <samplerData>false</samplerData>
            <xml>false</xml>
            <fieldNames>true</fieldNames>
            <responseHeaders>false</responseHeaders>
            <requestHeaders>false</requestHeaders>
            <responseDataOnError>false</responseDataOnError>
            <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
            <assertionsResultsToSave>0</assertionsResultsToSave>
            <bytes>true</bytes>
            <sentBytes>true</sentBytes>
            <url>true</url>
            <threadCounts>true</threadCounts>
            <idleTime>true</idleTime>
            <connectTime>true</connectTime>
          </value>
        </objProp>
        <stringProp name="filename"></stringProp>
      </ResultCollector>
      <hashTree/>
    </hashTree>
  </hashTree>
</jmeterTestPlan>

Consumer

<?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version="1.2" properties="5.0" jmeter="5.5">
  <hashTree>
    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="Local Plan" enabled="true">
      <stringProp name="TestPlan.comments"></stringProp>
      <boolProp name="TestPlan.functional_mode">false</boolProp>
      <boolProp name="TestPlan.tearDown_on_shutdown">true</boolProp>
      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
        <collectionProp name="Arguments.arguments"/>
      </elementProp>
      <stringProp name="TestPlan.user_define_classpath"></stringProp>
    </TestPlan>
    <hashTree>
      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Thread Group" enabled="true">
        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
          <boolProp name="LoopController.continue_forever">false</boolProp>
          <intProp name="LoopController.loops">-1</intProp>
        </elementProp>
        <stringProp name="ThreadGroup.num_threads">10</stringProp>
        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
        <boolProp name="ThreadGroup.scheduler">true</boolProp>
        <stringProp name="ThreadGroup.duration">600</stringProp>
        <stringProp name="ThreadGroup.delay"></stringProp>
        <boolProp name="ThreadGroup.same_user_on_next_iteration">true</boolProp>
      </ThreadGroup>
      <hashTree>
        <Arguments guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
          <collectionProp name="Arguments.arguments">
            <elementProp name="client_name" elementType="Argument">
              <stringProp name="Argument.name">client_name</stringProp>
              <stringProp name="Argument.value">client__${__threadNum}</stringProp>
              <stringProp name="Argument.metadata">=</stringProp>
            </elementProp>
          </collectionProp>
        </Arguments>
        <hashTree/>
        <com.di.jmeter.kafka.config.KafkaConsumerConfig guiclass="TestBeanGUI" testclass="com.di.jmeter.kafka.config.KafkaConsumerConfig" testname="Kafka Consumer Config${client_name}" enabled="true">
          <stringProp name="kafkaConsumerClientVariableName">${client_name}</stringProp>
          <stringProp name="kafkaBrokers">localhost:9092</stringProp>
          <stringProp name="groupId">consumer-group-${client_name}</stringProp>
          <stringProp name="topic">partition-topic</stringProp>
          <stringProp name="deSerializerKey">org.apache.kafka.common.serialization.StringDeserializer</stringProp>
          <stringProp name="deSerializerValue">org.apache.kafka.common.serialization.StringDeserializer</stringProp>
          <stringProp name="numberOfMsgToPoll">1</stringProp>
          <boolProp name="autoCommit">true</boolProp>
          <stringProp name="securityType">securityType.plaintext</stringProp>
          <stringProp name="kafkaSslTruststore"></stringProp>
          <stringProp name="kafkaSslTruststorePassword">Chang3M3</stringProp>
          <stringProp name="kafkaSslKeystore"></stringProp>
          <stringProp name="kafkaSslKeystorePassword">Chang3M3</stringProp>
          <stringProp name="kafkaSslPrivateKeyPass">Chang3M3</stringProp>
          <collectionProp name="extraConfigs"/>
        </com.di.jmeter.kafka.config.KafkaConsumerConfig>
        <hashTree/>
        <com.di.jmeter.kafka.sampler.KafkaConsumerSampler guiclass="TestBeanGUI" testclass="com.di.jmeter.kafka.sampler.KafkaConsumerSampler" testname="Kafka Consumer Sampler${client_name}" enabled="true">
          <stringProp name="kafkaConsumerClientVariableName">${client_name}</stringProp>
          <stringProp name="pollTimeout">100</stringProp>
          <stringProp name="commitType">Sync</stringProp>
        </com.di.jmeter.kafka.sampler.KafkaConsumerSampler>
        <hashTree>
          <SizeAssertion guiclass="SizeAssertionGui" testclass="SizeAssertion" testname="Size Assertion" enabled="true">
            <stringProp name="Assertion.test_field">SizeAssertion.response_data</stringProp>
            <stringProp name="SizeAssertion.size">2</stringProp>
            <intProp name="SizeAssertion.operator">1</intProp>
          </SizeAssertion>
          <hashTree/>
        </hashTree>
      </hashTree>
      <ResultCollector guiclass="StatVisualizer" testclass="ResultCollector" testname="Aggregate Report" enabled="true">
        <boolProp name="ResultCollector.error_logging">false</boolProp>
        <objProp>
          <name>saveConfig</name>
          <value class="SampleSaveConfiguration">
            <time>true</time>
            <latency>true</latency>
            <timestamp>true</timestamp>
            <success>true</success>
            <label>true</label>
            <code>true</code>
            <message>true</message>
            <threadName>true</threadName>
            <dataType>true</dataType>
            <encoding>false</encoding>
            <assertions>true</assertions>
            <subresults>true</subresults>
            <responseData>false</responseData>
            <samplerData>false</samplerData>
            <xml>false</xml>
            <fieldNames>true</fieldNames>
            <responseHeaders>false</responseHeaders>
            <requestHeaders>false</requestHeaders>
            <responseDataOnError>false</responseDataOnError>
            <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
            <assertionsResultsToSave>0</assertionsResultsToSave>
            <bytes>true</bytes>
            <sentBytes>true</sentBytes>
            <url>true</url>
            <threadCounts>true</threadCounts>
            <idleTime>true</idleTime>
            <connectTime>true</connectTime>
          </value>
        </objProp>
        <stringProp name="filename"></stringProp>
      </ResultCollector>
      <hashTree/>
      <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="View Results Tree" enabled="true">
        <boolProp name="ResultCollector.error_logging">false</boolProp>
        <objProp>
          <name>saveConfig</name>
          <value class="SampleSaveConfiguration">
            <time>true</time>
            <latency>true</latency>
            <timestamp>true</timestamp>
            <success>true</success>
            <label>true</label>
            <code>true</code>
            <message>true</message>
            <threadName>true</threadName>
            <dataType>true</dataType>
            <encoding>false</encoding>
            <assertions>true</assertions>
            <subresults>true</subresults>
            <responseData>false</responseData>
            <samplerData>false</samplerData>
            <xml>false</xml>
            <fieldNames>true</fieldNames>
            <responseHeaders>false</responseHeaders>
            <requestHeaders>false</requestHeaders>
            <responseDataOnError>false</responseDataOnError>
            <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
            <assertionsResultsToSave>0</assertionsResultsToSave>
            <bytes>true</bytes>
            <sentBytes>true</sentBytes>
            <url>true</url>
            <threadCounts>true</threadCounts>
            <idleTime>true</idleTime>
            <connectTime>true</connectTime>
          </value>
        </objProp>
        <stringProp name="filename"></stringProp>
      </ResultCollector>
      <hashTree/>
    </hashTree>
  </hashTree>
</jmeterTestPlan>
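
One detail in the consumer plan above may explain the shared consumer. `client_name` is set to `client__${__threadNum}` in a User Defined Variables element, and JMeter processes those elements once at the start of the test rather than once per thread. The sampler name in the error is `Kafka Consumer Samplerclient__StandardJMeterEngine`, so the function appears to have been evaluated on the engine thread. The sketch below assumes that `__threadNum` returns whatever follows the last `-` in the current thread's name. That derivation is an assumption that matches the logged value, not something confirmed in this thread, and `ThreadNumSketch` is a hypothetical name.

```java
public class ThreadNumSketch {
    /** Assumed derivation: the text after the last '-' in the thread name. */
    public static String suffixAfterLastDash(String threadName) {
        // lastIndexOf returns -1 when there is no '-', so the whole name is returned.
        return threadName.substring(threadName.lastIndexOf('-') + 1);
    }

    public static void main(String[] args) {
        // Thread names as they appear in the logs posted in this issue.
        String[] names = {"Thread Group 1-1", "Thread Group 1-2", "StandardJMeterEngine"};
        for (String name : names) {
            System.out.println("client__" + suffixAfterLastDash(name));
        }
        // prints:
        // client__1
        // client__2
        // client__StandardJMeterEngine
    }
}
```

Under that assumption every thread sees the same `client_name`, and therefore the same variable name and the same consumer object, which would match the error appearing as soon as a second thread starts.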

rollno748 commented 1 year ago

Thanks for sharing. This appears to be the JMX you are using; it would be great if you could also attach the jmeter.log file.

Ideally, it should create more consumers according to the logic, but I have never tested this scenario of having multiple consumers per topic.

Do you see this issue only when reaching a limit, or with lower values as well (for example, 10 threads)?

HaddadJoe commented 1 year ago

The problem happens even with 2 threads. I'm testing with 10.

Here are the logs. I also tried moving the Kafka consumer config out of the thread group, but that doesn't work either.

2023-01-29 20:10:32,079 INFO o.a.j.e.StandardJMeterEngine: Running the test!
2023-01-29 20:10:32,079 INFO o.a.j.s.SampleEvent: List of sample_variables: []
2023-01-29 20:10:32,080 INFO o.a.k.c.c.ConsumerConfig: ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-consumer-group-client__StandardJMeterEngine-26
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = consumer-group-client__StandardJMeterEngine
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 1
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 45000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2023-01-29 20:10:32,084 INFO o.a.k.c.u.AppInfoParser: Kafka version: 3.3.1
2023-01-29 20:10:32,084 INFO o.a.k.c.u.AppInfoParser: Kafka commitId: e23c59d00e687ff5
2023-01-29 20:10:32,084 INFO o.a.k.c.u.AppInfoParser: Kafka startTimeMs: 1675023032084
2023-01-29 20:10:32,085 INFO o.a.k.c.c.KafkaConsumer: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] Subscribed to topic(s): partition-topic
2023-01-29 20:10:32,085 INFO c.d.j.k.c.KafkaConsumerConfig: Kafka consumer client successfully Initialized
2023-01-29 20:10:32,085 INFO o.a.j.g.u.JMeterMenuBar: setRunning(true, *local*)
2023-01-29 20:10:32,146 INFO o.a.j.e.StandardJMeterEngine: Starting ThreadGroup: 1 : Thread Group
2023-01-29 20:10:32,146 INFO o.a.j.e.StandardJMeterEngine: Starting 2 threads for group Thread Group.
2023-01-29 20:10:32,146 INFO o.a.j.e.StandardJMeterEngine: Thread will continue on error
2023-01-29 20:10:32,146 INFO o.a.j.t.ThreadGroup: Starting thread group... number=1 threads=2 ramp-up=1 delayedStart=false
2023-01-29 20:10:32,147 INFO o.a.j.t.JMeterThread: Thread started: Thread Group 1-1
2023-01-29 20:10:32,147 INFO o.a.j.t.ThreadGroup: Started thread group number 1
2023-01-29 20:10:32,147 INFO o.a.j.e.StandardJMeterEngine: All thread groups have been started
2023-01-29 20:10:32,159 INFO o.a.k.c.Metadata: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] Resetting the last seen epoch of partition partition-topic-0 to 0 since the associated topicId changed from null to 47fgebMuSq6bN9C6fz24WQ
2023-01-29 20:10:32,159 INFO o.a.k.c.Metadata: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] Cluster ID: 2Pg2fzVHTGOIZE584iarpw
2023-01-29 20:10:32,160 INFO o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
2023-01-29 20:10:32,161 INFO o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] (Re-)joining group
2023-01-29 20:10:32,170 INFO o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] Request joining group due to: need to re-join with the given member-id: consumer-consumer-group-client__StandardJMeterEngine-26-2f90731a-74b6-4843-b156-e22bbb344c75
2023-01-29 20:10:32,170 INFO o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
2023-01-29 20:10:32,170 INFO o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] (Re-)joining group
2023-01-29 20:10:32,650 INFO o.a.j.t.JMeterThread: Thread started: Thread Group 1-2
2023-01-29 20:10:32,662 ERROR o.a.j.t.JMeterThread: Error while processing sampler: 'Kafka Consumer Samplerclient__StandardJMeterEngine'.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2495) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2479) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.getConsumerRecords(KafkaConsumerSampler.java:73) ~[di-kafkameter-1.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.sample(KafkaConsumerSampler.java:56) ~[di-kafkameter-1.1.jar:?]
    at org.apache.jmeter.threads.JMeterThread.doSampling(JMeterThread.java:651) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.executeSamplePackage(JMeterThread.java:570) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.processSampler(JMeterThread.java:501) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:268) ~[ApacheJMeter_core.jar:5.5]
    at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2023-01-29 20:10:32,662 ERROR o.a.j.t.JMeterThread: Error while processing sampler: 'Kafka Consumer Samplerclient__StandardJMeterEngine'.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2495) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2479) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.getConsumerRecords(KafkaConsumerSampler.java:73) ~[di-kafkameter-1.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.sample(KafkaConsumerSampler.java:56) ~[di-kafkameter-1.1.jar:?]
    at org.apache.jmeter.threads.JMeterThread.doSampling(JMeterThread.java:651) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.executeSamplePackage(JMeterThread.java:570) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.processSampler(JMeterThread.java:501) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:268) ~[ApacheJMeter_core.jar:5.5]
    at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2023-01-29 20:10:32,662 ERROR o.a.j.t.JMeterThread: Error while processing sampler: 'Kafka Consumer Samplerclient__StandardJMeterEngine'.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2495) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2479) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.getConsumerRecords(KafkaConsumerSampler.java:73) ~[di-kafkameter-1.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.sample(KafkaConsumerSampler.java:56) ~[di-kafkameter-1.1.jar:?]
    at org.apache.jmeter.threads.JMeterThread.doSampling(JMeterThread.java:651) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.executeSamplePackage(JMeterThread.java:570) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.processSampler(JMeterThread.java:501) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:268) ~[ApacheJMeter_core.jar:5.5]
    at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2023-01-29 20:10:32,663 ERROR o.a.j.t.JMeterThread: Error while processing sampler: 'Kafka Consumer Samplerclient__StandardJMeterEngine'.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2495) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2479) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.getConsumerRecords(KafkaConsumerSampler.java:73) ~[di-kafkameter-1.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.sample(KafkaConsumerSampler.java:56) ~[di-kafkameter-1.1.jar:?]
    at org.apache.jmeter.threads.JMeterThread.doSampling(JMeterThread.java:651) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.executeSamplePackage(JMeterThread.java:570) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.processSampler(JMeterThread.java:501) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:268) ~[ApacheJMeter_core.jar:5.5]
    at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2023-01-29 20:10:32,663 ERROR o.a.j.t.JMeterThread: Error while processing sampler: 'Kafka Consumer Samplerclient__StandardJMeterEngine'.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2495) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2479) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.getConsumerRecords(KafkaConsumerSampler.java:73) ~[di-kafkameter-1.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.sample(KafkaConsumerSampler.java:56) ~[di-kafkameter-1.1.jar:?]
    at org.apache.jmeter.threads.JMeterThread.doSampling(JMeterThread.java:651) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.executeSamplePackage(JMeterThread.java:570) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.processSampler(JMeterThread.java:501) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:268) ~[ApacheJMeter_core.jar:5.5]
    at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2023-01-29 20:10:32,664 ERROR o.a.j.t.JMeterThread: Error while processing sampler: 'Kafka Consumer Samplerclient__StandardJMeterEngine'.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2495) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2479) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.getConsumerRecords(KafkaConsumerSampler.java:73) ~[di-kafkameter-1.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.sample(KafkaConsumerSampler.java:56) ~[di-kafkameter-1.1.jar:?]
    at org.apache.jmeter.threads.JMeterThread.doSampling(JMeterThread.java:651) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.executeSamplePackage(JMeterThread.java:570) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.processSampler(JMeterThread.java:501) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:268) ~[ApacheJMeter_core.jar:5.5]
    at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2023-01-29 20:10:32,664 ERROR o.a.j.t.JMeterThread: Error while processing sampler: 'Kafka Consumer Samplerclient__StandardJMeterEngine'.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2495) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2479) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.getConsumerRecords(KafkaConsumerSampler.java:73) ~[di-kafkameter-1.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.sample(KafkaConsumerSampler.java:56) ~[di-kafkameter-1.1.jar:?]
    at org.apache.jmeter.threads.JMeterThread.doSampling(JMeterThread.java:651) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.executeSamplePackage(JMeterThread.java:570) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.processSampler(JMeterThread.java:501) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:268) ~[ApacheJMeter_core.jar:5.5]
    at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2023-01-29 20:10:32,664 ERROR o.a.j.t.JMeterThread: Error while processing sampler: 'Kafka Consumer Samplerclient__StandardJMeterEngine'.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2495) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2479) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.getConsumerRecords(KafkaConsumerSampler.java:73) ~[di-kafkameter-1.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.sample(KafkaConsumerSampler.java:56) ~[di-kafkameter-1.1.jar:?]
    at org.apache.jmeter.threads.JMeterThread.doSampling(JMeterThread.java:651) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.executeSamplePackage(JMeterThread.java:570) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.processSampler(JMeterThread.java:501) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:268) ~[ApacheJMeter_core.jar:5.5]
    at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2023-01-29 20:10:32,664 ERROR o.a.j.t.JMeterThread: Error while processing sampler: 'Kafka Consumer Samplerclient__StandardJMeterEngine'.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2495) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2479) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.getConsumerRecords(KafkaConsumerSampler.java:73) ~[di-kafkameter-1.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.sample(KafkaConsumerSampler.java:56) ~[di-kafkameter-1.1.jar:?]
    at org.apache.jmeter.threads.JMeterThread.doSampling(JMeterThread.java:651) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.executeSamplePackage(JMeterThread.java:570) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.processSampler(JMeterThread.java:501) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:268) ~[ApacheJMeter_core.jar:5.5]
    at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2023-01-29 20:10:32,664 ERROR o.a.j.t.JMeterThread: Error while processing sampler: 'Kafka Consumer Samplerclient__StandardJMeterEngine'.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2495) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2479) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) ~[kafka-clients-3.3.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.getConsumerRecords(KafkaConsumerSampler.java:73) ~[di-kafkameter-1.1.jar:?]
    at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.sample(KafkaConsumerSampler.java:56) ~[di-kafkameter-1.1.jar:?]
    at org.apache.jmeter.threads.JMeterThread.doSampling(JMeterThread.java:651) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.executeSamplePackage(JMeterThread.java:570) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.processSampler(JMeterThread.java:501) ~[ApacheJMeter_core.jar:5.5]
    at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:268) ~[ApacheJMeter_core.jar:5.5]
    at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2023-01-29 20:10:32,665 INFO o.a.j.t.JMeterThread: Thread is done: Thread Group 1-2
2023-01-29 20:10:32,665 INFO o.a.j.t.JMeterThread: Thread finished: Thread Group 1-2
2023-01-29 20:10:35,177 INFO o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] Successfully joined group with generation Generation{generationId=7, memberId='consumer-consumer-group-client__StandardJMeterEngine-26-2f90731a-74b6-4843-b156-e22bbb344c75', protocol='range'}
2023-01-29 20:10:35,177 INFO o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] Finished assignment for group at generation 7: {consumer-consumer-group-client__StandardJMeterEngine-26-2f90731a-74b6-4843-b156-e22bbb344c75=Assignment(partitions=[partition-topic-0])}
2023-01-29 20:10:35,181 INFO o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] Successfully synced group in generation Generation{generationId=7, memberId='consumer-consumer-group-client__StandardJMeterEngine-26-2f90731a-74b6-4843-b156-e22bbb344c75', protocol='range'}
2023-01-29 20:10:35,182 INFO o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] Notifying assignor about the new Assignment(partitions=[partition-topic-0])
2023-01-29 20:10:35,182 INFO o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] Adding newly assigned partitions: partition-topic-0
2023-01-29 20:10:35,184 INFO o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] Setting offset for partition partition-topic-0 to the committed offset FetchPosition{offset=5087436, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}
2023-01-29 20:10:35,247 INFO o.a.j.t.JMeterThread: Thread is done: Thread Group 1-1
2023-01-29 20:10:35,247 INFO o.a.j.t.JMeterThread: Thread finished: Thread Group 1-1
2023-01-29 20:10:35,247 INFO o.a.j.e.StandardJMeterEngine: Notifying test listeners of end of test
2023-01-29 20:10:35,250 INFO o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] Revoke previously assigned partitions partition-topic-0
2023-01-29 20:10:35,250 INFO o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] Member consumer-consumer-group-client__StandardJMeterEngine-26-2f90731a-74b6-4843-b156-e22bbb344c75 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null) due to the consumer is being closed
2023-01-29 20:10:35,251 INFO o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] Resetting generation and member id due to: consumer pro-actively leaving the group
2023-01-29 20:10:35,251 INFO o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-consumer-group-client__StandardJMeterEngine-26, groupId=consumer-group-client__StandardJMeterEngine] Request joining group due to: consumer pro-actively leaving the group
2023-01-29 20:10:35,257 INFO o.a.k.c.m.Metrics: Metrics scheduler closed
2023-01-29 20:10:35,257 INFO o.a.k.c.m.Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
2023-01-29 20:10:35,257 INFO o.a.k.c.m.Metrics: Metrics reporters closed
2023-01-29 20:10:35,258 INFO o.a.k.c.u.AppInfoParser: App info kafka.consumer for consumer-consumer-group-client__StandardJMeterEngine-26 unregistered
2023-01-29 20:10:35,258 INFO c.d.j.k.c.KafkaConsumerConfig: Kafka consumer client connection terminated
2023-01-29 20:10:35,258 INFO o.a.j.g.u.JMeterMenuBar: setRunning(false, *local*)

I reduced the loop count to 10 to avoid a massive log file.

rollno748 commented 1 year ago

Alright. This appears to be a valid scenario to me. I will check the code and push my changes at the earliest.

rollno748 commented 1 year ago

Hi,

It seems like it's not creating a unique name when you use User Defined Variables.

This indicates that you are trying to create two consumers with the same name/object (WEII).
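
For context, the `ConcurrentModificationException` in the log is raised by a guard in the Kafka client that lets only one thread use a given consumer at a time, so two JMeter threads resolving the same variable name to the same consumer object will trip it. A minimal sketch of that kind of guard follows. It is a simplified model, not the actual `KafkaConsumer` source, and the class and method names (`SingleThreadGuard`, `secondThreadIsRejected`) are hypothetical:

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of a single-thread ownership guard (hypothetical, not Kafka code).
final class SingleThreadGuard {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private int depth = 0; // re-entrancy depth, only touched by the owning thread

    // Claim the guard for the calling thread, or fail if another thread holds it.
    void acquire() {
        long me = Thread.currentThread().getId();
        if (owner.get() != me && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                    "guarded object is not safe for multi-threaded access");
        }
        depth++;
    }

    // Give the guard back once the outermost call of the owning thread returns.
    void release() {
        if (--depth == 0) {
            owner.set(NO_OWNER);
        }
    }

    // Demo helper: true if a second thread is rejected while the caller holds the guard.
    static boolean secondThreadIsRejected() {
        SingleThreadGuard guard = new SingleThreadGuard();
        AtomicBoolean rejected = new AtomicBoolean(false);
        guard.acquire();
        try {
            Thread other = new Thread(() -> {
                try {
                    guard.acquire();
                    guard.release();
                } catch (ConcurrentModificationException e) {
                    rejected.set(true);
                }
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            guard.release();
        }
        return rejected.get();
    }
}
```

The same thread may enter the guard repeatedly, but a different thread is rejected until the owner has released it. That would explain why the test plan works with one thread and fails as soon as a second thread polls the shared consumer.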

HaddadJoe commented 1 year ago

Oh nice catch, is your plugin not rendering JMeter variables?

rollno748 commented 1 year ago

I don't think it's a plugin issue. It's how JMeter itself works. I guess it makes more sense that the thread number is only created inside the thread group.

HaddadJoe commented 1 year ago

My variable definition is within the thread group, though. I can see it being correctly assigned to the name of the Kafka consumer config and the name of the Kafka consumer sampler. I added it to the name of the config too, just to see if it's being set, and it is. Here's a screenshot; I'm using the latest JMeter, 5.5.

image
rollno748 commented 1 year ago

Hey, there is something called execution order in JMeter. The config element tops the list after the test plan - no matter where you keep it in the script.

rollno748 commented 1 year ago

I have tried multiple consumer subscriptions using different config elements within the same test plan, and it works for me.

In your case, the function is not being evaluated inside the config element (which is not an issue with the plugin but the nature of JMeter). As a workaround, you can add any number of config elements to the test plan, each with a unique consumer group ID and variable name, to run your test.

I am marking this as not a bug. Please star the project on GitHub if you like my work on this plugin. Keep supporting!

HaddadJoe commented 1 year ago

Hey @rollno748, thanks for taking a look, but this is not a feasible solution. I want to run hundreds to thousands of consumers on the same and on different group IDs. Creating these manually is tedious. Are you saying the consumer part of your plugin would not work within a thread group? Do you plan to support that?

Since, as you said, "The config element tops the list after the test plan - no matter where you keep it in the script", then there's no way to build it using thread groups, correct?

rollno748 commented 1 year ago

No config element gets updated within the thread group. I extended JMeter's ConfigTestElement to build this Kafka plugin. But it is worth asking a question in the JMeter group.

With the current implementation of this plugin, you cannot update the values dynamically. Another approach is to create a setUp Thread Group, create the consumer config element alone using a JSR223 sampler, and save the connection using vars.putObject(): https://jmeter.apache.org/api/org/apache/jmeter/threads/JMeterVariables.html#putObject(java.lang.String,java.lang.Object)

Then you can use the same name in my consumer sampler to consume the messages. This way you can have multiple consumers dynamically, according to your needs.
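
One way to read this suggestion is that each thread creates its own client object and stores it under a name that is unique to that thread. The sketch below models only that naming and lookup step using the Java standard library. `PerThreadClients`, `keyFor` and `forThread` are hypothetical names, not part of di-kafkameter or JMeter, and the sketch assumes each thread passes in its own thread number:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical helper: one client object per (base name, thread number) pair.
final class PerThreadClients<T> {
    private final Map<String, T> clients = new ConcurrentHashMap<>();
    private final Supplier<T> factory;

    PerThreadClients(Supplier<T> factory) {
        this.factory = factory;
    }

    // Build a name that differs per thread, e.g. "consumer-3" for thread 3.
    static String keyFor(String baseName, int threadNum) {
        return baseName + "-" + threadNum;
    }

    // Return the client for this thread, creating it on first use.
    T forThread(String baseName, int threadNum) {
        return clients.computeIfAbsent(keyFor(baseName, threadNum), k -> factory.get());
    }

    // Number of distinct clients created so far.
    int size() {
        return clients.size();
    }
}
```

In a JSR223 script, the factory would presumably construct a Kafka consumer, and the result could be stored with `vars.putObject(...)` under the per-thread name instead of in a shared map. Whether the plugin's consumer sampler can then look up that name per thread is an assumption that would need to be verified against the plugin.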

giraone commented 1 year ago

I have the same problem with the current solution of the plugin. Consuming messages from a Kafka topic has to scale by using multiple threads and multiple processes - at least up to the number of partitions of a topic, so it must be possible to use the plugin with multiple threads in a group.
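
To illustrate why the partition count is the practical upper bound, here is a small sketch of how a range-style assignment (the log above shows `protocol='range'`) splits the partitions of a single topic across the members of one consumer group. It is a simplified model that only computes counts, not the Kafka assignor itself, and `RangeSplit` and its methods are hypothetical names:

```java
// Simplified model of range-style assignment for one topic (hypothetical, not Kafka code).
final class RangeSplit {

    // How many partitions each of `consumers` group members would own.
    static int[] partitionsPerConsumer(int partitions, int consumers) {
        if (partitions < 0 || consumers <= 0) {
            throw new IllegalArgumentException("partitions must be >= 0 and consumers > 0");
        }
        int[] counts = new int[consumers];
        int base = partitions / consumers;   // every member gets at least this many
        int extra = partitions % consumers;  // the first `extra` members get one more
        for (int i = 0; i < consumers; i++) {
            counts[i] = base + (i < extra ? 1 : 0);
        }
        return counts;
    }

    // Members that receive no partition and therefore consume nothing.
    static int idleConsumers(int partitions, int consumers) {
        int idle = 0;
        for (int count : partitionsPerConsumer(partitions, consumers)) {
            if (count == 0) {
                idle++;
            }
        }
        return idle;
    }
}
```

For instance, with a single-partition topic and two consumers in the same group, one consumer stays idle, while six partitions split across four consumers as 2, 2, 1 and 1.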

I have forked the project under giraone/di-kafkameter - Branch multithreaded-consumer, to support multiple threads from a Thread Group in consuming messages.

I have also changed a couple of other things:

It basically works; the only things I am struggling with are "rebalancing" and shutting down the consumers correctly. @rollno748: Give me a signal if this is something you might include in your project. If not, that is also OK. Your basic setup saved me a lot of time and I have to thank you, but my performance requirements will need multi-threaded consumers.

rollno748 commented 1 year ago

Thanks for the suggestion. Again, I don't think it's a problem with the plugin. If you are able to pass a dynamic variable into the config element, then the current release should create multiple consumers.

It is worth asking the dev community by email about the test plan structure you have and how you are passing the dynamic variable to the config element.

To me, the plugin works correctly. If you try passing a dynamic variable to any config element, I guess it behaves the same way as it does with this plugin.