apache / rocketmq-flink

RocketMQ integration for Apache Flink. This module includes the RocketMQ source and sink that allows a flink job to either write messages into a topic or read from topics in a flink job.
https://rocketmq.apache.org/
Apache License 2.0
139 stars 88 forks source link

Persist consumer offset to broker when new msg found from queues. #36

Closed ShannonDing closed 1 year ago

ShannonDing commented 1 year ago

private void updateMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException { offsetTable.put(mq, offset); if (!enableCheckpoint) { consumer.updateConsumeOffset(mq, offset); consumer.getOffsetStore().persist(consumer.queueWithNamespace(mq)); } }