apache / rocketmq-flink

RocketMQ integration for Apache Flink. This module includes the RocketMQ source and sink, which allow a Flink job to either write messages to a topic or read messages from topics.
https://rocketmq.apache.org/
Apache License 2.0
144 stars 96 forks

The message queue is not in assigned list #79

Open icchux opened 2 years ago

icchux commented 2 years ago

When I tested connecting to Alibaba Cloud RocketMQ from a local IntelliJ IDEA run, I encountered the error below. How can I solve it?

env

4.7.1 1.15.0

### Demo Code

    Properties consumerProps = new Properties();
    consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, ONS_ADDR);
    consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, CONSUMER_GROUP);
    consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, CONSUMER_TOPIC);
    consumerProps.setProperty(RocketMQConfig.ACCESS_KEY, ONS_KEY);
    consumerProps.setProperty(RocketMQConfig.SECRET_KEY, SECRET_KEY);
    RocketMQSourceFunction> source = new RocketMQSourceFunction(
            new SimpleKeyValueDeserializationSchema("id", "data"), consumerProps);
    source.setStartFromGroupOffsets(OffsetResetStrategy.LATEST);
    env.addSource(source).setParallelism(1).print(" =======> ");

### ErrorLog

    Caused by: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, message queue: MessageQueue [topic=MQ_INST_1956905707885195_BauvlVUQ%dev-po-parcel-route-add, brokerName=qd-internet-pull-01, queueId=0]
    For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
        at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seek(DefaultLitePullConsumerImpl.java:543)
        at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seekToEnd(DefaultLitePullConsumerImpl.java:565)
        at org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.seekToEnd(DefaultLitePullConsumer.java:297)
        at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.initOffsets(RocketMQSourceFunction.java:396)
        at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.open(RocketMQSourceFunction.java:254)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
        at java.lang.Thread.run(Thread.java:748)

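
For readers following the stack trace: the exception is thrown from `DefaultLitePullConsumerImpl.seek`, reached through `seekToEnd` from `RocketMQSourceFunction.initOffsets`. The message suggests that the consumer refuses to seek on a queue that is not in its assigned set. The sketch below is only a toy model of that kind of check, not RocketMQ's code: `Queue`, `ToyConsumer`, and `trySeek` are hypothetical names, and the prefixed-topic mismatch is an assumed illustration of how two queue identities can differ, not a confirmed root cause for this issue.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class AssignedListSketch {

    /** Hypothetical queue identity: topic + broker name + queue id. */
    static final class Queue {
        final String topic;
        final String brokerName;
        final int queueId;

        Queue(String topic, String brokerName, int queueId) {
            this.topic = topic;
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Queue)) return false;
            Queue other = (Queue) o;
            return queueId == other.queueId
                    && topic.equals(other.topic)
                    && brokerName.equals(other.brokerName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, brokerName, queueId);
        }

        @Override
        public String toString() {
            return "Queue [topic=" + topic + ", brokerName=" + brokerName
                    + ", queueId=" + queueId + "]";
        }
    }

    /** Toy consumer: a seek is accepted only for queues that were assigned first. */
    static final class ToyConsumer {
        private final Set<Queue> assigned = new HashSet<>();

        void assign(Set<Queue> queues) {
            assigned.clear();
            assigned.addAll(queues);
        }

        void seekToEnd(Queue queue) {
            if (!assigned.contains(queue)) {
                throw new IllegalStateException(
                        "The message queue is not in assigned list, message queue: " + queue);
            }
            // A real consumer would move its read offset here; the toy model does nothing.
        }
    }

    /** Returns true if the seek is accepted, false if it is rejected. */
    static boolean trySeek(ToyConsumer consumer, Queue queue) {
        try {
            consumer.seekToEnd(queue);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed example names; the prefix imitates an instance-qualified topic.
        Queue plain = new Queue("demo-topic", "broker-a", 0);
        Queue prefixed = new Queue("INSTANCE%demo-topic", "broker-a", 0);

        ToyConsumer consumer = new ToyConsumer();
        consumer.assign(Collections.singleton(plain));

        // The prefixed queue differs from the assigned one, so the seek is rejected.
        System.out.println("seek on mismatched queue accepted: " + trySeek(consumer, prefixed));

        // Once the exact same identity is assigned, the seek is accepted.
        consumer.assign(Collections.singleton(prefixed));
        System.out.println("seek on assigned queue accepted: " + trySeek(consumer, prefixed));
    }
}
```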
deemogsw commented 1 year ago

Please reset your master branch to commit 90b00be02dd019ffed48cbc35cd050b07c4a36ab. PR #46 has some bugs, and the author has not fixed them yet. You can find the details of this bug in issue https://github.com/apache/rocketmq-flink/issues/69#issuecomment-1295919302

gj-zhang commented 1 year ago

Is there any progress?

qiaohanqing commented 1 year ago

Is there any progress?

deemogsw commented 1 year ago

@qiaohanqing @gj-zhang cc https://github.com/apache/rocketmq-flink/issues/79#issuecomment-1340825439