apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[Feature] Create new tag for flink savepoint #1635

Closed. FangYongs closed this issue 1 year ago.

FangYongs commented 1 year ago


Motivation

Currently Paimon supports tags, which store table data and can be associated with a Flink savepoint. When Flink performs a savepoint on the sink operator, Paimon can automatically create a tag after creating the snapshot.

Solution

No response

Anything else?

No response

Are you willing to submit a PR?

liming30 commented 1 year ago

@FangYongs I also reproduced this problem when testing locally; can you assign it to me?

liming30 commented 1 year ago

At present, the type of checkpoint cannot be distinguished inside the operator, so I created an issue (FLINK-32698) in the Flink community. Maybe we should wait for that issue to be discussed before proceeding.

FangYongs commented 1 year ago

@liming30 I think we can change PrepareCommitOperator in Paimon so that it no longer extends AbstractStreamOperator. Then you can implement OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory storageLocation) throws Exception, which takes CheckpointOptions as a parameter, and from it you can get the SnapshotType.

At the same time, you can keep pushing FLINK-32698 forward; I think it will take a long time to finish.
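The approach above can be sketched in miniature: react to the snapshot type inside the operator's snapshot callback and derive a tag name from the checkpoint id. This is a self-contained sketch, not Paimon code; Flink's CheckpointOptions/SnapshotType are stood in by a small enum, and in a real operator you would check checkpointOptions.getCheckpointType().isSavepoint(). The "savepoint-<id>" naming scheme is a hypothetical example, not Paimon's actual convention.

```java
public class TagOnSavepoint {

    // Stand-in for Flink's SnapshotType: only the distinction we need here.
    enum SnapshotKind { CHECKPOINT, SAVEPOINT }

    // Hypothetical naming scheme: derive the tag name from the checkpoint id.
    static String tagNameFor(long checkpointId) {
        return "savepoint-" + checkpointId;
    }

    // Returns the tag to create for this snapshot, or null for an
    // ordinary periodic checkpoint (no tag is created).
    static String onSnapshot(long checkpointId, SnapshotKind kind) {
        return kind == SnapshotKind.SAVEPOINT ? tagNameFor(checkpointId) : null;
    }

    public static void main(String[] args) {
        System.out.println(onSnapshot(42L, SnapshotKind.SAVEPOINT));  // savepoint-42
        System.out.println(onSnapshot(43L, SnapshotKind.CHECKPOINT)); // null
    }
}
```

The key point is that the decision happens at snapshot time inside the operator, so the tag is committed together with the snapshot that the savepoint captures.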

JingsongLi commented 1 year ago

Hi @FangYongs , can you explain what scenario requires this ability?

FangYongs commented 1 year ago

@JingsongLi This is used for incremental data correction in streaming processing. For example, suppose a Flink streaming ETL job reads data from Kafka and writes results to Paimon, and we regularly trigger savepoints in Flink so the job can recover in case of disaster. We sometimes need to redo the Kafka data from a specified timestamp and resume the Flink streaming ETL job from the corresponding offset. At present we cannot recover incrementally and can only discard the results of the previous computation. But with a Flink savepoint and a tag in Paimon, we can roll the data in Paimon back to the given tag and resume the Flink job from the given savepoint, continuing incremental streaming processing without losing any data. The details of incremental recovery are described in "b) Increment Recover" of PIP-5.

FangYongs commented 1 year ago

@liming30 I reviewed this PR overall; there are two issues, and you can consider whether we need to create new issues for them:

  1. How to get the tag name or id for a given savepoint path, so that users can roll the table back to that tag
  2. We need to validate that the tag exists in the table when the job is resumed from a given savepoint

What do you think?
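The second point above can be illustrated with a small sketch: when the job is restored from a savepoint, fail fast if the table no longer contains the tag that was created for that savepoint. All names here (expectedTag, knownTags, validateTagOnRestore) are hypothetical; a real implementation would look the tag up through Paimon's tag metadata for the table.

```java
import java.util.Set;

public class TagValidation {

    // Throws if the tag recorded for the restored savepoint is not
    // present in the table's set of known tags.
    static void validateTagOnRestore(String expectedTag, Set<String> knownTags) {
        if (!knownTags.contains(expectedTag)) {
            throw new IllegalStateException(
                "Tag '" + expectedTag + "' for the restored savepoint is missing; "
                + "roll the table back to a matching tag before resuming.");
        }
    }

    public static void main(String[] args) {
        Set<String> tags = Set.of("savepoint-42", "savepoint-43");
        validateTagOnRestore("savepoint-42", tags); // ok: tag present
        try {
            validateTagOnRestore("savepoint-99", tags); // missing: rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast here prevents the job from silently producing duplicate or missing results when the table state and the savepoint state have diverged.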

JingsongLi commented 1 year ago

Thanks @FangYongs, I see. We can document this in the PR~