quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0
13.44k stars, 2.58k forks

Kafka transactional sending API behaves in an unexpected way #37172

Open krisztiankocsis opened 8 months ago

krisztiankocsis commented 8 months ago

Describe the bug

We are sending multiple Kafka messages within a Kafka transaction and use the code below to make the send synchronous, because at that point in the code we need to know that all of the messages have been accepted. I expected that awaiting the Uni indefinitely would do this, and would throw if something went wrong.

This is injected: KafkaTransactions messageProducer;

private void sendMessages(
    @NotNull final ImmutableList<
        Map.Entry<MyDataRecord, MyCompletionRecord>> simulationChunks) {
    messageProducer.withTransaction(emitter -> {
            simulationChunks
                .stream()
                .map(Map.Entry::getKey)
                .forEach(emitter::send);

            return Uni
                .createFrom()
                .voidItem();
        })
        .await()
        .indefinitely();
}

For example, if a message is too big, I only get a log message from Kafka later: The message is 95645096 bytes when serialized which is larger than 83886080, which is the value of the max.request.size configuration.

I want to be able to catch such errors here in the code, or I need another way to catch them that lets me identify the transaction, because I have to mark the related entity as "Failed to send".

Quarkus version: 3.4.3

Expected behavior

Uni.await().indefinitely() returns only once the messages have actually been sent, and throws if sending fails.

Actual behavior

It seems to return as soon as the message has been enqueued inside Kafka, before it has actually been sent.
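
The gap between "enqueued" and "sent" described above can be illustrated with a minimal plain-JDK sketch. It does not use the Quarkus or Kafka API: the `send` method, the `MAX_REQUEST_SIZE` limit, and the class name are hypothetical stand-ins, and `CompletableFuture` plays the role of the asynchronous send result. It only shows why waiting on a result that is already complete cannot surface a failure that happens later, while waiting on the combined send results can.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Plain-JDK stand-in for a producer; this is NOT the Quarkus or Kafka API.
class EnqueueVersusAck {

    // Hypothetical size limit, playing the role of max.request.size.
    static final int MAX_REQUEST_SIZE = 10;

    // Hypothetical send: runs asynchronously and fails for oversized payloads.
    static CompletableFuture<Void> send(String payload) {
        return CompletableFuture.runAsync(() -> {
            if (payload.length() > MAX_REQUEST_SIZE) {
                throw new IllegalArgumentException(
                    "payload of " + payload.length()
                        + " characters exceeds " + MAX_REQUEST_SIZE);
            }
        });
    }

    // Mirrors the pattern in the report: the send results are discarded and an
    // already-completed result is returned, so waiting on it observes nothing.
    static CompletableFuture<Void> fireAndForget(List<String> payloads) {
        payloads.forEach(EnqueueVersusAck::send);
        return CompletableFuture.completedFuture(null);
    }

    // Keeps every send result and completes only when all of them have
    // completed, failing if any of them failed.
    static CompletableFuture<Void> sendAndTrack(List<String> payloads) {
        CompletableFuture<?>[] acks = payloads.stream()
            .map(EnqueueVersusAck::send)
            .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(acks);
    }

    public static void main(String[] args) {
        List<String> payloads = List.of("small", "this payload is too large");

        // Returns normally; the failure of the second send is never seen here.
        fireAndForget(payloads).join();
        System.out.println("fire-and-forget: no error observed");

        try {
            sendAndTrack(payloads).join();
        } catch (CompletionException e) {
            // At this point a caller could mark the related entity as failed.
            System.out.println("tracked: " + e.getCause().getMessage());
        }
    }
}
```

In the tracked variant the caller learns about the failure at the call site, which is where the related entity would be marked as "Failed to send".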

How to Reproduce?

No response

Output of uname -a or ver

No response

Output of java -version

17

Quarkus version or git rev

No response

Build tool (ie. output of mvnw --version or gradlew --version)

No response

Additional information

No response

quarkus-bot[bot] commented 8 months ago

/cc @alesj (kafka), @cescoffier (kafka), @ozangunalp (kafka)

krisztiankocsis commented 7 months ago

@ozangunalp Is there any news about this issue?

ozangunalp commented 7 months ago

Sorry for my late response. This was indeed fixed in upstream Reactive Messaging, and the fix will be included in the next release (3.7) of Quarkus.