apache / rocketmq-flink

RocketMQ integration for Apache Flink. This module includes the RocketMQ source and sink, which allow a Flink job to read messages from topics or write messages into a topic.
https://rocketmq.apache.org/
Apache License 2.0

The message queue is not in assigned list #107

Status: Open. Opened by andyyumiao 6 months ago

andyyumiao commented 6 months ago

I get the following error when I consume RocketMQ messages in a Flink job:

17:51:50.537 [rmq-pull-thread-1] ERROR org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil - RuntimeException, retry 2/5
java.lang.RuntimeException: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, may be rebalancing, message queue: MessageQueue [topic=yumiao1205, brokerName=broker-a, queueId=1]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
    at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:348)
    at org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil.call(RetryUtil.java:56)
    at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$run$2(RocketMQSourceFunction.java:276)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, may be rebalancing, message queue: MessageQueue [topic=yumiao1205, brokerName=broker-a, queueId=1]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
    at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seek(DefaultLitePullConsumerImpl.java:658)
    at org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.seek(DefaultLitePullConsumer.java:298)
    at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:282)
    ... 5 common frames omitted
17:51:51.342 [rmq-pull-thread-2] ERROR org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil - RuntimeException, retry 3/5
java.lang.RuntimeException: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, may be rebalancing, message queue: MessageQueue [topic=yumiao1205, brokerName=broker-a, queueId=2]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
    at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:348)
    at org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil.call(RetryUtil.java:56)
    at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$run$2(RocketMQSourceFunction.java:276)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, may be rebalancing, message queue: MessageQueue [topic=yumiao1205, brokerName=broker-a, queueId=2]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
    at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seek(DefaultLitePullConsumerImpl.java:658)
    at org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.seek(DefaultLitePullConsumer.java:298)
    at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:282)
    ... 5 common frames omitted

My RocketMQ version: 5.1.1
My Flink version: 1.15.0

cj495840252 commented 3 months ago

I also encounter this error. It seems that it is still not solved.

humkum commented 3 months ago

See https://github.com/apache/rocketmq-flink/pull/96

cj495840252 commented 3 months ago

I still get this error, while the simpleConsumer works normally when I run it. Does that mean the connection to RocketMQ itself is fine? [image]

It still fails when I run the ConnectorExample. RocketMQ version: 5.0.0

2024-03-12 23:05:42  WARN [   Source: Custom Source (1/2)#0] [e.flink.runtime.taskmanager.Task] Source: Custom Source (1/2)#0 (ea15ccd58a1ef50baebef0860ee2e52b) switched from INITIALIZING to FAILED with failure cause: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, message queue: MessageQueue [topic=test, brokerName=broker-a, queueId=1]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
    at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seek(DefaultLitePullConsumerImpl.java:660)
    at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seekToEnd(DefaultLitePullConsumerImpl.java:693)
    at org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.seekToEnd(DefaultLitePullConsumer.java:394)
    at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.initOffsets(RocketMQSourceFunction.java:394)
    at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.open(RocketMQSourceFunction.java:246)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:750)

cj495840252 commented 3 months ago

In reply to "See #96": Hi, good morning. These are my settings: [image]

madi1819 commented 3 months ago

I have also encountered this problem. Has it been resolved? [image]

madi1819 commented 3 months ago

In reply to "See #96": this is my test code: [image]

loserwang1024 commented 3 months ago

In reply to "See #96": it seems to do nothing!

ping-cai commented 3 weeks ago

It seems to do nothing! The problem still exists on the 'main-latest' branch. Who can help fix it?

RocketMQ version: 4.9.2
Flink version: 1.15.2

Problem details:

Caused by: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, message queue: MessageQueue [topic=tp_ld_driver_behavior_score_change, brokerName=broker-onlinemq1-d, queueId=0]
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/

I guess this issue comes from the class 'DefaultLitePullConsumer'. Previously it was DefaultMQPullConsumer, which carries the note: "This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumer} is recommend to use".
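
For readers following the traces: every one in this thread ends in DefaultLitePullConsumerImpl.seek, and the message says the queue being sought is not in the consumer's assigned list. The sketch below is only a toy model of that precondition, written to show why the order of calls matters. It does not use the RocketMQ client at all: the class `AssignedQueueModel`, its methods, and the string queue names are hypothetical stand-ins, and the real check inside the client may differ in detail.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy stand-in for a pull consumer. NOT the RocketMQ client; all names are hypothetical. */
class AssignedQueueModel {
    private final Set<String> assigned = new HashSet<>();
    private final Map<String, Long> offsets = new HashMap<>();

    /** Replaces the set of queues this consumer is responsible for. */
    public synchronized void assign(Collection<String> queues) {
        assigned.clear();
        assigned.addAll(queues);
        // Forget offsets for queues that are no longer assigned.
        offsets.keySet().retainAll(assigned);
    }

    /** Sets the next read position; rejects queues that are not currently assigned. */
    public synchronized void seek(String queue, long offset) {
        if (!assigned.contains(queue)) {
            throw new IllegalStateException(
                    "The message queue is not in assigned list, message queue: " + queue);
        }
        offsets.put(queue, offset);
    }

    /** Returns the stored offset, or null if none was set for this queue. */
    public synchronized Long offsetOf(String queue) {
        return offsets.get(queue);
    }

    public static void main(String[] args) {
        AssignedQueueModel consumer = new AssignedQueueModel();

        // Seeking before any assignment reproduces the shape of the reported error.
        try {
            consumer.seek("broker-a/1", 0L);
        } catch (IllegalStateException e) {
            System.out.println("seek before assign failed: " + e.getMessage());
        }

        // Once the queue is in the assigned set, the same seek goes through.
        consumer.assign(Arrays.asList("broker-a/0", "broker-a/1"));
        consumer.seek("broker-a/1", 42L);
        System.out.println("offset after assign + seek: " + consumer.offsetOf("broker-a/1"));
    }
}
```

If the real client behaves like this model, then any seek, seekToBegin, or seekToEnd call issued before the queue appears in the assigned list would fail in the same way, which would be consistent with the traces from RocketMQSourceFunction above. That is an inference from the stack traces, not a confirmed diagnosis of the connector.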