apache / rocketmq-flink

RocketMQ integration for Apache Flink. This module includes the RocketMQ source and sink, which allow a Flink job to read messages from topics or write messages into a topic.
https://rocketmq.apache.org/
Apache License 2.0

[BUG] Topic route changed, job can't recover normally #83

Open deemogsw opened 1 year ago

deemogsw commented 1 year ago

1. Partitions added: In snapshotState, the source checks whether the partitions have changed. If the route has changed, the job switches from RUNNING to FAILED and then recovers from the checkpoint, which has no record of the new message queue. The offset table has no entry for the new message queue either, so an NPE is thrown in RocketMQSourceFunction#run.
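For the added-partition case, one way to avoid the NPE is to rebuild the offset table from the current route and give any queue that is missing from the restored state a fallback offset instead of a null lookup. This is a minimal sketch under stated assumptions, not taken from the fix referenced later in this thread: queues are identified by plain strings (the real connector keys offsets by RocketMQ's `MessageQueue`), and `OffsetRestoreSketch`, its methods, and the `fallbackOffset` parameter are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: queues are identified by plain strings such as
// "broker-a#topic#3" instead of RocketMQ's MessageQueue.
final class OffsetRestoreSketch {

    // Queues in the current route that the checkpoint knows nothing about.
    static Set<String> addedQueues(Set<String> snapshotQueues, Set<String> currentQueues) {
        Set<String> added = new HashSet<>(currentQueues);
        added.removeAll(snapshotQueues);
        return added;
    }

    // Queues recorded in the checkpoint that are no longer in the route.
    static Set<String> removedQueues(Set<String> snapshotQueues, Set<String> currentQueues) {
        Set<String> removed = new HashSet<>(snapshotQueues);
        removed.removeAll(currentQueues);
        return removed;
    }

    // Builds an offset table covering exactly the current route: restored
    // offsets are reused, new queues get fallbackOffset instead of a null
    // lookup, and queues that disappeared are simply left out.
    static Map<String, Long> restoreOffsets(
            Map<String, Long> restored, Set<String> currentQueues, long fallbackOffset) {
        Map<String, Long> offsets = new HashMap<>();
        for (String queue : currentQueues) {
            offsets.put(queue, restored.getOrDefault(queue, fallbackOffset));
        }
        return offsets;
    }
}
```

The fallback choice is a trade-off: starting new queues from their earliest offset avoids skipping messages, while starting from the latest offset avoids replaying them.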


2. Partitions removed: If the number of partitions decreases, a connection exception is thrown in RocketMQSourceFunction#run. After five retries, run exits normally, and the job switches from RUNNING to FINISHED rather than FAILED, because the thread pool swallows the connection exception.
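The FINISHED-instead-of-FAILED symptom matches a general pattern: an exception thrown inside a pooled task never reaches the thread that owns the job. A hedged sketch of one fix is to submit the per-queue tasks through an `ExecutorCompletionService`, wait for whichever task finishes first, and rethrow its failure on the calling thread so the failure can propagate to Flink. `FailFastPool` and `runAll` are hypothetical names, and the `Runnable`s stand in for the connector's pull loops.

```java
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: each Runnable stands in for one per-queue pull loop.
final class FailFastPool {

    // Runs all tasks concurrently and returns only when every task has
    // finished normally. The first task to fail has its exception rethrown
    // on the calling thread instead of staying hidden inside its Future.
    static void runAll(List<Runnable> tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        CompletionService<Void> completion = new ExecutorCompletionService<>(pool);
        try {
            for (Runnable task : tasks) {
                completion.submit(task, null);
            }
            for (int i = 0; i < tasks.size(); i++) {
                // take() yields futures in completion order, not submission order.
                Future<Void> finished = completion.take();
                try {
                    finished.get();
                } catch (ExecutionException e) {
                    Throwable cause = e.getCause();
                    if (cause instanceof Exception) {
                        throw (Exception) cause;
                    }
                    throw e; // an Error or other Throwable: keep the wrapper
                }
            }
        } finally {
            // Interrupt any tasks still running after a failure.
            pool.shutdownNow();
        }
    }
}
```

Waiting in completion order rather than calling `get()` in submission order matters for long-running pull loops: a failure in any queue's task surfaces as soon as that task ends, even while the others are still running.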

deemogsw commented 1 year ago

Refer to the following commit https://github.com/deemogsw/rocketMQ-flink-connector/commit/b070213f3a2976f691d0a7811f197b12f8cc18fb

SOD-DOB commented 1 year ago

I also encountered the same problem. Is it fixed after applying the change in the link above? @deemogsw

deemogsw commented 1 year ago

@SOD-DOB Fixed! But it only works for RocketMQSourceFunction. You can merge the commit above into your own code or use the latest code in my personal repository.

SOD-DOB commented 1 year ago

@deemogsw Have you ever run into this kind of problem? It looks like pulling messages timed out, but I found nothing unusual on the RocketMQ server side.

rocketmq-client version: 4.5.2

2023-02-27 08:47:14,109 WARN org.apache.rocketmq.flink.legacy.common.util.RetryUtil [] - RuntimeException, retry 5/5
java.lang.RuntimeException: org.apache.rocketmq.remoting.exception.RemotingSendRequestException: send request to <xx:10911> failed
	at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:389) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
	at org.apache.rocketmq.flink.legacy.common.util.RetryUtil.call(RetryUtil.java:52) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
	at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.lambda$run$2(RocketMQSourceFunction.java:279) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_302]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_302]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
Caused by: org.apache.rocketmq.remoting.exception.RemotingSendRequestException: send request to <xx:10911> failed
	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeSyncImpl(NettyRemotingAbstract.java:429) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
	at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:375) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:737) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:691) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
	at org.apache.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:199) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
	at org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:249) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
	at org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:529) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
	at org.apache.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:364) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
	at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:288) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
	... 5 more
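The WARN line shows RetryUtil at attempt 5/5 of a pull that keeps failing. As a hedged illustration of the general pattern only (this is not the connector's RetryUtil, whose source is not shown here), a bounded retry that rethrows its last failure once attempts run out lets the caller, and ultimately Flink, see the error instead of a silent exit. `BoundedRetry` and its parameters are hypothetical.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not the connector's RetryUtil.
final class BoundedRetry {

    // Calls the action up to maxAttempts times, sleeping backoffMillis between
    // attempts, and rethrows the last failure once attempts are exhausted.
    static <T> T call(Callable<T> action, int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis);
                }
            }
        }
        // Reached only after maxAttempts failures, so last is non-null here.
        throw last;
    }
}
```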

deemogsw commented 1 year ago

@SOD-DOB This exception looks like an internal error in the RocketMQ broker. You can check the broker log on the machine named in the "send request to" message.

humkum commented 3 months ago

See https://github.com/apache/rocketmq-flink/pull/96