@JingsongLi we should use the new FLIP-27 source interface, right?
We probably don't want the enumerator to statically assign all discovered splits up front. Dynamic assignment is better for load balancing when there are straggler/outlier reader nodes.
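For illustration, a minimal sketch of the pull-based assignment idea (the class and method names are hypothetical, not the actual FLIP-27 interfaces): readers request a split only when they are free, so a slow reader simply asks less often instead of sitting on a pre-assigned backlog.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Minimal sketch of pull-based split assignment; type and method names are
// hypothetical, not the actual FLIP-27 interfaces.
class PullBasedEnumeratorSketch<SplitT> {
  private final Queue<SplitT> pendingSplits = new ArrayDeque<>();

  // Splits discovered by the enumerator are queued, not pushed out eagerly.
  void addDiscoveredSplit(SplitT split) {
    pendingSplits.add(split);
  }

  // A reader asks for work only when it finishes its current split, so a
  // straggler naturally receives fewer splits (dynamic load balancing).
  SplitT nextSplitFor(int readerSubtaskId) {
    return pendingSplits.poll(); // null means "nothing to hand out right now"
  }
}
```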
Hi @stevenzwu, yes. The advantage is that the assignment will be more dynamically balanced.
It depends on the progress of FLIP-27. We are trying to implement the Filesystem/Hive source on FLIP-27 in Flink 1.12, and in order to achieve this goal we are modifying the FLIP-27 interfaces too. (FLIP-27 in Flink 1.11 is not ready.)
If the timing is not urgent, we can wait for Flink 1.12.
@stevenzwu , we have implemented an internal version of the Flink streaming reader, which is not built on top of FLIP-27 for now. Here is the pull request for our own branch: https://github.com/generic-datalake/iceberg-poc/pull/3/files. As Jingsong described, once FLIP-27 is ready, we'd be happy to switch the current implementation to FLIP-27.
@JingsongLi @openinx thx. We are currently implementing an Iceberg source based on the FLIP-27 interface. Our initial goal is backfill: it is bounded but with streaming behavior, meaning the app code stays with the DataStream API and just switches the source from Kafka to Iceberg. We are also very interested in the streaming/continuous read pattern. It is not urgent, so we can probably collaborate. Would love to see the building blocks being pushed upstream gradually.
Regarding TableScan.appendsBetween, we might need more flexibility for fine-grained control. E.g., if the Flink job is lagging behind or bootstrapping from an old snapshot, we probably don't want to eagerly plan all the unconsumed FileScanTasks. That might blow up the Flink checkpoint state if the enumerated list of FileScanTask is too big.
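For concreteness, here is a sketch of the eager variant being described, using the Iceberg TableScan API (the class name and the lastConsumedSnapshotId parameter are made up for illustration): it plans every appended file between the last consumed snapshot and the current one in a single pass, which is exactly what can produce a very large list.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

class EagerIncrementalPlanner {
  // Plan ALL appended files between the last consumed snapshot and the current
  // snapshot in one shot. If the job is far behind, this list can be huge.
  static List<FileScanTask> planAllUnconsumed(Table table, long lastConsumedSnapshotId) {
    table.refresh();
    long currentSnapshotId = table.currentSnapshot().snapshotId();
    List<FileScanTask> tasks = new ArrayList<>();
    try (CloseableIterable<FileScanTask> files =
        table.newScan()
            .appendsBetween(lastConsumedSnapshotId, currentSnapshotId)
            .planFiles()) {
      files.forEach(tasks::add);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return tasks;
  }
}
```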
I am thinking about two levels of enumeration to keep the enumerator memory footprint in check (a rough sketch follows the list):
1. Enumerate the new DataOperations.APPEND snapshots. It is cheap to track and checkpoint this list.
2. Plan the FileScanTasks for up to a configurable number of the oldest snapshots (e.g. 6) from the first step. If the job is keeping up with the ingestion, there should only be one unconsumed snapshot.
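A sketch of what that two-level state could look like, under the assumption that pending snapshots are tracked as (parent, snapshot) id pairs in commit order (the class and field names are invented for illustration):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Deque;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

// Hypothetical two-level enumerator state, roughly as described in the list above.
class TwoLevelEnumeratorState {
  private static final int MAX_SNAPSHOTS_PER_PLANNING = 6; // configurable cap from step 2

  // Level 1: unconsumed APPEND snapshots as {parentSnapshotId, snapshotId} pairs,
  // oldest first -- cheap to track and checkpoint.
  private final Deque<long[]> pendingSnapshots = new ArrayDeque<>();
  // Level 2: splits planned only for the oldest few snapshots.
  private final Deque<FileScanTask> plannedSplits = new ArrayDeque<>();

  // Expand at most MAX_SNAPSHOTS_PER_PLANNING of the oldest pending snapshots
  // into FileScanTasks, keeping the checkpointed split list capped even when
  // the job is far behind the table.
  void planOldestSnapshots(Table table) {
    int planned = 0;
    while (planned < MAX_SNAPSHOTS_PER_PLANNING && !pendingSnapshots.isEmpty()) {
      long[] range = pendingSnapshots.poll();
      try (CloseableIterable<FileScanTask> files =
          table.newScan().appendsBetween(range[0], range[1]).planFiles()) {
        files.forEach(plannedSplits::add);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      planned++;
    }
  }
}
```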
That might blow up Flink checkpoint state if the enumerated list of FileScanTask is too big.
@stevenzwu , what is the maximum size of a table in your production environment? I'm wondering whether it's worth implementing the two-phase enumerator in the first version.
If we have 1 PB of data and each file is 128 MB in size, then the table will have 8,388,608 files. If every FileScanTask consumes 1 KB, then its state is ~8 GB. That should be acceptable for the Flink state backend.
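The same estimate, spelled out (just the arithmetic above, assuming ~1 KB of serialized state per FileScanTask as stated):

```java
public class StateSizeEstimate {
  public static void main(String[] args) {
    long tableSizeBytes = 1L << 50;                 // 1 PB
    long fileSizeBytes = 128L << 20;                // 128 MB per data file
    long numFiles = tableSizeBytes / fileSizeBytes; // 2^50 / 2^27 = 8,388,608 files
    long stateBytes = numFiles * 1024L;             // 1 KB per FileScanTask => 8 GiB
    System.out.println(numFiles + " files, ~" + (stateBytes >> 30) + " GiB of split state");
  }
}
```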
Hi @stevenzwu , about TableScan.appendsBetween: we can limit the number of snapshots per scan, even scanning only one at a time. Because TableScan.appendsBetween is essentially a combination of single incremental snapshots, we can handle only one snapshot at a time.
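A sketch of that per-snapshot approach (appendsBetween and Snapshot.parentId() are the real Iceberg APIs; the surrounding method is invented for illustration): each step plans only the files appended by a single snapshot, i.e. the range (parent, snapshot].

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

class SingleSnapshotPlanner {
  // Plan only the files appended by one snapshot; processing snapshots one at a
  // time keeps the planned split list small no matter how far behind the job is.
  static List<FileScanTask> planOneSnapshot(Table table, Snapshot snapshot) {
    Long parentId = snapshot.parentId();
    if (parentId == null) {
      // The very first snapshot has no parent and would need a full scan of
      // that snapshot instead; not covered by this sketch.
      throw new IllegalArgumentException("first snapshot needs a full scan");
    }
    List<FileScanTask> tasks = new ArrayList<>();
    try (CloseableIterable<FileScanTask> files =
        table.newScan()
            .appendsBetween(parentId, snapshot.snapshotId())
            .planFiles()) {
      files.forEach(tasks::add);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return tasks;
  }
}
```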
I was mainly discussing in the context of the FLIP-27 source. Regardless of how we implement the enumeration, there are two pieces of info that the enumerator needs to track and checkpoint: the discovered snapshots, and the splits planned from them. I was mainly concerned about the state size for the latter. That is why I was referring to throttling the eagerness of split planning. I was thinking about using TableScan.useSnapshot(long snapshotId) so that we can control how many snapshots' splits we plan into state.
Here are some additional benefits of enumerating splits snapshot by snapshot.
@openinx note that this is not keyed state, where state is distributed among parallel tasks. Here, 8 GB of operator state in the enumerator can be problematic. I vaguely remember RocksDB can't handle a list larger than 1 GB; the bigger the list, the slower it gets. Also, if we do planTasks (vs planFiles), the number of splits can be a few times bigger. I can definitely buy the point of starting with something simple and optimizing it later. It will be an internal change to the enumerator, so it has no user impact.
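For reference, the two planning calls being compared (both are real TableScan methods; the comparison method itself is just for illustration): planFiles yields roughly one FileScanTask per data file, while planTasks additionally splits files by the target split size and groups the pieces into CombinedScanTasks, so the total number of per-file splits can be noticeably larger.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

class PlanSizeComparison {
  // Count how many tasks each planning API produces for the same incremental range.
  static void compare(Table table, long fromSnapshotId, long toSnapshotId) {
    long fileTasks = 0;
    try (CloseableIterable<FileScanTask> files =
        table.newScan().appendsBetween(fromSnapshotId, toSnapshotId).planFiles()) {
      for (FileScanTask ignored : files) {
        fileTasks++; // roughly one task per data file
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }

    long splitTasks = 0;
    try (CloseableIterable<CombinedScanTask> bins =
        table.newScan().appendsBetween(fromSnapshotId, toSnapshotId).planTasks()) {
      for (CombinedScanTask bin : bins) {
        splitTasks += bin.files().size(); // files split by target split size, then binned
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }

    System.out.println("planFiles: " + fileTasks + " tasks, planTasks: " + splitTasks + " splits");
  }
}
```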
@JingsongLi Yeah, the key thing is how the coordinator/enumerator controls how the splits are generated. I was saying that we may need some control/throttling there to avoid eagerly enumerating all pending snapshots, so that the checkpointed split list stays manageable/capped. I thought the idea of TableScan.appendsBetween was to run planFiles or planTasks between the last planned snapshot and the latest table snapshot. That is what I was referring to earlier as eager discovery/planning of all unconsumed splits.
NIT: I think we still need to use appendsBetween(snapshot - 1, snapshot), since we want to get incremental data.
True, I think this is easy to do.
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'
Flink is famous for its streaming computation.
After https://github.com/apache/iceberg/pull/1346 , it is easy to build Flink streaming reading on top of it. Unlike Spark, the Flink streaming source continuously monitors new files of the table and directly sends the splits to downstream tasks. The source doesn't need to care about micro-batch size, because the downstream tasks store incoming splits in state and consume them one by one.
Monitor ----(Splits)-----> ReaderOperator
Monitor (Single task):
- Generates the splits for newly discovered files via FlinkSplitGenerator (actually using TableScan.appendsBetween) and sends them to the downstream reader tasks. (A rough sketch of this loop follows below.)

ReaderOperator (multiple tasks):
- Stores the received splits in state and reads them one by one via FlinkInputFormat in a checkpoint cycle.
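To make the Monitor half concrete, a hedged sketch of what one polling round could look like (the class and the emit callback are placeholders; the actual implementation referenced above lives in FlinkSplitGenerator and is not shown in this issue): it refreshes the table, plans the appends since the last emitted snapshot, and hands the resulting splits to the reader tasks.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Consumer;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

// Hypothetical single-task monitor loop; not the actual Monitor/FlinkSplitGenerator code.
class MonitorLoopSketch {
  private Long lastEmittedSnapshotId; // restored from Flink state on recovery

  void pollOnce(Table table, Consumer<FileScanTask> emitToReaders) {
    table.refresh();
    Snapshot current = table.currentSnapshot();
    if (current == null || Long.valueOf(current.snapshotId()).equals(lastEmittedSnapshotId)) {
      return; // nothing new since the last poll
    }
    if (lastEmittedSnapshotId == null) {
      // First run: a real implementation would pick a start position here
      // (e.g. a full scan of the current snapshot); omitted in this sketch.
      lastEmittedSnapshotId = current.snapshotId();
      return;
    }
    try (CloseableIterable<FileScanTask> files =
        table.newScan()
            .appendsBetween(lastEmittedSnapshotId, current.snapshotId())
            .planFiles()) {
      // Downstream reader tasks store these splits in state and read them one by one.
      files.forEach(emitToReaders);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    lastEmittedSnapshotId = current.snapshotId();
  }
}
```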