StarRocks / starrocks-connector-for-apache-flink

Apache License 2.0

[Enhancement] Try to cleanup lingering transactions when restoring in exactly-once mode #271

Closed banmoy closed 1 year ago

banmoy commented 1 year ago

What type of PR is this:

Which issues of this PR fixes :

Fixes #

Problem Summary(Required) :

What's the problem

In exactly-once mode, the connector does not abort PREPARED transactions when the Flink job fails over or exits, because of its 2PC mechanism. Some of those PREPARED transactions may belong to a successful checkpoint and will be committed when the job restores from that checkpoint, but the others are useless and should be aborted. Otherwise they linger in StarRocks until they time out, which may make StarRocks unstable. We should try to abort those lingering transactions when restoring.

How to solve it

When the Flink job restores, the connector tries to find those lingering transactions and abort them. The key problem is how to find them, because their labels are not stored in the checkpoint. We design a label generator (ExactlyOnceLabelGenerator) to solve this:

  1. The user must set the option sink.label-prefix, which is used as the prefix of the labels. It must be unique across all ingestions running on the same StarRocks cluster, including the Flink connector, broker load and routine load.
  2. The connector generates labels in the format {labelPrefix}-{tableName}-{subtaskIndex}-{id}.
    • subtaskIndex makes the label unique across subtasks if the sink writes in parallel.
    • id is incremental, and makes the label unique across different transactions within a subtask.
  3. When checkpointing, the current id is stored as state in the checkpoint. Labels whose ids are less than the current id must have succeeded, so only labels whose ids are equal to or greater than the current id can be lingering.
  4. When restoring, the connector reads the current id from the checkpoint, constructs the label with that id, and gets the label status from StarRocks. If the transaction is in PREPARED state, it is lingering and should be aborted.
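
The steps above can be sketched roughly as follows. This is a minimal illustration and not the code in this PR: `TxnClient`, `InMemoryTxnClient`, `LabelGenerator`, `abortLingering` and the `probeWindow` parameter are all hypothetical names, and probing a fixed window of ids starting at the restored id is an assumption about how "equal to or greater than the current id" might be bounded in practice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; none of these types are the connector's real classes.
final class LabelCleanupSketch {

    /** Hypothetical stand-in for whatever API is used to query and abort labels. */
    interface TxnClient {
        String getLabelState(String label); // e.g. "PREPARED", "VISIBLE", "UNKNOWN"
        void abort(String label);
    }

    /** In-memory fake of TxnClient, so the sketch runs without a cluster. */
    static final class InMemoryTxnClient implements TxnClient {
        final Map<String, String> states = new HashMap<>();

        @Override
        public String getLabelState(String label) {
            return states.getOrDefault(label, "UNKNOWN");
        }

        @Override
        public void abort(String label) {
            states.put(label, "ABORTED");
        }
    }

    /** Builds labels of the form {labelPrefix}-{tableName}-{subtaskIndex}-{id}. */
    static final class LabelGenerator {
        private final String labelPrefix;
        private final String tableName;
        private final int subtaskIndex;
        private long nextId; // the "current id" that is stored in checkpoint state

        LabelGenerator(String labelPrefix, String tableName, int subtaskIndex, long restoredId) {
            this.labelPrefix = labelPrefix;
            this.tableName = tableName;
            this.subtaskIndex = subtaskIndex;
            this.nextId = restoredId;
        }

        String labelFor(long id) {
            return labelPrefix + "-" + tableName + "-" + subtaskIndex + "-" + id;
        }

        /** Returns the label for a new transaction and advances the id. */
        String next() {
            return labelFor(nextId++);
        }

        /** Value to write into the checkpoint. */
        long snapshotId() {
            return nextId;
        }
    }

    /**
     * On restore, probes ids >= the restored id and aborts those still PREPARED.
     * Ids below the restored id are never touched. probeWindow is an assumed bound.
     */
    static List<String> abortLingering(LabelGenerator gen, TxnClient client, int probeWindow) {
        List<String> aborted = new ArrayList<>();
        long start = gen.snapshotId();
        for (long id = start; id < start + probeWindow; id++) {
            String label = gen.labelFor(id);
            if ("PREPARED".equals(client.getLabelState(label))) {
                client.abort(label);
                aborted.add(label);
            }
        }
        return aborted;
    }
}
```

Because the label is fully determined by the prefix, table name, subtask index and id, a restored subtask can rebuild candidate labels from the single id kept in state, which is why sink.label-prefix has to be unique on the cluster.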

Checklist:

dyp12 commented 1 year ago

The PREPARED transaction exists because the import encountered an exception. So when the import encounters an exception, should the PREPARED transaction be closed and then restarted? Also, every task needs its own sink.label-prefix, which makes it easy to end up with duplicates.

banmoy commented 1 year ago

@dyp12 Thanks for your comments

  1. The transaction is set to PREPARED because a checkpoint is triggered and we need to prepare it, see StarRocksDynamicSinkFunctionV2#snapshotState(). It is finally committed when StarRocksDynamicSinkFunctionV2#notifyCheckpointComplete() is called, which indicates that the Flink checkpoint has succeeded globally. This is Flink's two-phase-commit mechanism for implementing exactly-once. Before notifyCheckpointComplete, we cannot abort the transaction even if an exception happens, because that may lead to data loss if the checkpoint has succeeded globally but this subtask has not been notified yet.
  2. Keeping sink.label-prefix unique is a burden for users, but there does not seem to be a better solution currently. The approach is similar to that of the Flink Kafka connector, see sink.transactional-id-prefix.
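
To make the prepare/commit ordering in point 1 concrete, here is a simplified simulation in plain Java. It does not use Flink or StarRocks APIs and is not the connector's code: `TwoPhaseCommitSketch`, `Sink`, `Txn` and the `txn-N` labels are hypothetical, and only the method names `snapshotState` and `notifyCheckpointComplete` are borrowed from the comment above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified simulation of the prepare/commit flow; not the connector's implementation.
final class TwoPhaseCommitSketch {

    enum State { OPEN, PREPARED, COMMITTED }

    static final class Txn {
        final String label;
        State state = State.OPEN;

        Txn(String label) {
            this.label = label;
        }
    }

    static final class Sink {
        private long nextId = 0;
        private Txn current = new Txn("txn-" + nextId++);
        // Prepared transactions, keyed by the checkpoint that prepared them.
        private final TreeMap<Long, Txn> preparedByCheckpoint = new TreeMap<>();

        Txn currentTxn() {
            return current;
        }

        /** Phase 1: a checkpoint is triggered, so prepare the open transaction. */
        void snapshotState(long checkpointId) {
            current.state = State.PREPARED;
            preparedByCheckpoint.put(checkpointId, current);
            current = new Txn("txn-" + nextId++); // later writes go to a new transaction
        }

        /** Phase 2: the checkpoint succeeded globally, so commit what it prepared. */
        List<String> notifyCheckpointComplete(long checkpointId) {
            List<String> committed = new ArrayList<>();
            // View of all entries prepared at or before this checkpoint.
            Map<Long, Txn> ready = preparedByCheckpoint.headMap(checkpointId, true);
            for (Txn txn : ready.values()) {
                txn.state = State.COMMITTED;
                committed.add(txn.label);
            }
            ready.clear(); // also removes the entries from preparedByCheckpoint
            return committed;
        }
    }
}
```

In this model a transaction sits in PREPARED between the two calls. If the job fails in that gap, the sink alone cannot tell whether the checkpoint succeeded globally, which is why aborting at that point is unsafe and the cleanup is deferred to restore time.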