quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0
13.87k stars 2.71k forks source link

App crash on first message when using rabbitmq (Smallrye reactive messaging) with Kogito #23071

Closed zakhdar closed 2 years ago

zakhdar commented 2 years ago

Describe the bug

I have created a small application to test the public api of Kogito, in this application I also use rabbitmq (Smallrye reactive messaging extension).

When my application consumes the first message (rabbitmq), the application restarts, the following messages do not pose any problem.

After several searches when I remove the dependencies of Kogito, the application works well, as soon as I add the dependencies of Kogito I have again the bug (even if there are no kogito resources like bpmn drl .... in my project) the simple fact of putting the dependencies in my pom creates this bug.

Bean consuming message

`@ApplicationScoped public class MessageService {

@Incoming("organisation")
public CompletionStage<Void> process(Message<String> message) {

    System.out.println("Message received "+message.getPayload());
    return message.ack();
}

}`

application.properties

%dev.kogito.generate.rest.processes=false
%dev.kogito.generate.rest = false

%dev.quarkus.log.level=DEBUG
%dev.quarkus.log.category."org.kie.kogito".min-level=DEBUG
%dev.quarkus.log.category."org.kie.kogito".level=DEBUG

%dev.rabbitmq-host=**********************
%dev.rabbitmq-port=5672
%dev.rabbitmq-username=***********
%dev.rabbitmq-password=*****************
%dev.rabbitmq-virtual-host=******************

%dev.mp.messaging.incoming.organisation.connector=smallrye-rabbitmq
%dev.mp.messaging.incoming.organisation.queue.name=organisation-filems
%dev.mp.messaging.incoming.organisation.auto-acknowledgement=true
%dev.mp.messaging.incoming.organisation.exchange.name=organisation
%dev.mp.messaging.incoming.organisation.exchange.type=fanout
%dev.mp.messaging.incoming.organisation.routing-keys=creation  

Pom.xml

<dependencies>
        <dependency>
            <groupId>org.kie.kogito</groupId>
            <artifactId>kogito-quarkus</artifactId>
        </dependency>
        <dependency>
            <groupId>org.kie.kogito</groupId>
            <artifactId>kogito-addons-quarkus-messaging</artifactId>
            <version>1.15.0.Final</version>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-resteasy-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-resteasy-reactive-jackson</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-smallrye-reactive-messaging-rabbitmq</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-smallrye-reactive-messaging</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-undertow</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-arc</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-smallrye-openapi</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-junit5</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.rest-assured</groupId>
            <artifactId>rest-assured</artifactId>
            <scope>test</scope>
        </dependency>

Stacktrace when receiving the first message

2022-01-20 17:01:21,074 DEBUG [io.net.buf.AbstractByteBuf] (pool-11-thread-4) -Dio.netty.buffer.checkAccessible: true
2022-01-20 17:01:21,074 DEBUG [io.net.buf.AbstractByteBuf] (pool-11-thread-4) -Dio.netty.buffer.checkBounds: true
2022-01-20 17:01:21,075 DEBUG [io.net.uti.ResourceLeakDetectorFactory] (pool-11-thread-4) Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@48625fbf
2022-01-20 17:01:21,106 INFO  [io.qua.dep.dev.RuntimeUpdatesProcessor] (pool-1-thread-1) Restarting quarkus due to changes in HotReloadSupportClass.class.
2022-01-20 17:01:21,124 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (Quarkus Main Thread) Stopping rabbitmq client
2022-01-20 17:01:21,124 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-0) Disconnecting from rabbitmq...
2022-01-20 17:01:21,218 DEBUG [io.ver.rab.imp.QueueConsumerHandler] (pool-11-thread-5) consumer has been shutdown unexpectedly: amq.ctag-SecTD0iKyNsN_3K5MlviaQ
2022-01-20 17:01:21,219 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-0) Disconnected from rabbitmq !
2022-01-20 17:01:21,219 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (pool-11-thread-5) Start to reconnect...
2022-01-20 17:01:21,220 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (pool-11-thread-5) Stopping rabbitmq client
2022-01-20 17:01:21,220 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-0) Disconnecting from rabbitmq...
2022-01-20 17:01:21,220 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-0) Disconnected from rabbitmq !
2022-01-20 17:01:21,221 INFO  [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-eventloop-thread-0) Starting rabbitmq client
2022-01-20 17:01:21,221 DEBUG [io.ver.rab.imp.RabbitMQClientImpl] (vert.x-worker-thread-0) Connecting to rabbitmq...
2022-01-20 17:01:21,222 DEBUG [io.qua.arc.impl] (Quarkus Main Thread) ArC DI container shut down
2022-01-20 17:01:21,222 INFO  [io.quarkus] (Quarkus Main Thread) demo stopped in 0.115s

If i remove this 2 depencies form POM everything works fine

 <dependency>
            <groupId>org.kie.kogito</groupId>
            <artifactId>kogito-quarkus</artifactId>
        </dependency>
        <dependency>
            <groupId>org.kie.kogito</groupId>
            <artifactId>kogito-addons-quarkus-messaging</artifactId>
            <version>1.15.0.Final</version>
        </dependency>

Its seem like when i receive the first message , hot reload mechanism is proccing

Expected behavior

No response

Actual behavior

No response

How to Reproduce?

You can find a reproducer at https://github.com/zakhdar/demo.git (dont forget to change RabbitMQ settings in application.properties or use RabbitMQ dev service)

Output of uname -a or ver

Windows 11

Output of java -version

11

GraalVM version (if different from Java)

No response

Quarkus version or git rev

2.6.2

Build tool (ie. output of mvnw --version or gradlew --version)

No response

Additional information

No response

quarkus-bot[bot] commented 2 years ago

/cc @Ladicek, @cescoffier, @evacchi, @jmartisk, @mariofusco, @michalszynkiewicz, @ozangunalp, @phillip-kruger, @radcortez

evacchi commented 2 years ago

the reactive messaging addon is currently quite opinionated in what it expects (i.e. basically, Kafka), so I am not surprised it won't work with rabbitmq. I am not sure what the current status of pluggability is, maybe @fjtirado or @ricardozanini have some further details

zakhdar commented 2 years ago

thanks @evacchi for the answer, isn't the fact of using reactive messaging supposed to make abstraction on the connector used (kafka , amqp , rabbitmq ...)

evacchi commented 2 years ago

indeed. However, the Kogito addon generates an event consumer that makes a few assumptions for simplicity.

We are working on a public API to allow end-users to write their own endpoints/event consumers (less automatic codegen involved). You can read more here https://blog.kie.org/2021/11/the-road-towards-a-kogito-public-api.html

the intended idea is that, if one diverges from the intended "happy path" you'll be able to customize at will

cescoffier commented 2 years ago

@evacchi should this issue be moved to Kogito? I don't believe we would be able to do anything from Quarkus.

evacchi commented 2 years ago

I have created https://issues.redhat.com/browse/KOGITO-6616

cescoffier commented 2 years ago

Thanks @evacchi !

Closing this one.