spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0
2.18k stars 1.56k forks source link

Embedded kafka reporting topics not created in Integration test after spring boot upgrade to v3.2 #2936

Closed rafeeq75 closed 10 months ago

rafeeq75 commented 10 months ago

In what version(s) of Spring for Apache Kafka are you seeing this issue?

For example:

2.8.2

Between 2.7.0 and 2.8.2

Describe the bug

A clear and concise description of what the bug is. Do not create an issue to ask a question; see below.

To Reproduce

Steps to reproduce the behavior.

Expected behavior

A clear and concise description of what you expected to happen.

Sample

A link to a GitHub repository with a minimal, reproducible, sample.

Reports that include a sample will take priority over reports that do not. Sometimes, we may require a sample, so it is good to try to include a sample up front.

we are trying to up grade springboot from v2.7 to v3.2 , we resolved dependencies issues succesfully. one Integration test is failing which was working fine, this test tries to post a message to a kafka topic

springboot 3.2 uses springkafka test v3.1.0 internally

@EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"}, topics={"topic1","topic2"])

i get the error topic 'topic1' does not exist in metadata after NN ms

due to the below code timing out, we have tried increasing the time out too

ProducerMetadata.java 
public synchronized void awaitUpdate(final int lastVersion, final long timeoutMs) throws InterruptedException {
        long currentTimeMs = time.milliseconds();
        long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : currentTimeMs + timeoutMs;
        time.waitObject(this, () -> {
            // Throw fatal exceptions, if there are any. Recoverable topic errors will be handled by the caller.
            maybeThrowFatalException();
            return updateVersion() > lastVersion || isClosed();
        }, deadlineMs);

        if (isClosed())
            throw new KafkaException("Requested metadata update after close");
    }

I debugged through the code and found that the metadata contains the topic I configured, il.e., it is being created.

artembilan commented 10 months ago

Spring for Apache Kafka 3.1 has introduced KRaft support for embedded broker: https://docs.spring.io/spring-kafka/reference/whats-new.html#x31-ekb.

As it turned out , the kafka.testkit.KafkaClusterTestKit does not support assigned ports and rely only on randomly generated. So, that your listeners=PLAINTEXT://localhost:9092", "port=9092" is just ignored in case of KRaft mode. See more info in similar issue: https://github.com/spring-projects/spring-kafka/issues/2914.

So, you have a choice like use kraft = false on that @EmbeddedKafka, or rework your test configuration to rely on randomly generated ports.

Closed as Works as Designed