Purpose
Currently, the Flink Paimon writer operator is stateless. However, for an async lookup table (that is, a table that specifies both 'changelog-producer' = 'lookup' and 'changelog-producer.lookup-wait' = 'false'), some writers may still be compacting records when a job failure occurs. When the job restarts, if there are no new records for these writers to write, they won't be created again and their buckets will remain uncompacted. Because lookup tables rely on compaction to produce changelog, some changelog might be lost.
This PR stores active buckets in Flink state for the async lookup writer to make sure the changelog can be produced.
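The idea can be illustrated with a minimal, self-contained sketch. It is not the code in this PR and uses no Paimon or Flink classes: ActiveBucketTracker, BucketId, markActive, release, snapshot and restore are hypothetical names, and "active" is assumed here to mean "a writer currently exists for this bucket". The sketch only shows the bookkeeping: remember which buckets have writers, save that set at checkpoint time, and rebuild it after a restart so that writers can be recreated even when no new records arrive.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical illustration only; these names do not come from Paimon or Flink.
final class ActiveBucketTracker {

    /** Identifies one bucket inside one partition. */
    static final class BucketId {
        final String partition;
        final int bucket;

        BucketId(String partition, int bucket) {
            this.partition = partition;
            this.bucket = bucket;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof BucketId)) {
                return false;
            }
            BucketId other = (BucketId) o;
            return bucket == other.bucket && partition.equals(other.partition);
        }

        @Override
        public int hashCode() {
            return Objects.hash(partition, bucket);
        }

        @Override
        public String toString() {
            return partition + "/bucket-" + bucket;
        }
    }

    // Buckets that currently have a writer (assumed meaning of "active").
    private final Set<BucketId> active = new LinkedHashSet<>();

    /** Called when a writer is created for a bucket. */
    void markActive(String partition, int bucket) {
        active.add(new BucketId(partition, bucket));
    }

    /** Called when a bucket's writer is closed with nothing left to compact. */
    void release(String partition, int bucket) {
        active.remove(new BucketId(partition, bucket));
    }

    /** Copy of the active set, as it would be written to state on checkpoint. */
    List<BucketId> snapshot() {
        return new ArrayList<>(active);
    }

    /** Rebuilds the tracker from the state saved by the last checkpoint. */
    static ActiveBucketTracker restore(List<BucketId> state) {
        ActiveBucketTracker tracker = new ActiveBucketTracker();
        tracker.active.addAll(state);
        return tracker;
    }

    /** Buckets for which writers should exist, including after a restart. */
    Set<BucketId> activeBuckets() {
        return Collections.unmodifiableSet(active);
    }
}
```

After a simulated restart, a caller would iterate over restore(state).activeBuckets() and recreate a writer for each entry, which gives the pending compaction (and therefore the changelog) a chance to complete.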
Tests
WriterOperatorTest#testAsyncLookupWithFailure
API and Format
No changes.
Documentation
No new feature.