quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0
13.7k stars 2.66k forks source link

Confluent + confluent kafka-avro-serializer version 7, throw a KafkaException in native mode #21412

Closed pjgg closed 2 years ago

pjgg commented 2 years ago

Describe the bug

Quarkus version: 999-SNAPSHOT Reproducer: https://github.com/quarkus-qe/quarkus-test-suite And this serializer:

<dependency>
                <groupId>io.confluent</groupId>
                <artifactId>kafka-avro-serializer</artifactId>
                <version>7.0.0</version>
            </dependency>

cmd: mvn clean verify -Pmessaging-modules -Dnative -Dit.test=ConfluentKafkaAvroIT -pl messaging/kafka-avro-reactive-messaging

When I am running Kafka Confluent + AVRO test, I am getting the following exception:

17:33:20,328 INFO  [app] 17:33:17,305 SRMSG18228: A failure has been reported for Kafka topics '[stock-price]': org.apache.kafka.common.KafkaException: Could not find a public no-argument constructor for io.confluent.kafka.serializers.context.NullContextNameStrategy
17:33:20,328 INFO  [app]        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:391)
17:33:20,328 INFO  [app]        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:395)
17:33:20,329 INFO  [app]        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:430)
17:33:20,329 INFO  [app]        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:415)
17:33:20,329 INFO  [app]        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.contextNameStrategy(AbstractKafkaSchemaSerDeConfig.java:218)
17:33:20,329 INFO  [app]        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.configureClientProperties(AbstractKafkaSchemaSerDe.java:90)
17:33:20,330 INFO  [app]        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.configure(AbstractKafkaAvroDeserializer.java:59)
17:33:20,330 INFO  [app]        at io.confluent.kafka.serializers.KafkaAvroDeserializer.configure(KafkaAvroDeserializer.java:50)
17:33:20,330 INFO  [app]        at io.smallrye.reactive.messaging.kafka.fault.DeserializerWrapper.configure(DeserializerWrapper.java:82)
17:33:20,331 INFO  [app]        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.<init>(ReactiveKafkaConsumer.java:79)
17:33:20,331 INFO  [app]        at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.<init>(KafkaSource.java:93)
17:33:20,331 INFO  [app]        at io.smallrye.reactive.messaging.kafka.KafkaConnector.getPublisherBuilder(KafkaConnector.java:173)
17:33:20,332 INFO  [app]        at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getPublisherBuilder(KafkaConnector_ClientProxy.zig:159)
17:33:20,332 INFO  [app]        at io.smallrye.reactive.messaging.impl.ConnectorFactories.lambda$wrap$4(ConnectorFactories.java:80)
17:33:20,332 INFO  [app]        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.createPublisher(ConfiguredChannelFactory.java:174)
17:33:20,333 INFO  [app]        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:136)
17:33:20,333 INFO  [app]        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:108)
17:33:20,333 INFO  [app]        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory_ClientProxy.initialize(ConfiguredChannelFactory_ClientProxy.zig:189)
17:33:20,333 INFO  [app]        at java.util.Iterator.forEachRemaining(Iterator.java:133)
17:33:20,334 INFO  [app]        at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
17:33:20,334 INFO  [app]        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
17:33:20,334 INFO  [app]        at io.smallrye.reactive.messaging.extension.MediatorManager.start(MediatorManager.java:189)
17:33:20,334 INFO  [app]        at io.smallrye.reactive.messaging.extension.MediatorManager_ClientProxy.start(MediatorManager_ClientProxy.zig:220)
17:33:20,335 INFO  [app]        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:41)
17:33:20,335 INFO  [app]        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_4e8937813d9e8faff65c3c07f88fa96615b70e70.notify(SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_4e8937813d9e8faff65c3c07f88fa96615b70e70.zig:111)
17:33:20,335 INFO  [app]        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:320)
17:33:20,335 INFO  [app]        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:302)
17:33:20,336 INFO  [app]        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:73)
17:33:20,336 INFO  [app]        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:128)
17:33:20,336 INFO  [app]        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:97)
17:33:20,336 INFO  [app]        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(LifecycleEventsBuildStep$startupEvent1144526294.zig:80)
17:33:20,337 INFO  [app]        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(LifecycleEventsBuildStep$startupEvent1144526294.zig:40)
17:33:20,337 INFO  [app]        at io.quarkus.runner.ApplicationImpl.doStart(ApplicationImpl.zig:666)
17:33:20,337 INFO  [app]        at io.quarkus.runtime.Application.start(Application.java:101)
17:33:20,337 INFO  [app]        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:104)
17:33:20,337 INFO  [app]        at io.quarkus.runtime.Quarkus.run(Quarkus.java:67)
17:33:20,338 INFO  [app]        at io.quarkus.runtime.Quarkus.run(Quarkus.java:41)
17:33:20,338 INFO  [app]        at io.quarkus.runtime.Quarkus.run(Quarkus.java:120)
17:33:20,338 INFO  [app]        at io.quarkus.runner.GeneratedMain.main(GeneratedMain.zig:29)
17:33:20,338 INFO  [app] Caused by: java.lang.NoSuchMethodException: io.confluent.kafka.serializers.context.NullContextNameStrategy.<init>()
17:33:20,338 INFO  [app]        at java.lang.Class.getConstructor0(DynamicHub.java:3349)
17:33:20,339 INFO  [app]        at java.lang.Class.getDeclaredConstructor(DynamicHub.java:2553)
17:33:20,339 INFO  [app]        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:389)
17:33:20,339 INFO  [app]        ... 38 more
17:33:20,339 INFO  [app] 17:33:17,305 SRMSG00230: Unable to create the publisher or subscriber during initialization: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Could not find a public no-argument constructor for io.confluent.kafka.serializers.context.NullContextNameStrategy
17:33:20,340 INFO  [app]        at io.smallrye.reactive.messaging.kafka.fault.DeserializerWrapper.configure(DeserializerWrapper.java:86)
17:33:20,340 INFO  [app]        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.<init>(ReactiveKafkaConsumer.java:79)
17:33:20,340 INFO  [app]        at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.<init>(KafkaSource.java:93)
17:33:20,340 INFO  [app]        at io.smallrye.reactive.messaging.kafka.KafkaConnector.getPublisherBuilder(KafkaConnector.java:173)
17:33:20,340 INFO  [app]        at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getPublisherBuilder(KafkaConnector_ClientProxy.zig:159)
17:33:20,341 INFO  [app]        at io.smallrye.reactive.messaging.impl.ConnectorFactories.lambda$wrap$4(ConnectorFactories.java:80)
17:33:20,341 INFO  [app]        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.createPublisher(ConfiguredChannelFactory.java:174)
17:33:20,341 INFO  [app]        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:136)
17:33:20,341 INFO  [app]        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:108)
17:33:20,341 INFO  [app]        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory_ClientProxy.initialize(ConfiguredChannelFactory_ClientProxy.zig:189)
17:33:20,342 INFO  [app]        at java.util.Iterator.forEachRemaining(Iterator.java:133)
17:33:20,342 INFO  [app]        at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
17:33:20,342 INFO  [app]        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
17:33:20,343 INFO  [app]        at io.smallrye.reactive.messaging.extension.MediatorManager.start(MediatorManager.java:189)
17:33:20,343 INFO  [app]        at io.smallrye.reactive.messaging.extension.MediatorManager_ClientProxy.start(MediatorManager_ClientProxy.zig:220)
17:33:20,343 INFO  [app]        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:41)
17:33:20,343 INFO  [app]        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_4e8937813d9e8faff65c3c07f88fa96615b70e70.notify(SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_4e8937813d9e8faff65c3c07f88fa96615b70e70.zig:111)
17:33:20,343 INFO  [app]        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:320)
17:33:20,344 INFO  [app]        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:302)
17:33:20,344 INFO  [app]        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:73)
17:33:20,344 INFO  [app]        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:128)
17:33:20,344 INFO  [app]        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:97)
17:33:20,344 INFO  [app]        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(LifecycleEventsBuildStep$startupEvent1144526294.zig:80)
17:33:20,344 INFO  [app]        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(LifecycleEventsBuildStep$startupEvent1144526294.zig:40)
17:33:20,345 INFO  [app]        at io.quarkus.runner.ApplicationImpl.doStart(ApplicationImpl.zig:666)
17:33:20,345 INFO  [app]        at io.quarkus.runtime.Application.start(Application.java:101)
17:33:20,345 INFO  [app]        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:104)
17:33:20,345 INFO  [app]        at io.quarkus.runtime.Quarkus.run(Quarkus.java:67)
17:33:20,345 INFO  [app]        at io.quarkus.runtime.Quarkus.run(Quarkus.java:41)
17:33:20,345 INFO  [app]        at io.quarkus.runtime.Quarkus.run(Quarkus.java:120)
17:33:20,347 INFO  [app]        at io.quarkus.runner.GeneratedMain.main(GeneratedMain.zig:29)
17:33:20,347 INFO  [app] Caused by: org.apache.kafka.common.KafkaException: Could not find a public no-argument constructor for io.confluent.kafka.serializers.context.NullContextNameStrategy
17:33:20,347 INFO  [app]        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:391)
17:33:20,347 INFO  [app]        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:395)
17:33:20,347 INFO  [app]        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:430)
17:33:20,348 INFO  [app]        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:415)
17:33:20,348 INFO  [app]        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.contextNameStrategy(AbstractKafkaSchemaSerDeConfig.java:218)
17:33:20,348 INFO  [app]        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.configureClientProperties(AbstractKafkaSchemaSerDe.java:90)
17:33:20,348 INFO  [app]        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.configure(AbstractKafkaAvroDeserializer.java:59)
17:33:20,348 INFO  [app]        at io.confluent.kafka.serializers.KafkaAvroDeserializer.configure(KafkaAvroDeserializer.java:50)
17:33:20,348 INFO  [app]        at io.smallrye.reactive.messaging.kafka.fault.DeserializerWrapper.configure(DeserializerWrapper.java:82)
17:33:20,349 INFO  [app]        ... 30 more
17:33:20,349 INFO  [app] Caused by: java.lang.NoSuchMethodException: io.confluent.kafka.serializers.context.NullContextNameStrategy.<init>()
17:33:20,349 INFO  [app]        at java.lang.Class.getConstructor0(DynamicHub.java:3349)
17:33:20,349 INFO  [app]        at java.lang.Class.getDeclaredConstructor(DynamicHub.java:2553)
17:33:20,349 INFO  [app]        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:389)
17:33:20,349 INFO  [app]        ... 38 more
17:33:20,349 INFO  [app] 17:33:17,306 Failed to start application (with profile prod): java.lang.NoSuchMethodException: io.confluent.kafka.serializers.context.NullContextNameStrategy.<init>()
17:33:20,350 INFO  [app]        at java.lang.Class.getConstructor0(DynamicHub.java:3349)
17:33:20,350 INFO  [app]        at java.lang.Class.getDeclaredConstructor(DynamicHub.java:2553)
17:33:20,350 INFO  [app]        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:389)
17:33:20,350 INFO  [app]        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:395)
17:33:20,350 INFO  [app]        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:430)
17:33:20,350 INFO  [app]        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:415)
17:33:20,350 INFO  [app]        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.contextNameStrategy(AbstractKafkaSchemaSerDeConfig.java:218)
17:33:20,350 INFO  [app]        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.configureClientProperties(AbstractKafkaSchemaSerDe.java:90)
17:33:20,350 INFO  [app]        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.configure(AbstractKafkaAvroDeserializer.java:59)
17:33:20,351 INFO  [app]        at io.confluent.kafka.serializers.KafkaAvroDeserializer.configure(KafkaAvroDeserializer.java:50)
17:33:20,351 INFO  [app]        at io.smallrye.reactive.messaging.kafka.fault.DeserializerWrapper.configure(DeserializerWrapper.java:82)
17:33:20,351 INFO  [app]        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.<init>(ReactiveKafkaConsumer.java:79)
17:33:20,351 INFO  [app]        at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.<init>(KafkaSource.java:93)
17:33:20,351 INFO  [app]        at io.smallrye.reactive.messaging.kafka.KafkaConnector.getPublisherBuilder(KafkaConnector.java:173)
17:33:20,351 INFO  [app]        at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getPublisherBuilder(KafkaConnector_ClientProxy.zig:159)
17:33:20,351 INFO  [app]        at io.smallrye.reactive.messaging.impl.ConnectorFactories.lambda$wrap$4(ConnectorFactories.java:80)
17:33:20,351 INFO  [app]        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.createPublisher(ConfiguredChannelFactory.java:174)
17:33:20,352 INFO  [app]        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:136)
17:33:20,352 INFO  [app]        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:108)
17:33:20,352 INFO  [app]        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory_ClientProxy.initialize(ConfiguredChannelFactory_ClientProxy.zig:189)
17:33:20,352 INFO  [app]        at java.util.Iterator.forEachRemaining(Iterator.java:133)
17:33:20,352 INFO  [app]        at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
17:33:20,352 INFO  [app]        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
17:33:20,352 INFO  [app]        at io.smallrye.reactive.messaging.extension.MediatorManager.start(MediatorManager.java:189)
17:33:20,352 INFO  [app]        at io.smallrye.reactive.messaging.extension.MediatorManager_ClientProxy.start(MediatorManager_ClientProxy.zig:220)
17:33:20,353 INFO  [app]        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:41)
17:33:20,353 INFO  [app]        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_4e8937813d9e8faff65c3c07f88fa96615b70e70.notify(SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_4e8937813d9e8faff65c3c07f88fa96615b70e70.zig:111)
17:33:20,353 INFO  [app]        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:320)
17:33:20,353 INFO  [app]        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:302)
17:33:20,353 INFO  [app]        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:73)
17:33:20,353 INFO  [app]        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:128)
17:33:20,353 INFO  [app]        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:97)
17:33:20,353 INFO  [app]        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(LifecycleEventsBuildStep$startupEvent1144526294.zig:80)
17:33:20,353 INFO  [app]        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(LifecycleEventsBuildStep$startupEvent1144526294.zig:40)
17:33:20,354 INFO  [app]        at io.quarkus.runner.ApplicationImpl.doStart(ApplicationImpl.zig:666)
17:33:20,354 INFO  [app]        at io.quarkus.runtime.Application.start(Application.java:101)
17:33:20,354 INFO  [app]        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:104)
17:33:20,354 INFO  [app]        at io.quarkus.runtime.Quarkus.run(Quarkus.java:67)
17:33:20,354 INFO  [app]        at io.quarkus.runtime.Quarkus.run(Quarkus.java:41)
17:33:20,354 INFO  [app]        at io.quarkus.runtime.Quarkus.run(Quarkus.java:120)
17:33:20,354 INFO  [app]        at io.quarkus.runner.GeneratedMain.main(GeneratedMain.zig:29)

