apache / eventmesh

EventMesh is a new generation serverless event middleware for building distributed event-driven applications.
https://eventmesh.apache.org/
Apache License 2.0

RabbitMQ durable error when consuming #4588

Open brampurnot opened 11 months ago

brampurnot commented 11 months ago

Environment

Mac

EventMesh version

master

What happened

I'm running EventMesh with RabbitMQ as the store. Producing messages works fine, and they show up in my RabbitMQ dashboard. However, when I try to consume messages via Apache EventMesh (HTTP API call), I see this error:

2023-11-30 13:01:35,309 ERROR [EventMesh-Rabbitmq-Consumer-1] RabbitmqConsumerHandler(RabbitmqConsumerHandler.java:71) - [RabbitmqConsumerHandler] thread run happen exception.
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange 'eventmesh.default' in vhost '/': received 'true' but current is 'false', class-id=40, method-id=10)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) ~[amqp-client-5.16.0.jar:5.16.0]
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:341) ~[amqp-client-5.16.0.jar:5.16.0]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:282) ~[amqp-client-5.16.0.jar:5.16.0]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.16.0.jar:5.16.0]
    at com.rabbitmq.client.impl.ChannelN.basicGet(ChannelN.java:1154) ~[amqp-client-5.16.0.jar:5.16.0]
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicGet(AutorecoveringChannel.java:434) ~[amqp-client-5.16.0.jar:5.16.0]
    at org.apache.eventmesh.storage.rabbitmq.consumer.RabbitmqConsumerHandler.run(RabbitmqConsumerHandler.java:53) [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_111]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]

Initially I had the RabbitMQ exchange configured as durable but I changed it to transient. However the problem is still there. Here is the config in RabbitMQ: Screenshot 2023-11-30 at 14 07 38

And the queue: Screenshot 2023-11-30 at 14 07 47

Anyone faced this issue before? I tried with multiple different settings but still no luck.
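For readers hitting the same message: under AMQP 0-9-1, declaring an exchange that already exists with different attributes is rejected with a 406 PRECONDITION_FAILED channel error. The reply-text above reads as though a declare arrived with durable=true while the existing exchange was non-durable. The following is a toy in-memory model of that check, not RabbitMQ or EventMesh code; `ToyBroker`, `PreconditionFailed`, and `exchange_declare` are hypothetical names made up for illustration.

```python
class PreconditionFailed(Exception):
    """Stands in for the broker closing the channel with reply-code 406."""


class ToyBroker:
    """Tiny in-memory stand-in for a broker's exchange registry (hypothetical)."""

    def __init__(self):
        # Maps (vhost, exchange name) to the durable flag it was created with.
        self.exchanges = {}

    def exchange_declare(self, name, durable, vhost="/"):
        key = (vhost, name)
        if key not in self.exchanges:
            # First declaration creates the exchange with the requested flag.
            self.exchanges[key] = durable
            return "created"
        current = self.exchanges[key]
        if current != durable:
            # Redeclaring with a different flag is refused, not silently updated.
            raise PreconditionFailed(
                "PRECONDITION_FAILED - inequivalent arg 'durable' for exchange "
                f"'{name}' in vhost '{vhost}': received '{str(durable).lower()}' "
                f"but current is '{str(current).lower()}'"
            )
        return "already exists"


broker = ToyBroker()
# Someone creates the exchange by hand as transient (non-durable).
broker.exchange_declare("eventmesh.default", durable=False)

error_text = None
try:
    # A client later declares the same exchange as durable.
    broker.exchange_declare("eventmesh.default", durable=True)
except PreconditionFailed as exc:
    error_text = str(exc)

print(error_text)
```

In this model the error goes away when the second declaration uses the same flag as the existing exchange, or when the exchange does not exist yet and the client creates it.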

How to reproduce

Configure RabbitMQ as store. Consume a message via an HTTP API call.

Debug logs

2023-11-30 13:01:35,309 ERROR [EventMesh-Rabbitmq-Consumer-1] RabbitmqConsumerHandler(RabbitmqConsumerHandler.java:71) - [RabbitmqConsumerHandler] thread run happen exception.
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange 'eventmesh.default' in vhost '/': received 'true' but current is 'false', class-id=40, method-id=10)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258) ~[amqp-client-5.16.0.jar:5.16.0]
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:341) ~[amqp-client-5.16.0.jar:5.16.0]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:282) ~[amqp-client-5.16.0.jar:5.16.0]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.16.0.jar:5.16.0]
    at com.rabbitmq.client.impl.ChannelN.basicGet(ChannelN.java:1154) ~[amqp-client-5.16.0.jar:5.16.0]
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicGet(AutorecoveringChannel.java:434) ~[amqp-client-5.16.0.jar:5.16.0]
    at org.apache.eventmesh.storage.rabbitmq.consumer.RabbitmqConsumerHandler.run(RabbitmqConsumerHandler.java:53) [eventmesh-storage-rabbitmq-1.9.0-release.jar:1.9.0-release]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_111]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
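When the log is this noisy, it can help to pull the relevant fields out of the reply-text. Below is a minimal sketch using only the Python standard library. The pattern is written against the exact wording of the message logged above and may need adjusting for other broker versions; `parse_inequivalent_arg` is a hypothetical helper name.

```python
import re

# Matches the "inequivalent arg" reply-text as it appears in the log above.
INEQUIVALENT_ARG = re.compile(
    r"reply-code=(?P<code>\d+), reply-text=PRECONDITION_FAILED - inequivalent arg "
    r"'(?P<arg>[^']+)' for (?P<kind>\w+) '(?P<name>[^']+)' in vhost '(?P<vhost>[^']+)': "
    r"received '(?P<received>[^']*)' but current is '(?P<current>[^']*)'"
)


def parse_inequivalent_arg(log_text):
    """Return the mismatching argument details as a dict, or None if absent."""
    match = INEQUIVALENT_ARG.search(log_text)
    return match.groupdict() if match else None


sample = (
    "com.rabbitmq.client.AlreadyClosedException: channel is already closed due to "
    "channel error; protocol method: #method<channel.close>(reply-code=406, "
    "reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange "
    "'eventmesh.default' in vhost '/': received 'true' but current is 'false', "
    "class-id=40, method-id=10)"
)

details = parse_inequivalent_arg(sample)
print(details)
```

Here `received` is the value sent in the declare request and `current` is the value on the exchange that already exists in the broker.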

pandaapo commented 11 months ago

Could you please provide the code for your producer, consumer, and configuration files?

brampurnot commented 11 months ago

Below you can find the config files: config files.zip

I'm currently trying it out directly in Postman by executing one call to produce the message (which works fine) and one to consume it. It fails at the consuming part.

pandaapo commented 11 months ago

What is the consumer URL you use in Postman?

brampurnot commented 11 months ago

https://<k8s-cluster>/eventmesh/subscribe/local

This is the body I'm using:

{
  "url": "https://webhook.site/6ab3a4ba-34a7-47b3-9ffd-fdea2576f2fe",
  "consumerGroup": "TEST-GROUP",
  "topic": [
    {
      "mode": "CLUSTERING",
      "topic": "eventmesh.default",
      "type": "ASYNC"
    }
  ]
}
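For anyone who wants to reproduce this outside Postman, here is a minimal sketch that builds the same subscription request with the Python standard library. The host `eventmesh.example.invalid` and the callback URL are placeholders (the real host is not given in this thread), `build_subscribe_request` is a hypothetical helper, and the `Content-Type` header is an assumption. The request is only constructed here, not sent.

```python
import json
import urllib.request

# Placeholder host; replace with your own EventMesh endpoint.
SUBSCRIBE_BASE_URL = "https://eventmesh.example.invalid"


def build_subscribe_request(base_url, callback_url, consumer_group, topic):
    """Build (but do not send) a POST to /eventmesh/subscribe/local."""
    body = {
        "url": callback_url,
        "consumerGroup": consumer_group,
        "topic": [{"mode": "CLUSTERING", "topic": topic, "type": "ASYNC"}],
    }
    return urllib.request.Request(
        base_url + "/eventmesh/subscribe/local",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


subscribe_request = build_subscribe_request(
    SUBSCRIBE_BASE_URL,
    callback_url="https://callback.example.invalid/hook",  # placeholder webhook
    consumer_group="TEST-GROUP",
    topic="eventmesh.default",
)

print(subscribe_request.get_method(), subscribe_request.full_url)
print(subscribe_request.data.decode("utf-8"))
```

Passing the request to `urllib.request.urlopen` would send it against a real endpoint.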

pandaapo commented 11 months ago

I want to reproduce the error you found, but after my subscription request was sent, it kept reporting the following error. Tracing this error, I found that the request body data could not be obtained by EventMesh. This confused me.

[eventMesh-clientManage-1] ERROR org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService - eventMesh protocol[body] error
java.lang.RuntimeException: eventMesh protocol[body] error
    at org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.sendErrorResponse(HandlerService.java:362) [main/:?]
    at org.apache.eventmesh.runtime.core.protocol.http.processor.LocalSubscribeEventProcessor.handler(LocalSubscribeEventProcessor.java:97) [main/:?]
    at org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService$HandlerSpecific.run(HandlerService.java:280) [main/:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]

The subscription request and request body I set in Postman are the same as yours.

brampurnot commented 11 months ago

OK, so I have some good news: I was able to get it to work. The exchange/queue wasn't created automatically, so I created it myself. I must have set something wrong when doing so, because that's where the error came from.

I discovered that the exchange/queue gets created automatically after sending a first message. And with that exchange/queue, there is no issue. I'm looking through it now to see if I can find out what parameter was causing it but they look identical at the moment.
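One way to hunt for the differing parameter is to compare the two exchange definitions field by field instead of by eye. The sketch below assumes the definitions have been exported as JSON objects, for example from the RabbitMQ management API, and loaded as dicts. The two sample dicts are made-up illustrative values, not the actual exchanges from this issue, and `diff_definitions` is a hypothetical helper.

```python
def diff_definitions(manual, auto):
    """Return {field: (manual_value, auto_value)} for every field that differs."""
    fields = sorted(set(manual) | set(auto))
    return {
        field: (manual.get(field), auto.get(field))
        for field in fields
        if manual.get(field) != auto.get(field)
    }


# Illustrative values only: a hand-made exchange versus an auto-created one.
manual_exchange = {
    "name": "eventmesh.default",
    "vhost": "/",
    "type": "topic",
    "durable": False,
    "auto_delete": False,
    "internal": False,
    "arguments": {},
}
auto_exchange = dict(manual_exchange, durable=True)

differences = diff_definitions(manual_exchange, auto_exchange)
print(differences)
```

Fields that exist on only one side show up with `None` for the other side, so a missing argument is reported as well.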

Bram

pandaapo commented 11 months ago

My exchange/queue is created automatically, yet I still get the error I mentioned above, while yours now runs normally. Is your subscription request in Postman still the same as before? And what does your publish request in Postman look like?

brampurnot commented 11 months ago

This is the publish request:

curl --location 'https://<k8s-cluster>/eventmesh/publish/eventmesh.default' \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/json' \
  --data '{
    "id": "1b080838-2976-493d-897f-07803944f4d4",
    "specversion": "1.0",
    "source": "https://demo.synpase.com/workflow-start-event",
    "type": "com.synapse.demo/workflow-start-event",
    "datacontenttype": "application/json",
    "customerid": "CUSTOMER-1",
    "subject": "TEST-TOPIC",
    "data": {
      "customer": {
        "id": "customer1",
        "name": "John Doe",
        "SSN": 123456,
        "yearlyIncome": 50000,
        "address": "123 MyLane, MyCity, MyCountry",
        "employer": "MyCompany"
      }
    }
  }'

Here is the subscription:

curl --location 'https://<k8s-cluster>/eventmesh/subscribe/local' \
  --header 'Content-Type: application/json' \
  --data '{
    "url": "https://webhook.site/8cc8f0b6-63c6-45b2-a829-4f2df7a09300",
    "consumerGroup": "TEST-GROUP",
    "topic": [
      {
        "mode": "CLUSTERING",
        "topic": "eventmesh.default",
        "type": "ASYNC"
      }
    ]
  }'
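The publish body above is a CloudEvents 1.0 JSON event, for which `id`, `source`, `specversion`, and `type` are the required attributes. As a rough sketch, the same publish call can be assembled in Python with a quick check for those attributes before sending. The host is a placeholder, the event is a trimmed illustrative version of the one above, and `missing_attributes` and `build_publish_request` are hypothetical helpers. The request is only built, not sent.

```python
import json
import urllib.request

# Attributes that CloudEvents 1.0 requires on every event.
REQUIRED_ATTRIBUTES = ("id", "source", "specversion", "type")


def missing_attributes(event):
    """List the required CloudEvents attributes that are absent or empty."""
    return [name for name in REQUIRED_ATTRIBUTES if not event.get(name)]


def build_publish_request(base_url, topic, event):
    """Build (but do not send) a POST to /eventmesh/publish/<topic>."""
    missing = missing_attributes(event)
    if missing:
        raise ValueError(f"CloudEvent is missing required attributes: {missing}")
    return urllib.request.Request(
        f"{base_url}/eventmesh/publish/{topic}",
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


# Placeholder host and a trimmed-down illustrative event.
PUBLISH_BASE_URL = "https://eventmesh.example.invalid"
event = {
    "id": "1b080838-2976-493d-897f-07803944f4d4",
    "specversion": "1.0",
    "source": "https://source.example.invalid/workflow-start-event",
    "type": "com.example.demo/workflow-start-event",
    "datacontenttype": "application/json",
    "data": {"customer": {"id": "customer1"}},
}

publish_request = build_publish_request(PUBLISH_BASE_URL, "eventmesh.default", event)
print(publish_request.get_method(), publish_request.full_url)
```

As with the subscription, passing the request to `urllib.request.urlopen` would send it to a real endpoint.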

9997766 commented 7 months ago

Has nobody solved this problem?

github-actions[bot] commented 4 months ago

It has been 90 days since the last activity on this issue. Apache EventMesh values the voices of the community. Please don't hesitate to share your latest insights on this matter at any time, as the community is more than willing to engage in discussions regarding the development and optimization directions of this feature.

If you feel that your issue has been resolved, please feel free to close it. Should you have any additional information to share, you are welcome to reopen this issue.