vert-x3 / vertx-kafka-client

Reactive Kafka Client for Vert.x
Apache License 2.0

Kafka Consumer batchHandler() not working unless handler #170

Closed valeriodurante closed 9 months ago

valeriodurante commented 4 years ago

Hi all, I think this might be a bug; if it is not, this may be a mistaken report.

Questions

I am trying to use the batchHandler, without success. The client apparently connects to the Kafka server but does not read any messages. I searched the web and found this solution: https://groups.google.com/d/topic/vertx/Rz984-m11bU. In fact, with both handler and batchHandler set, everything works fine. Is this the expected behavior? I can't find anything about it in the docs.

Version

vertx-kafka-client: 3.8.3

Steps to reproduce

  1. Create a KafkaConsumer with a handler
  2. It works fine
  3. Replace the handler with a batchHandler
  4. It doesn't work; no messages are polled
  5. Add the handler back alongside the batchHandler
  6. It works fine again, including the batchHandler

Extra

JVM: 1.8.0_221 auth with kerberos - protocol SASL_TEXT

barryzhounb commented 4 years ago

Version: compile group: 'io.vertx', name: 'vertx-kafka-client', version: '3.9.0'

Consumer batchHandler() doesn't work, but Consumer handler() works.

barryzhounb commented 4 years ago

Hi guys, any update? batchHandler doesn't receive any messages from the Kafka broker.

In production we need batchHandler, even though handler works, because we need the consumer throughput.

        // createConsumer is our own helper (not shown)
        KafkaConsumer<Long, byte[]> kafkaConsumer = createConsumer(topicName, props);

        // Only batchHandler is set here; no handler() is registered
        kafkaConsumer.batchHandler(batchRecords -> {
            if (batchRecords.size() > 0) {
                for (int i = 0; i < batchRecords.size(); i++) {
                    KafkaConsumerRecord<Long, byte[]> record = batchRecords.recordAt(i);
                    log.info("Consumer received: " + new String(record.value()));
                }
                // Commit once per batch
                kafkaConsumer.commit();
            }
        });

vietj commented 4 years ago

can you investigate @ppatierno ?

barryzhounb commented 4 years ago

Hi @vietj, I saw this issue was added to the 3.9.2 milestone, thanks a lot. When can the fix be released? We are waiting for it: our project is currently blocked because the Kafka consumer's batch handler does not work. Or is there any workaround for the batch handler?

ppatierno commented 4 years ago

@barryzhounb @vietj I have no bandwidth right now to investigate this, sorry. If it's a real blocker for you, we are happy to accept a PR with a fix, and I will be happy to review it. In the end, it's open source :-)

barryzhounb commented 4 years ago

Hi @ppatierno and @vietj, I have never fixed an issue in Vert.x before. However, if you can point out the files related to the batch handler, I can look into the issue and try to fix it. Thanks.

barryzhounb commented 4 years ago

Hi @vietj, any updates on this issue?

vietj commented 4 years ago

it is expected that batchHandler will not trigger reading from Kafka,

have you tried calling stream.resume() instead of setting a handler?

wem commented 3 years ago

I think the cause is that KafkaReadStreamImpl strongly depends on the presence of the handler. At least within the schedule method, it calls the run method (where the polling happens) only if the handler is set. See io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java:186
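
A toy model may make the gating described above easier to see. This is not the actual KafkaReadStreamImpl source: the class GatedStreamModel, its fields, and its methods are hypothetical stand-ins written in plain Java, and the in-memory list only imitates records waiting on the broker. The one thing it tries to mirror is that schedule reaches the polling step only when handler is set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: hypothetical names, NOT the real KafkaReadStreamImpl.
class GatedStreamModel {
    private Consumer<String> handler;            // per-record handler
    private Consumer<List<String>> batchHandler; // whole-batch handler
    private final List<String> pending = new ArrayList<>(); // stands in for records on the broker

    GatedStreamModel handler(Consumer<String> h) { this.handler = h; return this; }
    GatedStreamModel batchHandler(Consumer<List<String>> h) { this.batchHandler = h; return this; }
    GatedStreamModel enqueue(String... records) { pending.addAll(Arrays.asList(records)); return this; }

    // Assumed gating: polling is reached only when the per-record handler is set.
    void schedule() {
        if (handler == null) {
            return; // a batchHandler on its own never gets this far
        }
        poll();
    }

    private void poll() {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        if (batch.isEmpty()) {
            return;
        }
        if (batchHandler != null) {
            batchHandler.accept(batch); // the batch is delivered first
        }
        batch.forEach(handler);         // then each record individually
    }

    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        GatedStreamModel stream = new GatedStreamModel().batchHandler(batches::add).enqueue("a", "b");

        stream.schedule();
        System.out.println("batches with batchHandler only: " + batches.size()); // prints 0

        stream.handler(record -> { });  // a no-op handler is enough to open the gate
        stream.schedule();
        System.out.println("batches after adding handler: " + batches.size());   // prints 1
    }
}
```

In this model, registering even an empty handler is what lets the batch through, which matches the behavior reported at the top of this issue.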

calohmn commented 3 years ago

@wem I agree (I guess you mean "strongly depends" instead of "hardly")

it is expected that batchHandler will not trigger reading from Kafka,

I think it should at least be made very clear in the KafkaConsumer#batchHandler javadoc that the batchHandler will only be invoked if a handler has been set via the handler() method. In general, I think it would be much more intuitive if setting the batchHandler alone also started the polling loop, without depending on the handler() method.
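
To illustrate the more intuitive behavior suggested here, below is a minimal plain-Java sketch in which either handler on its own is enough to start polling. It is only a toy model under stated assumptions: IndependentHandlersModel and all of its members are hypothetical names, it does not use the Vert.x API, and it is not meant to represent how the library was or should be changed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: hypothetical names, not the Vert.x implementation or its actual fix.
class IndependentHandlersModel {
    private Consumer<String> handler;            // per-record handler
    private Consumer<List<String>> batchHandler; // whole-batch handler
    private final List<String> pending = new ArrayList<>(); // stands in for records on the broker

    IndependentHandlersModel handler(Consumer<String> h) { this.handler = h; return this; }
    IndependentHandlersModel batchHandler(Consumer<List<String>> h) { this.batchHandler = h; return this; }
    IndependentHandlersModel enqueue(String... records) { pending.addAll(Arrays.asList(records)); return this; }

    // Poll as soon as either handler is present.
    void schedule() {
        if (handler == null && batchHandler == null) {
            return; // nobody is listening, so there is nothing to poll for
        }
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        if (batch.isEmpty()) {
            return;
        }
        if (batchHandler != null) {
            batchHandler.accept(batch);
        }
        if (handler != null) {
            batch.forEach(handler);
        }
    }

    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        List<String> records = new ArrayList<>();

        // batchHandler only
        new IndependentHandlersModel().batchHandler(batches::add).enqueue("a", "b").schedule();
        // handler only
        new IndependentHandlersModel().handler(records::add).enqueue("c").schedule();

        System.out.println(batches + " " + records); // prints "[[a, b]] [c]"
    }
}
```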

wem commented 3 years ago

@calohmn ... Yes, for sure I meant strongly; I updated my comment :)

I agree, imo both handlers should work independently of each other. Another finding is that only the regular handler is covered by tracing. But that's another issue.

tsegismont commented 9 months ago

Fixed by https://github.com/vert-x3/vertx-kafka-client/pull/261