Note: if you are going to reproduce it following the above steps double-check that the scenario is not disabled for native mode. Remove @DisabledOnNative on the following class

Works as expected in JVM mode. Also, the same test works in Strimzi in JVM and native mode. This scenario works with io.confluent:kafka-avro-serializer:6.2.1

Expected behavior

Same behavior in JVM/Native in Strimzi/Confluent

Actual behavior

Confluent native mode doesn't work with io.confluent:kafka-avro-serializer:7.0.0

How to Reproduce?

No response

Output of uname -a or ver

No response

Output of java -version

No response

GraalVM version (if different from Java)

No response

Quarkus version or git rev

No response

Build tool (ie. output of mvnw --version or gradlew --version)

No response

Additional information

No response

quarkus-bot[bot] commented 2 years ago

/cc @cescoffier, @ozangunalp

ozangunalp commented 2 years ago

Kafka extension's kafka-avro-serializer support has only been tested on version 6.1.1 With the latest version of confluent schema-registry we need to register for reflection the class io.confluent.kafka.serializers.context.NullContextNameStrategy and maybe others.

pjgg commented 2 years ago

Kafka-avro-serializers 6.2.1 is also covered by this testsuite

But yes definitely I think that we should also support in native mode the newest confluent serializer library. Please let me know if you need some support in order to move on this issue.

cescoffier commented 2 years ago

I should be able to fix it. Is there a way to extract a reproducer from your test suite? It would be great to have something in Quarkus itself.

ozangunalp commented 2 years ago

@cescoffier Changing https://github.com/quarkusio/quarkus/blob/main/integration-tests/kafka-avro-apicurio2/pom.xml#L68 to 7.0.0 should reproduce the issue in the test with native compilation.