quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

Kafka consumer has a lag when connecting to a broker in KRaft mode #41441

Open fedinskiy opened 5 days ago

fedinskiy commented 5 days ago

Describe the bug

I have an application with a REST endpoint that sends messages to Kafka via an Emitter after receiving an HTTP request, and a consumer that processes them. When I connect to a Kafka broker running in "classic" mode (with ZooKeeper), the application works as expected. When I connect to a Kafka broker running in KRaft mode (without ZooKeeper), the application sends the messages to Kafka but does not process them, unless I wait for some time after the app has started before sending the first request.

Expected behavior

After the app has started, it should process all incoming messages, whether they come from HTTP or Kafka.

Actual behavior

The app doesn't start to process the messages right away.

How to Reproduce?

  1. git clone -b slow-kraft git@github.com:fedinskiy/reproducer.git && cd reproducer
  2. [Optional] Build an image for dual-mode Kafka: podman build --tag double-kafka -f Dockerfile
  3. Run Kafka in classic mode, e.g.: podman run --rm -it -p9092:9092 -p2181:2181 --name kafka double-kafka:latest /tmp/classic.sh
  4. Run the tests: mvn clean verify -Dtest=GreetingResourceTest#fastTest. The logs contain a line from the consumer: INFO [io.qua.qe.SlowTopicConsumer] (vert.x-eventloop-thread-3) Processed Message fast
  5. Run Kafka in KRaft mode, e.g.: podman run --rm -it -p9092:9092 -p2181:2181 --name kafka double-kafka:latest /tmp/kraft.sh
  6. Run the tests: mvn clean verify -Dtest=GreetingResourceTest#fastTest. The logs don't contain a line from the consumer.
  7. Run the tests with an initial wait: mvn clean verify -Dtest=GreetingResourceTest#slowTest. The logs contain a line from the consumer: [io.qua.qe.SlowTopicConsumer] (vert.x-eventloop-thread-3) Processed Message wait
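
Step 7 works around the delay with a fixed initial wait. As a rough sketch of an alternative, a test could poll for a readiness condition instead of sleeping for a fixed time. The `Await` class below is hypothetical and not part of the reproducer, and it assumes the test has some way to observe that the consumer is ready or has processed a message:

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical test helper: polls a condition instead of sleeping for a fixed time. */
final class Await {
    private Await() {
    }

    /**
     * Re-checks the condition every pollInterval until it holds or the timeout elapses.
     * Returns true if the condition held, false on timeout or interruption.
     */
    static boolean until(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition becoming true
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }
}
```

A test might then call something like `Await.until(() -> received.contains("fast"), Duration.ofSeconds(30), Duration.ofMillis(200))`, where `received` is an assumed collection that the consumer fills. This only hides the delay in the test; it does not change when the consumer gets its partitions.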

Output of uname -a or ver

6.7.3-200.fc39.x86_64

Output of java -version

Java version: 21.0.1, vendor: Eclipse Adoptium

Quarkus version or git rev

3.11.3

Build tool (ie. output of mvnw --version or gradlew --version)

Apache Maven 3.9.6 (bc0240f3c744dd6b6ec2920b3cd08dcc295161ae)

Additional information

The error is reproduced using Quarkus tests, but we first encountered it using https://github.com/quarkus-qe/quarkus-test-suite, which runs an application in prod mode.

quarkus-bot[bot] commented 5 days ago

/cc @alesj (kafka), @cescoffier (kafka), @ozangunalp (kafka)

cescoffier commented 5 days ago

I think this is the expected behavior. With KRaft, the consensus protocol runs later, especially when the targeted partition is not yet determined.

fedinskiy commented 5 days ago

@cescoffier thank you! Is there a way to find out whether consensus has been reached, ideally from Quarkus APIs or otherwise from Kafka logs?

ozangunalp commented 4 days ago

If you create the topic beforehand, the delay from consumer partition assignment should be less noticeable.