apache / eventmesh

EventMesh is a new generation serverless event middleware for building distributed event-driven applications.
https://eventmesh.apache.org/
Apache License 2.0

[Bug] Unable to Consume Messages using AsyncSubscribe in RabbitMQ Storage Plugin #4394

Open Pil0tXia opened 1 year ago

Pil0tXia commented 1 year ago

Environment

Windows

EventMesh version

master

What happened

When using RabbitMQ as the storage plugin, the org.apache.eventmesh.tcp.demo.pub.cloudevents.AsyncPublish demo can place messages into the queue. However, the org.apache.eventmesh.tcp.demo.sub.cloudevents.AsyncSubscribe demo is unable to consume them.

The RabbitMQ storage plugin was not authored by xwm1992.

How to reproduce

[Three screenshots attached in the original issue; images not available]

Debug logs

org.apache.eventmesh.tcp.demo.pub.cloudevents.AsyncPublish:

2023-08-23 19:00:42,022 INFO  [main] CloudEventTCPSubClient(CloudEventTCPSubClient.java:73) - SimpleSubClientImpl|997|started!
2023-08-23 19:00:42,028 INFO  [main] AsyncPublish(AsyncPublish.java:58) - begin send async msg[0]: CloudEvent{id='ebe0443b-d6cd-49aa-8b14-46932343ed92', source=/, type='cloudevents', datacontenttype='application/cloudevents+json', subject='TEST-TOPIC-TCP-ASYNC', data=BytesCloudEventData{value=[123, 34, 99, 111, 110, 116, 101, 110, 116, 34, 58, 34, 116, 101, 115, 116, 65, 115, 121, 110, 99, 77, 101, 115, 115, 97, 103, 101, 34, 125]}, extensions={ttl=30000}}
2023-08-23 19:00:42,050 INFO  [main] CloudEventTCPPubClient(CloudEventTCPPubClient.java:108) - SimplePubClientImpl cloud event|997|publish|send|type=ASYNC_MESSAGE_TO_SERVER|protocol=cloudevents|msg=org.apache.eventmesh.common.protocol.tcp.Package@25f4878b
2023-08-23 19:00:42,050 INFO  [main] RequestContext(RequestContext.java:76) - _RequestContext|create|key=7750722003
2023-08-23 19:00:42,055 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"ASYNC_MESSAGE_TO_SERVER","code":0,"seq":"7750722003","properties":{"protocolversion":"1.0","protocoldesc":"tcp","protocoltype":"cloudevents"},"command":"ASYNC_MESSAGE_TO_SERVER"},"body":"eyJzcGVjdmVyc2lvbiI6IjEuMCIsImlkIjoiZWJlMDQ0M2ItZDZjZC00OWFhLThiMTQtNDY5MzIzNDNlZDkyIiwic291cmNlIjoiLyIsInR5cGUiOiJjbG91ZGV2ZW50cyIsImRhdGFjb250ZW50dHlwZSI6ImFwcGxpY2F0aW9uL2Nsb3VkZXZlbnRzK2pzb24iLCJzdWJqZWN0IjoiVEVTVC1UT1BJQy1UQ1AtQVNZTkMiLCJ0dGwiOiIzMDAwMCIsImRhdGEiOnsiY29udGVudCI6InRlc3RBc3luY01lc3NhZ2UifX0="}
2023-08-23 19:00:42,172 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:150) - Decode headerJson={"cmd":"ASYNC_MESSAGE_TO_SERVER_ACK","code":0,"desc":"success","seq":"7750722003","properties":{},"command":"ASYNC_MESSAGE_TO_SERVER_ACK"}
2023-08-23 19:00:42,172 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:162) - Decode bodyJson={"data":{"node":{"content":"testAsyncMessage"}},"id":"ebe0443b-d6cd-49aa-8b14-46932343ed92","source":"/","type":"cloudevents","subject":"TEST-TOPIC-TCP-ASYNC","dataContentType":"application/cloudevents+json","specVersion":"V1","extensionNames":["protocolversion","reqeventmesh2mqtimestamp","reqsendeventmeship","protocoldesc","protocoltype","ttl","reqc2eventmeshtimestamp"],"attributeNames":["datacontenttype","subject","specversion","id","source","type"]}
2023-08-23 19:00:42,173 INFO  [nioEventLoopGroup-2-1] AbstractEventMeshTCPPubHandler(AbstractEventMeshTCPPubHandler.java:45) - SimplePubClientImpl|receive|msg=org.apache.eventmesh.common.protocol.tcp.Package@4ae1b36b
2023-08-23 19:00:43,187 INFO  [main] AsyncPublish(AsyncPublish.java:58) - begin send async msg[1]: CloudEvent{id='ea09364e-e3ac-49b6-bcc4-31f61e432156', source=/, type='cloudevents', datacontenttype='application/cloudevents+json', subject='TEST-TOPIC-TCP-ASYNC', data=BytesCloudEventData{value=[123, 34, 99, 111, 110, 116, 101, 110, 116, 34, 58, 34, 116, 101, 115, 116, 65, 115, 121, 110, 99, 77, 101, 115, 115, 97, 103, 101, 34, 125]}, extensions={ttl=30000}}
2023-08-23 19:00:43,187 INFO  [main] CloudEventTCPPubClient(CloudEventTCPPubClient.java:108) - SimplePubClientImpl cloud event|997|publish|send|type=ASYNC_MESSAGE_TO_SERVER|protocol=cloudevents|msg=org.apache.eventmesh.common.protocol.tcp.Package@4e423aa2
2023-08-23 19:00:43,187 INFO  [main] RequestContext(RequestContext.java:76) - _RequestContext|create|key=1345627544
2023-08-23 19:00:43,187 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"ASYNC_MESSAGE_TO_SERVER","code":0,"seq":"1345627544","properties":{"protocolversion":"1.0","protocoldesc":"tcp","protocoltype":"cloudevents"},"command":"ASYNC_MESSAGE_TO_SERVER"},"body":"eyJzcGVjdmVyc2lvbiI6IjEuMCIsImlkIjoiZWEwOTM2NGUtZTNhYy00OWI2LWJjYzQtMzFmNjFlNDMyMTU2Iiwic291cmNlIjoiLyIsInR5cGUiOiJjbG91ZGV2ZW50cyIsImRhdGFjb250ZW50dHlwZSI6ImFwcGxpY2F0aW9uL2Nsb3VkZXZlbnRzK2pzb24iLCJzdWJqZWN0IjoiVEVTVC1UT1BJQy1UQ1AtQVNZTkMiLCJ0dGwiOiIzMDAwMCIsImRhdGEiOnsiY29udGVudCI6InRlc3RBc3luY01lc3NhZ2UifX0="}
2023-08-23 19:00:43,192 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:150) - Decode headerJson={"cmd":"ASYNC_MESSAGE_TO_SERVER_ACK","code":0,"desc":"success","seq":"1345627544","properties":{},"command":"ASYNC_MESSAGE_TO_SERVER_ACK"}
2023-08-23 19:00:43,192 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:162) - Decode bodyJson={"data":{"node":{"content":"testAsyncMessage"}},"id":"ea09364e-e3ac-49b6-bcc4-31f61e432156","source":"/","type":"cloudevents","subject":"TEST-TOPIC-TCP-ASYNC","dataContentType":"application/cloudevents+json","specVersion":"V1","extensionNames":["protocolversion","reqeventmesh2mqtimestamp","reqsendeventmeship","protocoldesc","protocoltype","ttl","reqc2eventmeshtimestamp"],"attributeNames":["datacontenttype","subject","specversion","id","source","type"]}
2023-08-23 19:00:43,192 INFO  [nioEventLoopGroup-2-1] AbstractEventMeshTCPPubHandler(AbstractEventMeshTCPPubHandler.java:45) - SimplePubClientImpl|receive|msg=org.apache.eventmesh.common.protocol.tcp.Package@1b2e16dc

org.apache.eventmesh.tcp.demo.sub.cloudevents.AsyncSubscribe:

2023-08-23 19:00:43,479 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"HELLO_REQUEST","code":0,"seq":"3280786264","properties":{},"command":"HELLO_REQUEST"},"body":{"env":"test","subsystem":"5017","path":"/data/app/umg_proxy","pid":42893,"host":"localhost","port":9362,"version":"2.0.11","username":"PU4283","password":"21524617","idc":"FT","group":"EventmeshTestGroup","purpose":"sub","unack":0}}
2023-08-23 19:00:43,484 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:150) - Decode headerJson={"cmd":"HELLO_RESPONSE","code":0,"desc":"success","seq":"3280786264","properties":{},"command":"HELLO_RESPONSE"}
2023-08-23 19:00:43,484 INFO  [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:48) - |receive|type=HELLO_RESPONSE|msg=org.apache.eventmesh.common.protocol.tcp.Package@3e758221
2023-08-23 19:00:43,485 ERROR [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:66) - msg ignored|HELLO_RESPONSE|org.apache.eventmesh.common.protocol.tcp.Package@3e758221
2023-08-23 19:00:43,485 INFO  [main] CloudEventTCPSubClient(CloudEventTCPSubClient.java:73) - SimpleSubClientImpl|745|started!
2023-08-23 19:00:43,489 INFO  [main] RequestContext(RequestContext.java:76) - _RequestContext|create|key=1162117433
2023-08-23 19:00:43,497 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"SUBSCRIBE_REQUEST","code":0,"seq":"1162117433","properties":{},"command":"SUBSCRIBE_REQUEST"},"body":{"topicList":[{"topic":"TEST-TOPIC-TCP-ASYNC","mode":"CLUSTERING","type":"ASYNC"}]}}
2023-08-23 19:00:43,538 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:150) - Decode headerJson={"cmd":"SUBSCRIBE_RESPONSE","code":0,"desc":"success","seq":"1162117433","properties":{},"command":"SUBSCRIBE_RESPONSE"}
2023-08-23 19:00:43,539 INFO  [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:48) - |receive|type=SUBSCRIBE_RESPONSE|msg=org.apache.eventmesh.common.protocol.tcp.Package@6b701d42
2023-08-23 19:00:43,539 ERROR [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:66) - msg ignored|SUBSCRIBE_RESPONSE|org.apache.eventmesh.common.protocol.tcp.Package@6b701d42
2023-08-23 19:00:43,539 INFO  [main] RequestContext(RequestContext.java:76) - _RequestContext|create|key=8573122138
2023-08-23 19:00:43,540 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"LISTEN_REQUEST","code":0,"seq":"8573122138","properties":{},"command":"LISTEN_REQUEST"}}
2023-08-23 19:00:43,631 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:150) - Decode headerJson={"cmd":"LISTEN_RESPONSE","code":0,"desc":"success","seq":"8573122138","properties":{},"command":"LISTEN_RESPONSE"}
2023-08-23 19:00:43,631 INFO  [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:48) - |receive|type=LISTEN_RESPONSE|msg=org.apache.eventmesh.common.protocol.tcp.Package@13383e23
2023-08-23 19:00:43,631 ERROR [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:66) - msg ignored|LISTEN_RESPONSE|org.apache.eventmesh.common.protocol.tcp.Package@13383e23
2023-08-23 19:01:13,482 INFO  [TCPClientScheduler-1] RequestContext(RequestContext.java:76) - _RequestContext|create|key=8077210517

fabian4 commented 1 year ago

I can't reproduce it since the meshMQAdmin plugin for rabbitmq is still in progress.

23:05:29.431 [main] ERROR org.apache.eventmesh.runtime.core.plugin.MQAdminWrapper - can't load the meshMQAdmin plugin, please check.
23:05:29.434 [main] ERROR org.apache.eventmesh.runtime.boot.EventMeshStartup - EventMesh start fail.
java.lang.RuntimeException: doesn't load the meshMQAdmin plugin, please check.
    at org.apache.eventmesh.runtime.core.plugin.MQAdminWrapper.<init>(MQAdminWrapper.java:41) ~[eventmesh-runtime-1.9.0-release.jar:1.9.0-release]
    at org.apache.eventmesh.runtime.admin.handler.TopicHandler.<init>(TopicHandler.java:77) ~[eventmesh-runtime-1.9.0-release.jar:1.9.0-release]
    at org.apache.eventmesh.runtime.admin.controller.ClientManageController.initClientHandler(ClientManageController.java:152) ~[eventmesh-runtime-1.9.0-release.jar:1.9.0-release]
    at org.apache.eventmesh.runtime.admin.controller.ClientManageController.start(ClientManageController.java:117) ~[eventmesh-runtime-1.9.0-release.jar:1.9.0-release]
    at org.apache.eventmesh.runtime.boot.EventMeshServer.start(EventMeshServer.java:169) ~[eventmesh-runtime-1.9.0-release.jar:1.9.0-release]
    at org.apache.eventmesh.runtime.boot.EventMeshStartup.main(EventMeshStartup.java:40) [eventmesh-runtime-1.9.0-release.jar:1.9.0-release]
    at org.apache.eventmesh.starter.StartUp.main(StartUp.java:25) [main/:?]

> Task :eventmesh-starter:StartUp.main() FAILED

Maybe you can provide a valid branch for this scenario, or the eventmesh-connector-rabbitmq module may help: https://github.com/apache/eventmesh/pull/4393

BTW, I think it would be clearer if you could provide the detailed logs from org.apache.eventmesh.storage.rabbitmq.consumer.RabbitmqConsumer.

Pil0tXia commented 1 year ago

@fabian4 Thank you for your attention. You may merge PR https://github.com/apache/eventmesh/pull/4395 to resolve the startup failure you mentioned and proceed to reach the bug of this PR.

BTW, there is no message production/consumption relationship between the connector module and the storage-plugin module.

pandaapo commented 1 year ago

> You may merge PR https://github.com/apache/eventmesh/pull/4395 to resolve the startup failure you mentioned and proceed to reach the bug of this PR.

Do you mean that the current master code will not encounter this bug? If that's the case, I suggest you wait until PR 4395 is merged before opening the issue. Otherwise, it will be confusing.

> BTW, there is no message production/consumption relationship between the connector module and the storage-plugin module.

The community is moving the storage module into the connector module. The storage module may be removed in the future.

fabian4 commented 1 year ago

After checking out https://github.com/Pil0tXia/eventmesh/tree/pil0txia_feat_4390 from https://github.com/apache/eventmesh/pull/4395, eventmesh-storage-rabbitmq still works when producing and consuming messages.

My env: https://github.com/fabian4/eventmesh/tree/pil0txia_feat_4390

[Two screenshots attached in the original comment; images not available]

pandaapo commented 1 year ago

Was it fixed by your modifications here? @fabian4

https://github.com/apache/eventmesh/commit/3f1783f6abc58722e14f85d673e3f5c42ca4fa4d

fabian4 commented 1 year ago

I think the problem is in the producer. We should bind the channel with routeKey and exchangeName when we initialize the producer connection; otherwise, there is no message in the queue to consume. (We already bind it when we initialize the consumer connection.) @Pil0tXia

[Screenshot attached in the original comment; image not available]

https://github.com/apache/eventmesh/blob/fd1102ac04644e1e22e2c2cda11038a0819d8520/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/producer/RabbitmqProducer.java#L92-L98

https://github.com/apache/eventmesh/blob/fd1102ac04644e1e22e2c2cda11038a0819d8520/eventmesh-storage-plugin/eventmesh-storage-rabbitmq/src/main/java/org/apache/eventmesh/storage/rabbitmq/consumer/RabbitmqConsumer.java#L113-L118

https://github.com/apache/eventmesh/blob/fd1102ac04644e1e22e2c2cda11038a0819d8520/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/sink/connector/RabbitMQSinkConnector.java#L77-L84

https://github.com/apache/eventmesh/blob/fd1102ac04644e1e22e2c2cda11038a0819d8520/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java#L99-L107
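
To make the binding hypothesis above concrete, here is a minimal sketch of how a named direct exchange routes messages. It is a toy in-memory model written for illustration only: the class `DirectExchangeSketch` and all of its methods are hypothetical, and it uses neither the RabbitMQ Java client nor any EventMesh code. With the real client, the binding step corresponds to `Channel.queueBind(queue, exchange, routingKey)`. The model assumes a non-default exchange and a publish without the mandatory flag, in which case a message that matches no binding is dropped.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Toy in-memory model of a named direct exchange (hypothetical, not the RabbitMQ API). */
final class DirectExchangeSketch {

    // routing key -> names of the queues bound under that key
    private final Map<String, Set<String>> bindings = new HashMap<>();
    // queue name -> messages waiting to be consumed
    private final Map<String, Queue<String>> queues = new HashMap<>();

    /** Creates the queue if it does not exist yet; this alone does not bind it. */
    void declareQueue(String queue) {
        queues.computeIfAbsent(queue, q -> new ArrayDeque<>());
    }

    /** Binds a queue to this exchange under a routing key; repeating a binding has no effect. */
    void bind(String queue, String routingKey) {
        declareQueue(queue);
        bindings.computeIfAbsent(routingKey, k -> new LinkedHashSet<>()).add(queue);
    }

    /** Routes a message to every queue bound under the key; returns how many queues received it. */
    int publish(String routingKey, String message) {
        Set<String> targets = bindings.getOrDefault(routingKey, Collections.emptySet());
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return targets.size();
    }

    /** Returns the next message from the queue, or null if the queue is empty or unknown. */
    String poll(String queue) {
        Queue<String> q = queues.get(queue);
        return q == null ? null : q.poll();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.declareQueue("demo-queue");

        // No binding yet: the message matches nothing and is dropped.
        System.out.println(exchange.publish("demo-key", "first"));  // prints 0

        // After binding, the same routing key reaches the queue.
        exchange.bind("demo-queue", "demo-key");
        System.out.println(exchange.publish("demo-key", "second")); // prints 1
        System.out.println(exchange.poll("demo-queue"));            // prints second
    }
}
```

In this model, a consumer that binds its queue only after the producer has already published would never see the earlier messages, which is one way the behaviour described above could arise. Whether that matches what the storage plugin actually does depends on the linked producer and consumer code.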


> Was it fixed by your modifications here? @fabian4 https://github.com/apache/eventmesh/commit/3f1783f6abc58722e14f85d673e3f5c42ca4fa4d

@pandaapo That commit rolls back a change I made while doing some local testing. It's unrelated to this issue.

github-actions[bot] commented 7 months ago

It has been 90 days since the last activity on this issue. Apache EventMesh values the voices of the community. Please don't hesitate to share your latest insights on this matter at any time, as the community is more than willing to engage in discussions regarding the development and optimization directions of this feature.

If you feel that your issue has been resolved, please feel free to close it. Should you have any additional information to share, you are welcome to reopen this issue.