apache / rocketmq-flink

RocketMQ integration for Apache Flink. This module includes the RocketMQ source and sink, which allow a Flink job to either write messages to a topic or read messages from topics.
https://rocketmq.apache.org/
Apache License 2.0

I have a question about DEFAULT_START_MESSAGE_OFFSET #75

Open SOD-DOB opened 1 year ago

SOD-DOB commented 1 year ago

public static final long DEFAULT_START_MESSAGE_OFFSET = -1;

Why is the default value -1? It causes the following error:

java.lang.RuntimeException: org.apache.rocketmq.client.exception.MQClientException: offset < 0
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
    at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:339) ~[blob_p-b39848a6520d4473030e80a1af68842bda0d1497-439037d8a6ae48b5fa69464d73041c8f:?]
    at org.apache.rocketmq.flink.legacy.common.util.RetryUtil.call(RetryUtil.java:46) ~[blob_p-b39848a6520d4473030e80a1af68842bda0d1497-439037d8a6ae48b5fa69464d73041c8f:?]
    at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.lambda$run$2(RocketMQSourceFunction.java:243) ~[blob_p-b39848a6520d4473030e80a1af68842bda0d1497-439037d8a6ae48b5fa69464d73041c8f:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_302]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_302]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]

deemogsw commented 1 year ago

public static final long DEFAULT_START_MESSAGE_OFFSET = -1;

That doesn't look like the master branch. Could you provide more information, such as the latest commit ID or a code snippet?

SOD-DOB commented 1 year ago

public static final long DEFAULT_START_MESSAGE_OFFSET = -1; is defined in RocketMQConfig: https://github.com/apache/rocketmq-flink/blob/main/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java

When I read from RocketMQ for the first time, the RocketMQ console showed offset=0 for the queue in the current topic. When I then started flink-rocketmq to consume the topic, it failed with the same MQClientException: offset < 0 error and stack trace shown in my first comment, and it could not consume the queue with offset=0.

So I think the offset should be set to 0 instead of -1 in org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java.
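To make the suggestion concrete, here is a minimal sketch of the kind of guard I have in mind. It is not the connector's actual code: the class StartOffsetSketch, the method resolveStartOffset, and the queueMinOffset parameter are hypothetical names used only for illustration, and falling back to the queue's minimum offset is an assumption about the desired behavior. The idea is simply that the -1 sentinel should never be passed on as a pull offset.

```java
public final class StartOffsetSketch {

    // Same sentinel value as the constant quoted in this issue.
    static final long DEFAULT_START_MESSAGE_OFFSET = -1L;

    /**
     * Hypothetical helper: returns an offset that is never negative.
     *
     * @param configuredOffset the offset taken from configuration or state (may be -1)
     * @param queueMinOffset   the smallest offset currently available in the queue
     *                         (assumed to be known by the caller)
     */
    static long resolveStartOffset(long configuredOffset, long queueMinOffset) {
        if (configuredOffset < 0) {
            // Sentinel or invalid value: start from the earliest available
            // message instead of sending a negative offset to the broker.
            return Math.max(queueMinOffset, 0L);
        }
        return configuredOffset;
    }

    public static void main(String[] args) {
        // A fresh queue whose console offset is 0, as in the case described above.
        System.out.println(resolveStartOffset(DEFAULT_START_MESSAGE_OFFSET, 0L)); // prints 0
        // An explicitly configured, valid offset is left unchanged.
        System.out.println(resolveStartOffset(42L, 0L)); // prints 42
    }
}
```

With a guard like this, a queue at offset=0 would be consumed from 0 rather than triggering the offset < 0 exception.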