apache / rocketmq-flink

RocketMQ integration for Apache Flink. This module includes the RocketMQ source and sink, which allow a Flink job to either write messages into a topic or read messages from topics.
https://rocketmq.apache.org/
Apache License 2.0

Add startMode to specify the offset when the consumer starts, similar to the Kafka connector #59

Closed deemogsw closed 1 year ago

deemogsw commented 1 year ago

Current behavior:

1. The start mode is configured by 'consumer.offset.reset.to', an internal property that sets the offset when the consumer starts.

2. Only three modes are supported: latest, earliest, and timestamp.

When consumer.offset.reset.to = latest is set and the job restarts without a checkpoint, the current offset is reset to maxOffset. You lose the data produced before the job reaches the running state. This is a serious limitation when a job needs the whole stream.

What I'm working on:

1. Add two modes: GROUP_OFFSETS and SPECIFIC_OFFSETS.

2. Abandon 'consumer.offset.reset.to' and use setter methods instead.

env.addSource(new RocketMQSourceFunction(new SimpleStringValueDeserializationSchema(), consumerProps)
        .setStartFromGroupOffset(StartupMode.latest))
    .name("rocketmq-source");

When the job runs for the first time, it consumes from the latest offset. When the job recovers from a checkpoint, it consumes from the stored offset. When the job has been modified and restarts with no state, it consumes from the group offset.
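The three cases above amount to a priority order: checkpointed offset first, then the committed group offset, then the configured fallback. A minimal sketch of that rule follows. The class, method, and enum names are hypothetical illustrations, not the connector's API, and the use of a negative value to mean "no offset known" is an assumption.

```java
final class StartOffsetSketch {
    /** Hypothetical fallback used when neither a checkpoint nor a group offset exists. */
    enum ResetStrategy { EARLIEST, LATEST }

    /**
     * Picks the start offset for one queue.
     * A negative checkpointedOffset or groupOffset means "not available" (assumption).
     */
    static long resolve(long checkpointedOffset, long groupOffset,
                        ResetStrategy fallback, long minOffset, long maxOffset) {
        if (checkpointedOffset >= 0) {
            return checkpointedOffset;   // job recovered from a checkpoint
        }
        if (groupOffset >= 0) {
            return groupOffset;          // restart with no state: use the group offset
        }
        // first run: nothing stored anywhere, apply the fallback
        return fallback == ResetStrategy.LATEST ? maxOffset : minOffset;
    }

    public static void main(String[] args) {
        System.out.println(resolve(-1, -1, ResetStrategy.LATEST, 0, 500));  // first run
        System.out.println(resolve(120, 80, ResetStrategy.LATEST, 0, 500)); // from checkpoint
        System.out.println(resolve(-1, 80, ResetStrategy.LATEST, 0, 500));  // from group offset
    }
}
```

With this ordering, a stateless restart resumes from the group offset instead of jumping to maxOffset, which is the data-loss case described for consumer.offset.reset.to = latest.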

3. SPECIFIC_OFFSETS is a special case. It is only useful when a job needs to recover from an unexpected error and no checkpoint can be used.

RocketMQSourceFunction<String> source = new RocketMQSourceFunction<>(new SimpleStringDeserializationSchema(), props);
HashMap<MessageQueue, Long> offsets = new HashMap<>();
offsets.put(new MessageQueue("user_info", "bigdate_node1", 0), 45985L);
offsets.put(new MessageQueue("user_info", "bigdate_node1", 1), 35917L);
offsets.put(new MessageQueue("user_info", "bigdate_node2", 0), 40980L);
offsets.put(new MessageQueue("user_info", "bigdate_node3", 1), 44237L);
source.setStartFromSpecificOffsets(offsets);

4. Add metrics, following FLIP-33: Standardize Connector Metrics:

currentFetchEventTimeLag: abs(fetchTime - storedTime)
currentEmitEventTimeLag: abs(emitTime - storedTime)

currentEmitEventTimeLag would be shown as the delay on the job deployments page of the Alibaba Cloud product.

It can also be viewed on the Flink web UI's metrics page, as an indicator of whether the job is lagging.
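The two lag values can be sketched as plain arithmetic on millisecond timestamps. This is only an illustration of the formulas given in this issue; the class and method names are hypothetical, and the sketch assumes storedTime is the timestamp at which the broker stored the message.

```java
final class LagMetricsSketch {
    /** currentFetchEventTimeLag: abs(fetchTime - storedTime), in milliseconds. */
    static long fetchEventTimeLag(long fetchTimeMs, long storedTimeMs) {
        return Math.abs(fetchTimeMs - storedTimeMs);
    }

    /** currentEmitEventTimeLag: abs(emitTime - storedTime), in milliseconds. */
    static long emitEventTimeLag(long emitTimeMs, long storedTimeMs) {
        return Math.abs(emitTimeMs - storedTimeMs);
    }

    public static void main(String[] args) {
        long storedTimeMs = 1_000L; // message stored on the broker (assumed meaning)
        long fetchTimeMs = 1_250L;  // source fetched the message
        long emitTimeMs = 1_300L;   // source emitted the record downstream

        System.out.println(fetchEventTimeLag(fetchTimeMs, storedTimeMs)); // prints 250
        System.out.println(emitEventTimeLag(emitTimeMs, storedTimeMs));   // prints 300
    }
}
```

In a real source these values would be exposed as gauges so they appear on the metrics page; that wiring is left out here.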
SteNicholas commented 1 year ago

@deemogsw, thanks for reporting this feature. Are you interested in creating a pull request to contribute it?

deemogsw commented 1 year ago

Yeah, I have finished the test cases. After updating the example code in the README, I will push a branch.