quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

kafka failure strategy dead-letter-topic can't find deserializer class #34931

Closed Dieken closed 1 year ago

Dieken commented 1 year ago

Describe the bug

Setting mp.messaging.incoming.words-in.failure-strategy=dead-letter-queue causes the Quarkus application to fail at startup:

2023-07-22 19:49:49,127 ERROR [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00230: Unable to create the publisher or subscriber during initialization: java.lang.IllegalArgumentException: SRMSG18010: Unable to create an instance of `org.acme.MQMessage_Serializer_5c899fb72f68994441842e7374516dfc6eaa09e7`
        at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.createDelegateSerializer(SerializerWrapper.java:35)
        at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.<init>(SerializerWrapper.java:25)
        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:103)
        at io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue$Factory.create(KafkaDeadLetterQueue.java:95)
        at io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue_Factory_ClientProxy.create(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.createFailureHandler(KafkaSource.java:309)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.<init>(KafkaSource.java:119)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector.getPublisher(KafkaConnector.java:211)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getPublisher$$superforward(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass$$function$$9.apply(Unknown Source)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:73)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext$NextAroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:97)
        at io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor.intercept(DevModeSupportConnectorFactoryInterceptor.java:53)
        at io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:70)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:62)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor.intercept(DuplicatedContextConnectorFactoryInterceptor.java:32)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:30)
        at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:27)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getPublisher(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getPublisher(Unknown Source)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.createPublisher(ConfiguredChannelFactory.java:172)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:134)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:106)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:212)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager_ClientProxy.start(Unknown Source)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:52)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_68e7b57eb97cb75d597c5b816682366e888d0d9b.notify(Unknown Source)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:346)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:328)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:82)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:155)
        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:106)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
        at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
        at io.quarkus.runtime.Application.start(Application.java:101)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:111)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
        at io.quarkus.runner.GeneratedMain.main(Unknown Source)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at io.quarkus.runner.bootstrap.StartupActionImpl$1.run(StartupActionImpl.java:104)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: org.acme.MQMessage_Serializer_5c899fb72f68994441842e7374516dfc6eaa09e7
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
        at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:516)
        at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:466)
        at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:516)
        at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:466)
        at java.base/java.lang.Class.forName0(Native Method)
        at java.base/java.lang.Class.forName(Class.java:467)
        at org.apache.kafka.common.utils.Utils.loadClass(Utils.java:422)
        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:411)
        at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.createDelegateSerializer(SerializerWrapper.java:33)
        ... 54 more

2023-07-22 19:49:49,140 INFO  [io.qua.dep.dev.IsolatedDevModeMain] (main) Attempting to start live reload endpoint to recover from previous Quarkus startup failure
2023-07-22 19:49:49,140 ERROR [io.qua.run.Application] (Quarkus Main Thread) Failed to start application (with profile [dev]): java.lang.RuntimeException: Failed to start quarkus
        at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
        at io.quarkus.runtime.Application.start(Application.java:101)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:111)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
        at io.quarkus.runner.GeneratedMain.main(Unknown Source)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at io.quarkus.runner.bootstrap.StartupActionImpl$1.run(StartupActionImpl.java:104)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: jakarta.enterprise.inject.spi.DeploymentException: java.lang.IllegalArgumentException: SRMSG18010: Unable to create an instance of `org.acme.MQMessage_Serializer_5c899fb72f68994441842e7374516dfc6eaa09e7`
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:57)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_68e7b57eb97cb75d597c5b816682366e888d0d9b.notify(Unknown Source)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:346)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:328)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:82)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:155)
        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:106)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
        ... 13 more
Caused by: java.lang.IllegalArgumentException: SRMSG18010: Unable to create an instance of `org.acme.MQMessage_Serializer_5c899fb72f68994441842e7374516dfc6eaa09e7`
        at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.createDelegateSerializer(SerializerWrapper.java:35)
        at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.<init>(SerializerWrapper.java:25)
        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:103)
        at io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue$Factory.create(KafkaDeadLetterQueue.java:95)
        at io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue_Factory_ClientProxy.create(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.createFailureHandler(KafkaSource.java:309)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.<init>(KafkaSource.java:119)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector.getPublisher(KafkaConnector.java:211)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getPublisher$$superforward(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass$$function$$9.apply(Unknown Source)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:73)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext$NextAroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:97)
        at io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor.intercept(DevModeSupportConnectorFactoryInterceptor.java:53)
        at io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:70)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:62)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor.intercept(DuplicatedContextConnectorFactoryInterceptor.java:32)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:30)
        at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:27)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getPublisher(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getPublisher(Unknown Source)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.createPublisher(ConfiguredChannelFactory.java:172)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:134)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:106)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:212)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager_ClientProxy.start(Unknown Source)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:52)
        ... 21 more
Caused by: java.lang.ClassNotFoundException: org.acme.MQMessage_Serializer_5c899fb72f68994441842e7374516dfc6eaa09e7
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
        at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:516)
        at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:466)
        at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:516)
        at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:466)
        at java.base/java.lang.Class.forName0(Native Method)
        at java.base/java.lang.Class.forName(Class.java:467)
        at org.apache.kafka.common.utils.Utils.loadClass(Utils.java:422)
        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:411)
        at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.createDelegateSerializer(SerializerWrapper.java:33)
        ... 54 more

Expected behavior

It should just work.

Actual behavior

It throws an exception and fails to start.

How to Reproduce?

code-with-quarkus.tar.gz

Run quarkus dev; it throws the exception above.

Output of uname -a or ver

Darwin xxx 22.4.0 Darwin Kernel Version 22.4.0: Mon Mar 6 21:00:17 PST 2023; root:xnu-8796.101.5~3/RELEASE_X86_64 x86_64

Output of java -version

OpenJDK 64-Bit Server VM Temurin-17+35 (build 17+35, mixed mode, sharing)

GraalVM version (if different from Java)

No response

Quarkus version or git rev

3.2.1.Final

Build tool (ie. output of mvnw --version or gradlew --version)

Apache Maven 3.8.8 (4c87b05d9aedce574290d1acc98575ed5eb6cd39)

Additional information

No response

quarkus-bot[bot] commented 1 year ago

/cc @alesj (kafka), @cescoffier (kafka), @ozangunalp (kafka)

ozangunalp commented 1 year ago

The dead letter queue strategy tries to find a serializer based on the name of the incoming channel's deserializer. In your case, that appears to be an auto-generated deserializer. You can look at the dead letter queue config to configure the serializer explicitly: https://smallrye.io/smallrye-reactive-messaging/4.8.0/kafka/receiving-kafka-records/#dead-letter-queue
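To illustrate the lookup described in this comment, here is a minimal sketch. It is not SmallRye's code: it assumes the serializer name is derived by replacing `Deserializer` with `Serializer` in the deserializer class name, which is consistent with the `MQMessage_Serializer_...` name in the stack trace. The class `MirrorSerializerSketch`, its methods, and the deserializer name used in `main` are hypothetical.

```java
import java.util.Optional;

// Hypothetical illustration; none of these names belong to SmallRye or Quarkus.
final class MirrorSerializerSketch {

    // Assumption: the serializer class name mirrors the deserializer class name.
    static String mirrorName(String deserializerClassName) {
        return deserializerClassName.replace("Deserializer", "Serializer");
    }

    // Returns the class if it can be loaded, or an empty Optional if it does not exist.
    static Optional<Class<?>> tryLoad(String className) {
        try {
            return Optional.of(Class.forName(className));
        } catch (ClassNotFoundException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Assumed name of the generated deserializer for the incoming channel.
        String deserializer =
                "org.acme.MQMessage_Deserializer_5c899fb72f68994441842e7374516dfc6eaa09e7";
        String serializer = mirrorName(deserializer);
        System.out.println("Looking for: " + serializer);
        System.out.println("Found: " + tryLoad(serializer).isPresent());
    }
}
```

If only the deserializer was generated at build time, the mirrored name points at a class that was never created, which would explain a `ClassNotFoundException` like the one reported.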

Dieken commented 1 year ago

The auto-generation of the serializer and deserializer for a normal topic just works, so I feel it should just work for the dead letter topic too.

cescoffier commented 1 year ago

I agree @Dieken. The discovery should work in this case. I think this should be considered as a bug.

For now, you can work around it by configuring the dead letter queue serializer.

Dieken commented 1 year ago

@ozangunalp I got this exception after setting mp.messaging.incoming.messages-in.dead-letter-queue.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer:

2023-07-27 08:27:00,238 ERROR [org.apa.kaf.cli.pro.int.Sender] (kafka-producer-network-thread | kafka-dead-letter-topic-producer-kafka-consumer-messages-in-1) [Producer clientId=kafka-dead-letter-topic-producer-kafka-consumer-messages-in-1] Uncaught error in kafka producer I/O thread: : java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.key" is null
        at org.apache.kafka.common.message.FindCoordinatorRequestData.addSize(FindCoordinatorRequestData.java:235)
        at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
        at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
        at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:524)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:500)
        at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:460)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:478)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        at java.base/java.lang.Thread.run(Thread.java:833)

2023-07-27 08:27:00,238 ERROR [org.apa.kaf.cli.pro.int.Sender] (kafka-producer-network-thread | kafka-dead-letter-topic-producer-kafka-consumer-messages-in-0) [Producer clientId=kafka-dead-letter-topic-producer-kafka-consumer-messages-in-0] Uncaught error in kafka producer I/O thread: : java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.key" is null
        at org.apache.kafka.common.message.FindCoordinatorRequestData.addSize(FindCoordinatorRequestData.java:235)
        at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
        at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
        at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:524)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:500)
        at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:460)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:478)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        at java.base/java.lang.Thread.run(Thread.java:833)

2023-07-27 08:27:00,238 ERROR [org.apa.kaf.cli.pro.int.Sender] (kafka-producer-network-thread | kafka-dead-letter-topic-producer-kafka-consumer-messages-in-3) [Producer clientId=kafka-dead-letter-topic-producer-kafka-consumer-messages-in-3] Uncaught error in kafka producer I/O thread: : java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.key" is null
        at org.apache.kafka.common.message.FindCoordinatorRequestData.addSize(FindCoordinatorRequestData.java:235)
        at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
        at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
        at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:524)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:500)
        at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:460)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:478)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        at java.base/java.lang.Thread.run(Thread.java:833)

Related config:

mp.messaging.outgoing.messages-out.connector=smallrye-kafka
mp.messaging.outgoing.messages-out.topic=messages
mp.messaging.outgoing.messages-out.transactional.id=messages-${quarkus.uuid}

mp.messaging.incoming.messages-in.auto.offset.reset=latest
mp.messaging.incoming.messages-in.connector=smallrye-kafka
mp.messaging.incoming.messages-in.topic=messages
mp.messaging.incoming.messages-in.isolation.level=read_committed
mp.messaging.incoming.messages-in.partitions=4
mp.messaging.incoming.messages-in.failure-strategy=dead-letter-queue
mp.messaging.incoming.messages-in.dead-letter-queue.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

kafka.group.id=engine
kafka.partitioner.class=my.KafkaConsistentHashPartitioner

KafkaConsistentHashPartitioner.java:

package my;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Maps each record key to a partition using Guava's consistent hashing.
public class KafkaConsistentHashPartitioner implements Partitioner {

    private final HashFunction hasher = Hashing.sipHash24();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
            Object value, byte[] valueBytes, Cluster cluster) {
        // Assumes every record has a non-null key and that the topic's metadata is known.
        int partitions = cluster.partitionCountForTopic(topic);
        return Hashing.consistentHash(hasher.hashBytes(keyBytes), partitions);
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void configure(Map<String, ?> map) {
        // No configuration needed.
    }
}

Environment:

  1. Quarkus 3.2.2.Final
  2. JDK 17
  3. macOS

Dieken commented 1 year ago

> I agree @Dieken. The discovery should work in this case. I think this should be considered as a bug.
>
> For now, you can work around it by configuring the dead letter queue serializer.

Yes, it would be better if it just worked without explicit configuration of the dead letter queue serializer.

ozangunalp commented 1 year ago

I've never seen this exception before. It may be related to your authentication method to the Kafka broker.

Dieken commented 1 year ago

I use the Kafka dev container. The exception is easy to reproduce: just update the application.properties in the code-with-quarkus.tar.gz attachment above:

mp.messaging.outgoing.words-out.topic=words
#mp.messaging.outgoing.words-out.transactional.id=words-${quarkus.uuid}

mp.messaging.incoming.words-in.topic=words
mp.messaging.incoming.words-in.auto.offset.reset=earliest
mp.messaging.incoming.words-in.failure-strategy=dead-letter-queue
mp.messaging.incoming.words-in.dead-letter-queue.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
mp.messaging.incoming.words-in.isolation.level=read_committed
mp.messaging.incoming.words-in.partitions=4

kafka.group.id=engine

The NPE is caused by partitions=4.

Dieken commented 1 year ago

Maybe it's an issue with the Kafka client: it reports warnings or errors if the topic doesn't exist. After the topic is created, the warnings or errors disappear.

With Confluent-7.0.4:

2023-07-27 20:52:21,833 WARN  [org.apa.kaf.cli.NetworkClient] (smallrye-kafka-consumer-thread-0) [Consumer clientId=kafka-consumer-words-in-0, groupId=engine] Error while fetching metadata with correlation id 2 : {words=UNKNOWN_TOPIC_OR_PARTITION}
2023-07-27 20:52:21,833 WARN  [org.apa.kaf.cli.NetworkClient] (smallrye-kafka-consumer-thread-1) [Consumer clientId=kafka-consumer-words-in-1, groupId=engine] Error while fetching metadata with correlation id 2 : {words=UNKNOWN_TOPIC_OR_PARTITION}
2023-07-27 20:52:21,833 WARN  [org.apa.kaf.cli.NetworkClient] (smallrye-kafka-consumer-thread-3) [Consumer clientId=kafka-consumer-words-in-3, groupId=engine] Error while fetching metadata with correlation id 2 : {words=UNKNOWN_TOPIC_OR_PARTITION}
2023-07-27 20:52:21,833 WARN  [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | kafka-producer-words-out) [Producer clientId=kafka-producer-words-out] Error while fetching metadata with correlation id 1 : {words=UNKNOWN_TOPIC_OR_PARTITION}
2023-07-27 20:52:21,837 WARN  [org.apa.kaf.cli.NetworkClient] (smallrye-kafka-consumer-thread-2) [Consumer clientId=kafka-consumer-words-in-2, groupId=engine] Error while fetching metadata with correlation id 2 : {words=LEADER_NOT_AVAILABLE}

With Redpanda-v22.3.4:

2023-07-27 20:51:14,167 ERROR [org.apa.kaf.cli.pro.int.Sender] (kafka-producer-network-thread | kafka-dead-letter-topic-producer-kafka-consumer-words-in-2) [Producer clientId=kafka-dead-letter-topic-producer-kafka-consumer-words-in-2] Uncaught error in kafka producer I/O thread:  [Error Occurred After Shutdown]: java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.key" is null
        at org.apache.kafka.common.message.FindCoordinatorRequestData.addSize(FindCoordinatorRequestData.java:235)
        at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
        at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
        at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:524)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:500)
        at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:460)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:478)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        at java.base/java.lang.Thread.run(Thread.java:833)

2023-07-27 20:51:14,168 ERROR [org.apa.kaf.cli.pro.int.Sender] (kafka-producer-network-thread | kafka-dead-letter-topic-producer-kafka-consumer-words-in-3) [Producer clientId=kafka-dead-letter-topic-producer-kafka-consumer-words-in-3] Uncaught error in kafka producer I/O thread:  [Error Occurred After Shutdown]: java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.key" is null
        at org.apache.kafka.common.message.FindCoordinatorRequestData.addSize(FindCoordinatorRequestData.java:235)
        at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
        at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
        at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:524)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:500)
        at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:460)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:478)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        at java.base/java.lang.Thread.run(Thread.java:833)

2023-07-27 20:51:14,167 ERROR [org.apa.kaf.cli.pro.int.Sender] (kafka-producer-network-thread | kafka-dead-letter-topic-producer-kafka-consumer-words-in-1) [Producer clientId=kafka-dead-letter-topic-producer-kafka-consumer-words-in-1] Uncaught error in kafka producer I/O thread:  [Error Occurred After Shutdown]: java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.key" is null
        at org.apache.kafka.common.message.FindCoordinatorRequestData.addSize(FindCoordinatorRequestData.java:235)
        at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
        at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
        at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:524)
        at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:500)
        at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:460)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:478)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
        at java.base/java.lang.Thread.run(Thread.java:833)

2023-07-27 20:51:14,597 INFO  [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-1) SRMSG18224: Executing consumer revoked re-balance listener for group 'engine'
2023-07-27 20:51:14,596 INFO  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-1) SRMSG18256: Initialize record store for topic-partition 'words-0' at position -1.

ozangunalp commented 1 year ago

Idempotent producer support may be different in Redpanda than in Kafka. For the dev service, have you tried creating the topics beforehand using https://quarkus.io/guides/kafka#configuring-kafka-topics ?

Dieken commented 1 year ago

> Idempotent producer support may be different in Redpanda than in Kafka. For the dev service, have you tried creating the topics beforehand using https://quarkus.io/guides/kafka#configuring-kafka-topics ?

quarkus.kafka.devservices.topic-partitions.messages=4

I tried this configuration; it doesn't help.

Dieken commented 1 year ago

> I agree @Dieken. The discovery should work in this case. I think this should be considered as a bug.
>
> For now, you can work around it by configuring the dead letter queue serializer.

@ozangunalp Setting mp.messaging.incoming.words-in.dead-letter-queue.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer fixed the original ClassNotFoundException (ignoring the harmless NPE above), but it would be better if Quarkus just worked without this explicit configuration.

cescoffier commented 1 year ago

I think that's the main issue: the DLQ serializer should be discovered automatically. The idempotent producer issue is a difference between Redpanda and Confluent Kafka, not something we can fix here (you can use kafka native as the dev service; it should behave like the Confluent one).

orlandokj commented 1 year ago

I had the same problem. Looking at the Quarkus logs on initialization, I found this:

2023-08-24 08:25:03,263 INFO [io.qua.sma.dep.processor] (build-49) Generating Jackson deserializer for type ...

It doesn't generate the serializer (or at least doesn't log it), maybe because a serializer is not needed for an incoming channel. Maybe a solution is to generate a serializer whenever a dead letter queue is configured.

cescoffier commented 1 year ago

Yes, that's one of the things that needs to be added. When using a DLQ, you need both sides (the deserializer and the serializer).
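The proposal in the two comments above can be sketched as a check over the channel configuration. This is only an illustration under stated assumptions, not how Quarkus implements it: the class `DlqSerializerCheck` and its method are hypothetical, and the configuration is modelled as a plain `Map` keyed by the property names used in this thread.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical build-time check; not the Quarkus implementation.
final class DlqSerializerCheck {

    private static final String PREFIX = "mp.messaging.incoming.";
    private static final String STRATEGY_SUFFIX = ".failure-strategy";
    private static final String DLQ_SERIALIZER_SUFFIX = ".dead-letter-queue.value.serializer";

    // Returns the incoming channels that use the dead-letter-queue strategy
    // but have no explicitly configured DLQ value serializer.
    static Set<String> channelsNeedingSerializer(Map<String, String> config) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String key = entry.getKey();
            boolean isStrategyKey = key.startsWith(PREFIX)
                    && key.endsWith(STRATEGY_SUFFIX)
                    && key.length() > PREFIX.length() + STRATEGY_SUFFIX.length();
            if (isStrategyKey && "dead-letter-queue".equals(entry.getValue())) {
                String channel = key.substring(
                        PREFIX.length(), key.length() - STRATEGY_SUFFIX.length());
                if (!config.containsKey(PREFIX + channel + DLQ_SERIALIZER_SUFFIX)) {
                    result.add(channel);
                }
            }
        }
        return result;
    }
}
```

For the configuration in the original report, such a check would flag `words-in`, which is the channel for which a serializer would need to be generated alongside the deserializer.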

Dieken commented 1 year ago

> I agree @Dieken. The discovery should work in this case. I think this should be considered as a bug. For now, you can work around it by configuring the dead letter queue serializer.

> @ozangunalp Setting mp.messaging.incoming.words-in.dead-letter-queue.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer fixed the original ClassNotFoundException (ignoring the harmless NPE above), but it would be better if Quarkus just worked without this explicit configuration.

@ozangunalp @cescoffier Could Quarkus just work without this explicit configuration of mp.messaging.incoming.xxx.dead-letter-queue.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer?

cescoffier commented 1 year ago

That's the idea, but it's not implemented yet.

ozangunalp commented 1 year ago

@Dieken thanks for bumping this, I'd forgotten about it. https://github.com/quarkusio/quarkus/pull/36347 should resolve this.