Apicurio / apicurio-registry

An API/Schema registry - stores APIs and Schemas.
https://www.apicur.io/registry/
Apache License 2.0
558 stars, 250 forks

Cannot create custom artifact and schema resolvers from Quarkus based camel-kafka application #4514

Open shuawest opened 2 years ago

shuawest commented 2 years ago

A Quarkus-based camel-kafka application fails at startup with a ClassNotFoundException when the Kafka producer is configured with custom schema resolver and artifact resolver classes.

Expected behavior: the Apicurio client handles classloading for Quarkus-based applications. Failing that, a documented set of constraints or a working example of Quarkus + Camel-Kafka + Apicurio would help.

[1] Example quarkus camel-kafka application: https://github.com/shuawest/apicurio-sandbox/tree/main/streaming-app/src/main/java

[2] Example Kafka routes with Apicurio configured for custom resolvers:

String kdestA = KafkaUriBuilder.create(ProducerService.TOPIC_A)
    .appendProperty("brokers", "{{aregsandbox.kafka.brokers}}")
    .appendProperty("clientId", "producerA")
    .appendProperty("valueSerializer", MyProtobufKafkaSerializer.class.getName())
    .appendProperty("maxRequestSize", "5242880")
    .appendAdditional(SerdeConfig.REGISTRY_URL, "{{registryurl}}")
    .appendAdditional(SerdeConfig.SCHEMA_RESOLVER, MySchemaResolver.class.getName())
    .appendAdditional(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, MyArtifactResolverStrategy.class.getName())
    .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "aregsandbox")
    .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_ID, "samplea")
    .appendAdditional(SerdeConfig.FIND_LATEST_ARTIFACT, "true")
    //.appendAdditional(SerdeConfig.CHECK_PERIOD_MS, "60000")
    .value();

String kdestB = KafkaUriBuilder.create(ProducerService.TOPIC_B)
    .appendProperty("brokers", "{{aregsandbox.kafka.brokers}}")
    .appendProperty("clientId", "producerB")
    .appendProperty("valueSerializer", MyProtobufKafkaSerializer.class.getName())
    .appendProperty("maxRequestSize", "5242880")
    .appendAdditional(SerdeConfig.REGISTRY_URL, "{{registryurl}}")
    .appendAdditional(SerdeConfig.SCHEMA_RESOLVER, MySchemaResolver.class.getName())
    .appendAdditional(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, MyArtifactResolverStrategy.class.getName())
    .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "aregsandbox")
    .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_ID, "sampleb")
    .appendAdditional(SerdeConfig.FIND_LATEST_ARTIFACT, "true")
    //.appendAdditional(SerdeConfig.CHECK_PERIOD_MS, "60000")
    .value();

log.info("Kafka connection A: {}\n\n\n", kdestA);
log.info("Kafka connection B: {}\n\n\n", kdestB);

from("timer:producerTimerA?repeatCount=1000&delay=1s&period=1s")
    .bean(this, "genA")
    //.log("producer timer fired ${headers.genCount}:\n${body}");
    .to(kdestA);

from("timer:producerTimerB?repeatCount=1000&delay=1s&period=1s")
    .bean(this, "genB")
    //.log("producer timer fired ${headers.genCount}:\n${body}");
    .to(kdestB);
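
The `KafkaUriBuilder` helper used above is not shown in this issue. For readers following along, here is a minimal sketch of what such a builder might look like. It assumes that `appendProperty` adds a plain Camel Kafka endpoint option and that `appendAdditional` adds the key under Camel's `additionalProperties.` prefix so it is passed through to the Kafka client. The class name `KafkaUriSketch` and this behavior are assumptions for illustration, not the reporter's actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical stand-in for the reporter's KafkaUriBuilder (an assumption, not the real class).
public class KafkaUriSketch {
    private final String topic;
    // LinkedHashMap keeps options in insertion order so the URI is predictable.
    private final Map<String, String> options = new LinkedHashMap<>();

    private KafkaUriSketch(String topic) {
        this.topic = topic;
    }

    public static KafkaUriSketch create(String topic) {
        return new KafkaUriSketch(topic);
    }

    // Adds a regular endpoint option such as brokers or clientId.
    public KafkaUriSketch appendProperty(String key, String value) {
        options.put(key, value);
        return this;
    }

    // Adds a pass-through client property using the additionalProperties. prefix.
    public KafkaUriSketch appendAdditional(String key, String value) {
        options.put("additionalProperties." + key, value);
        return this;
    }

    // Builds the endpoint URI; values are not URL-encoded in this sketch.
    public String value() {
        if (options.isEmpty()) {
            return "kafka:" + topic;
        }
        StringJoiner query = new StringJoiner("&");
        options.forEach((k, v) -> query.add(k + "=" + v));
        return "kafka:" + topic + "?" + query;
    }

    public static void main(String[] args) {
        String uri = KafkaUriSketch.create("topicA")
            .appendProperty("brokers", "localhost:9092")
            .appendAdditional("example.key", "exampleValue")
            .value();
        System.out.println(uri);
    }
}
```

Under these assumptions, each `appendAdditional` call in the routes above ends up as an `additionalProperties.<key>=<value>` query parameter, which is how the resolver class names would reach the serializer's `configure` method.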

[3] Classloading exception:

2021-07-14 16:55:15,979 ERROR [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Error starting CamelContext (camel-1) due to exception thrown: Failed to start route route3 because of null: org.apache.camel.FailedToStartRouteException: Failed to start route route3 because of null
    at org.apache.camel.impl.engine.RouteService.warmUp(RouteService.java:123)
    at org.apache.camel.impl.engine.InternalRouteStartupManager.doWarmUpRoutes(InternalRouteStartupManager.java:306)
    at org.apache.camel.impl.engine.InternalRouteStartupManager.safelyStartRouteServices(InternalRouteStartupManager.java:189)
    at org.apache.camel.impl.engine.InternalRouteStartupManager.doStartOrResumeRoutes(InternalRouteStartupManager.java:147)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStartCamel(AbstractCamelContext.java:3150)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStartContext(AbstractCamelContext.java:2846)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStart(AbstractCamelContext.java:2797)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:115)
    at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2492)
    at org.apache.camel.quarkus.core.CamelContextRuntime.start(CamelContextRuntime.java:57)
    at org.apache.camel.quarkus.core.CamelBootstrapRecorder.start(CamelBootstrapRecorder.java:45)
    at io.quarkus.deployment.steps.CamelBootstrapProcessor$boot-173480958.deploy_0(CamelBootstrapProcessor$boot-173480958.zig:101)
    at io.quarkus.deployment.steps.CamelBootstrapProcessor$boot-173480958.deploy(CamelBootstrapProcessor$boot-173480958.zig:40)
    at io.quarkus.runner.ApplicationImpl.doStart(ApplicationImpl.zig:806)
    at io.quarkus.runtime.Application.start(Application.java:90)
    at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:100)
    at io.quarkus.runtime.Quarkus.run(Quarkus.java:66)
    at io.quarkus.runtime.Quarkus.run(Quarkus.java:42)
    at io.quarkus.runtime.Quarkus.run(Quarkus.java:119)
    at io.quarkus.runner.GeneratedMain.main(GeneratedMain.zig:29)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at io.quarkus.runner.bootstrap.StartupActionImpl$3.run(StartupActionImpl.java:134)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.camel.RuntimeCamelException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException(RuntimeCamelException.java:51)
    at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:67)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:84)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:101)
    at org.apache.camel.impl.engine.DefaultChannel.doStart(DefaultChannel.java:126)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:115)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:84)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:87)
    at org.apache.camel.processor.Pipeline.doStart(Pipeline.java:154)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:115)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:84)
    at org.apache.camel.support.processor.DelegateAsyncProcessor.doStart(DelegateAsyncProcessor.java:79)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:115)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:84)
    at org.apache.camel.impl.engine.RouteService.startChildServices(RouteService.java:398)
    at org.apache.camel.impl.engine.RouteService.doWarmUp(RouteService.java:195)
    at org.apache.camel.impl.engine.RouteService.warmUp(RouteService.java:121)
    ... 25 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:434)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at org.apache.camel.component.kafka.DefaultKafkaClientFactory.getProducer(DefaultKafkaClientFactory.java:29)
    at org.apache.camel.component.kafka.KafkaProducer.doStart(KafkaProducer.java:114)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:115)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:84)
    at org.apache.camel.impl.engine.AbstractCamelContext.internalAddService(AbstractCamelContext.java:1467)
    at org.apache.camel.impl.engine.AbstractCamelContext.addService(AbstractCamelContext.java:1385)
    at org.apache.camel.processor.SendProcessor.doStart(SendProcessor.java:236)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:115)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:84)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:101)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.doStart(RedeliveryErrorHandler.java:1487)
    at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:60)
    ... 40 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: io.apicurio.registry.serde.strategy.MySchemaResolver
    at io.apicurio.registry.serde.utils.Utils.loadClass(Utils.java:35)
    at io.apicurio.registry.serde.utils.Utils.instantiate(Utils.java:50)
    at io.apicurio.registry.serde.SchemaResolverConfigurer.configure(SchemaResolverConfigurer.java:72)
    at io.apicurio.registry.serde.AbstractKafkaSerDe.configure(AbstractKafkaSerDe.java:68)
    at io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer.configure(ProtobufKafkaSerializer.java:78)
    at io.apicurio.registry.serde.strategy.MyProtobufKafkaSerializer.configure(MyProtobufKafkaSerializer.java:49)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:369)
    ... 53 more
Caused by: java.lang.ClassNotFoundException: io.apicurio.registry.serde.strategy.MySchemaResolver
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:428)
    at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:378)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:315)
    at io.apicurio.registry.serde.utils.Utils.loadClass(Utils.java:33)
    ... 59 more
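
The trace above shows `Utils.loadClass` calling the single-argument `Class.forName`, which resolves the name against the class loader of the calling class. Here that lookup ends in `AppClassLoader`, which apparently cannot see the application's `MySchemaResolver`. A common pattern for libraries that load user-supplied class names is to try the thread context class loader first and fall back to the library's own loader. The sketch below shows that pattern in isolation. `ContextAwareClassLoading` and its `loadClass` method are hypothetical names, not part of Apicurio, and whether this would resolve the problem under Quarkus is an unverified assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating context-class-loader-first lookup (not Apicurio code).
public class ContextAwareClassLoading {

    // Tries the thread context class loader first, then the loader that defined this class.
    public static Class<?> loadClass(String className) throws ClassNotFoundException {
        List<ClassLoader> candidates = new ArrayList<>();
        ClassLoader tccl = Thread.currentThread().getContextClassLoader();
        if (tccl != null) {
            candidates.add(tccl);
        }
        candidates.add(ContextAwareClassLoading.class.getClassLoader());

        ClassNotFoundException last = null;
        for (ClassLoader loader : candidates) {
            try {
                // initialize=false: resolve the class without running static initializers.
                return Class.forName(className, false, loader);
            } catch (ClassNotFoundException e) {
                last = e; // remember the failure and try the next loader
            }
        }
        // candidates is never empty, so last is set whenever we get here.
        throw last;
    }

    public static void main(String[] args) throws Exception {
        Class<?> found = loadClass("java.util.ArrayList");
        System.out.println("Loaded " + found.getName());
        try {
            loadClass("com.example.DoesNotExist");
        } catch (ClassNotFoundException e) {
            System.out.println("Lookup failed as expected for a missing class");
        }
    }
}
```

In a framework that sets the context class loader to the application's loader before starting routes, a lookup like this would have a chance of finding application classes that the library's own loader cannot see.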
apicurio-bot[bot] commented 3 months ago

Thank you for reporting an issue!

Pinging @carlesarnal to respond or triage.