quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

Add support for smallrye-reactive-messaging-camel #1766

Closed: ddcprg closed this issue 3 years ago

ddcprg commented 5 years ago

Currently it seems that only smallrye-reactive-messaging-kafka is supported. Adding support for reactive Camel would be useful in my opinion. Is this on the roadmap?

I would like to contribute this, but honestly I don't know where to start.

cescoffier commented 5 years ago

The first step is to try with the upstream artifact and see if it works out of the box.

Then we would need to depend on quarkus-camel instead of upstream Camel.

ddcprg commented 5 years ago

I've got a working example, but as soon as I try to use quarkus-camel-core the application fails. I noticed that this artifact depends on Camel 3 (which is still under development). I think this is what causes the exception, since smallrye-reactive-messaging-camel depends on the earlier Camel version 2.23.1.

I'll try to clean up the code and push it to a public repo in my account.

ddcprg commented 5 years ago

I have created a repo that does in fact use quarkus-camel-core: https://github.com/ddcprg/camel-stream-test

At the moment I'm not using the SmallRye annotations to define the stream; instead I'm creating the reactive stream programmatically with a Camel route subscriber.

This example uses Camel 3.0.0-M1, and I managed to get it working with smallrye-reactive-messaging-camel version 0.0.4 compiled against this version of Camel. If I try to use a later version of smallrye-reactive-messaging-camel (I think Quarkus actually depends on 0.0.5 for Kafka) then I get this exception on startup:

19:05:02,568 ERROR [io.qua.dev.DevModeMain] Failed to start quarkus: java.lang.RuntimeException: org.jboss.builder.BuildException: Build failure: Build failed due to errors
        [error]: Build step io.quarkus.arc.deployment.ArcAnnotationProcessor#build threw an exception: java.lang.IllegalStateException: Failed to index: io.smallrye.reactive.messaging.spi.IncomingConnectorFactory
        at io.quarkus.runner.RuntimeRunner.run(RuntimeRunner.java:134)
        at io.quarkus.dev.DevModeMain.doStart(DevModeMain.java:105)
        at io.quarkus.dev.DevModeMain.main(DevModeMain.java:66)
Caused by: org.jboss.builder.BuildException: Build failure: Build failed due to errors
        [error]: Build step io.quarkus.arc.deployment.ArcAnnotationProcessor#build threw an exception: java.lang.IllegalStateException: Failed to index: io.smallrye.reactive.messaging.spi.IncomingConnectorFactory
        at org.jboss.builder.Execution.run(Execution.java:123)
        at org.jboss.builder.BuildExecutionBuilder.execute(BuildExecutionBuilder.java:136)
        at io.quarkus.deployment.QuarkusAugmentor.run(QuarkusAugmentor.java:110)
        at io.quarkus.runner.RuntimeRunner.run(RuntimeRunner.java:99)
        ... 2 more
Caused by: java.lang.IllegalStateException: Failed to index: io.smallrye.reactive.messaging.spi.IncomingConnectorFactory
        at io.quarkus.arc.processor.BeanArchives.index(BeanArchives.java:233)
        at io.quarkus.arc.processor.BeanArchives$IndexWrapper.lambda$getClassByName$0(BeanArchives.java:105)
        at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
        at io.quarkus.arc.processor.BeanArchives$IndexWrapper.getClassByName(BeanArchives.java:102)
        at io.quarkus.arc.processor.Types.getTypeClosure(Types.java:216)
        at io.quarkus.arc.processor.Beans.createClassBean(Beans.java:55)
        at io.quarkus.arc.processor.BeanDeployment.findBeans(BeanDeployment.java:615)
        at io.quarkus.arc.processor.BeanDeployment.<init>(BeanDeployment.java:136)
        at io.quarkus.arc.processor.BeanProcessor.process(BeanProcessor.java:150)
        at io.quarkus.arc.deployment.ArcAnnotationProcessor.build(ArcAnnotationProcessor.java:237)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at io.quarkus.deployment.ExtensionLoader$1.execute(ExtensionLoader.java:506)
        at org.jboss.builder.BuildContext.run(BuildContext.java:413)
        at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
        at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:1998)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1525)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1416)
        at java.lang.Thread.run(Thread.java:748)
        at org.jboss.threads.JBossThread.run(JBossThread.java:479)
Caused by: java.io.IOException: Stream closed
        at java.io.BufferedInputStream.getInIfOpen(BufferedInputStream.java:159)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at java.io.DataInputStream.readFully(DataInputStream.java:169)
        at org.jboss.jandex.Indexer.verifyMagic(Indexer.java:1288)
        at org.jboss.jandex.Indexer.index(Indexer.java:1587)
        at io.quarkus.arc.processor.BeanArchives.index(BeanArchives.java:231)
        ... 21 more

I'll try to dig a bit more into this issue later.
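A note on the innermost cause in the trace above: "java.io.IOException: Stream closed" is the message BufferedInputStream gives when something reads from it after it has been closed. The sketch below shows only that symptom using the JDK alone; it is not a reproduction of the Quarkus or Jandex code path, and the class name StreamClosedDemo and the method readMagicAfterClose are made up for illustration.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical demo class, not part of Quarkus, Jandex or SmallRye.
public class StreamClosedDemo {

    // Tries to read the 4-byte class-file magic number from a stream that
    // has already been closed, and returns the resulting error message.
    static String readMagicAfterClose() {
        byte[] classFileHeader = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE};
        try {
            BufferedInputStream in =
                    new BufferedInputStream(new ByteArrayInputStream(classFileHeader));
            in.close(); // the stream is closed before anyone reads from it

            // Same JDK call chain as in the trace: DataInputStream.readFully
            // delegates to BufferedInputStream.read, which rejects closed streams.
            DataInputStream data = new DataInputStream(in);
            data.readFully(new byte[4]);
            return "no error";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(readMagicAfterClose());
    }
}
```

If the same thing happens here, then the class bytes for IncomingConnectorFactory were handed to the indexer on a stream that was already closed, which would point at how the resource is opened rather than at the class itself. That is an assumption to verify, not a diagnosis.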

DuncanDoyle commented 4 years ago

This would be super useful for Kogito. In Kogito we generate SmallRye Reactive Messaging Consumers and Producers from BPMN2 message events (e.g. Start Message Event, End Message Event). Currently we mostly use the Kafka connector for our use-cases. Being able to use the Camel Connector would open loads of new integration possibilities for us!

cescoffier commented 4 years ago

The connector is using Camel-CDI which does not seem to be available in quarkus-camel. @lburgazzoli can you confirm?

cescoffier commented 4 years ago

After discussing it with @lburgazzoli, we should implement the extension in camel-quarkus.

lburgazzoli commented 4 years ago

Opened an issue on camel-quarkus: https://github.com/apache/camel-quarkus/issues/1469

cescoffier commented 3 years ago

Done and documented: https://camel.apache.org/camel-quarkus/latest/reference/extensions/smallrye-reactive-messaging.html

@lburgazzoli please thank the contributor for me!