smallrye / smallrye-reactive-messaging

SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0

Unable to use Kafka interceptors while using DLQ #862

Closed: loicmathieu closed this 3 years ago

loicmathieu commented 3 years ago

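If a channel is configured with both a Kafka interceptor and a dead-letter queue (DLQ), the DLQ producer cannot be created: it fails with java.lang.ClassCastException: class io.opentracing.contrib.kafka.TracingConsumerInterceptor. The stack trace below shows the consumer interceptor being loaded while the KafkaProducer for the DLQ is constructed.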

2020-11-09 14:45:48,030 ERROR [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00230: Unable to create the publisher or subscriber during initialization: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:434)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:270)
        at io.vertx.kafka.client.producer.impl.KafkaWriteStreamImpl.create(KafkaWriteStreamImpl.java:52)
        at io.vertx.kafka.client.producer.KafkaWriteStream.create(KafkaWriteStream.java:92)
        at io.vertx.kafka.client.producer.KafkaProducer.create(KafkaProducer.java:149)
        at io.vertx.mutiny.kafka.client.producer.KafkaProducer.create(KafkaProducer.java:142)
        at io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.create(KafkaDeadLetterQueue.java:52)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.createFailureHandler(KafkaSource.java:358)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.<init>(KafkaSource.java:217)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector.getPublisherBuilder(KafkaConnector.java:150)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getPublisherBuilder(KafkaConnector_ClientProxy.zig:305)
        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.createPublisherBuilder(ConfiguredChannelFactory.java:188)
        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:153)
        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:125)
        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory_ClientProxy.initialize(ConfiguredChannelFactory_ClientProxy.zig:217)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
        at io.smallrye.reactive.messaging.extension.MediatorManager.initializeAndRun(MediatorManager.java:156)
        at io.smallrye.reactive.messaging.extension.MediatorManager_ClientProxy.initializeAndRun(MediatorManager_ClientProxy.zig:277)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:20)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_4e8937813d9e8faff65c3c07f88fa96615b70e70.notify(SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_4e8937813d9e8faff65c3c07f88fa96615b70e70.zig:111)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:282)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:267)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:69)
        at io.quarkus.arc.runtime.LifecycleEventRunner.fireStartupEvent(LifecycleEventRunner.java:23)
        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:60)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent-858218658.deploy_0(LifecycleEventsBuildStep$startupEvent-858218658.zig:81)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent-858218658.deploy(LifecycleEventsBuildStep$startupEvent-858218658.zig:40)
        at io.quarkus.runner.ApplicationImpl.doStart(ApplicationImpl.zig:488)
        at io.quarkus.runtime.Application.start(Application.java:90)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:95)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:62)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:38)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:104)
        at io.quarkus.runner.GeneratedMain.main(GeneratedMain.zig:29)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:564)
        at io.quarkus.runner.bootstrap.StartupActionImpl$3.run(StartupActionImpl.java:134)
        at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.ClassCastException: class io.opentracing.contrib.kafka.TracingConsumerInterceptor
        at java.base/java.lang.Class.asSubclass(Class.java:3851)
        at org.apache.kafka.common.utils.Utils.loadClass(Utils.java:335)
        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:324)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:365)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:436)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:417)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:404)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:378)
        ... 41 more

Reproducer: kafka-interceptor-issue.zip
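
The "Caused by" frames end in Class.asSubclass, which throws ClassCastException when the configured class does not implement the type the client expects. A minimal sketch of that mechanism, using stand-in types rather than the real Kafka interfaces (ConsumerSide, ProducerSide, TracingLikeConsumerInterceptor and loadsAs are all hypothetical names for illustration):

```java
class AsSubclassSketch {
    // Hypothetical stand-ins for the consumer-side and producer-side interceptor contracts.
    interface ConsumerSide {}
    interface ProducerSide {}

    // Stand-in for an interceptor that only implements the consumer-side contract.
    static class TracingLikeConsumerInterceptor implements ConsumerSide {}

    // Returns true if 'candidate' can be treated as a subclass of 'expected'.
    // Class.asSubclass throws ClassCastException otherwise, as in the trace above.
    static boolean loadsAs(Class<?> candidate, Class<?> expected) {
        try {
            candidate.asSubclass(expected);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The consumer-side class is accepted where a consumer-side type is expected...
        System.out.println(loadsAs(TracingLikeConsumerInterceptor.class, ConsumerSide.class));
        // ...but rejected where a producer-side type is expected.
        System.out.println(loadsAs(TracingLikeConsumerInterceptor.class, ProducerSide.class));
    }
}
```

This suggests the consumer's interceptor setting reaches the producer configuration unchanged, although the sketch itself only demonstrates the JDK behaviour.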

cescoffier commented 3 years ago

Hmm... we should drop the interceptor from the DLQ sender config.
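
One way to picture that suggestion, as a rough sketch only: copy the consumer-side settings and remove the interceptor entry before the DLQ producer is built. This is not the actual fix in KafkaDeadLetterQueue; the class DlqConfigSketch and the method dlqProducerConfig are hypothetical, and the configuration is modelled as a plain map. The key "interceptor.classes" is the standard Kafka client setting for interceptors.

```java
import java.util.HashMap;
import java.util.Map;

class DlqConfigSketch {
    // Standard Kafka client configuration key for interceptors.
    static final String INTERCEPTOR_CLASSES = "interceptor.classes";

    // Hypothetical helper: derive the DLQ producer settings from the consumer
    // settings, dropping the consumer interceptor so the producer never tries
    // to load it. The input map is left untouched.
    static Map<String, Object> dlqProducerConfig(Map<String, Object> consumerConfig) {
        Map<String, Object> copy = new HashMap<>(consumerConfig);
        copy.remove(INTERCEPTOR_CLASSES);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> consumer = new HashMap<>();
        consumer.put("bootstrap.servers", "localhost:9092"); // assumed value for the example
        consumer.put(INTERCEPTOR_CLASSES, "io.opentracing.contrib.kafka.TracingConsumerInterceptor");

        Map<String, Object> producer = dlqProducerConfig(consumer);
        System.out.println(producer.containsKey(INTERCEPTOR_CLASSES)); // prints false
    }
}
```

Working on a copy keeps the interceptor active for the consumer itself, so only the DLQ sender is affected.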