Netflix / iceberg

Iceberg is a table format for large, slow-moving tabular data
Apache License 2.0

Reading Deltas via Metadata #48

Open omervk opened 5 years ago

omervk commented 5 years ago

(this is very similar in philosophy to #47 and it would be good to read that before this, same caveats applying)

A job that wants to read only data added since its last run must track what the high-water mark was and read newer data from its source using a predicate. For instance:

val newData = spark
  .read
  /* ... */
  .filter($"day" === "2018-08-05")

However, we can instead base our reads on the accumulation of snapshots over time: if our snapshots are S1, S2, S3 and S4, and the last snapshot we processed was S1, we can read the new data from S2, S3 and S4 and skip the filtering entirely. This would essentially make our high-water mark metadata-based, rather than data-based.
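The bookkeeping this implies is small: persist the last snapshot ID the job processed, and on the next run take every snapshot committed after it. A minimal sketch of that logic in plain Scala (snapshotsSince is a hypothetical helper, not an Iceberg API; it assumes snapshot IDs are listed in commit order):

```scala
// Hypothetical helper: given a table's snapshot IDs in commit order and the
// last snapshot this job processed, the "new" snapshots are everything after it.
// If the last processed snapshot is no longer listed (e.g. it expired), this
// sketch falls back to reading everything; a real job might fail instead.
def snapshotsSince(allSnapshots: Seq[Long], lastProcessed: Long): Seq[Long] = {
  val idx = allSnapshots.indexOf(lastProcessed)
  if (idx < 0) allSnapshots else allSnapshots.drop(idx + 1)
}
```

With snapshots S1..S4 and S1 as the stored high-water mark, this yields S2, S3 and S4, which is exactly the set the proposed read API would be handed.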

This can already be achieved using the low-level Iceberg API, but not using the Spark API; exposing it there would be a great addition to the project.

Here's a sketch of how this API might look:

spark
  .read
  .format("iceberg")
  .snapshots(2, 3, 4)
  .load(path)

Note: Specifying the list of snapshots would also let this API support other use cases, such as parallel processing of snapshots.

rdblue commented 5 years ago

This can be done fairly easily by adding key-value properties when reading with Spark. We plan to do this to implement AS OF SYSTEM TIME SQL statements as well. You'd pass the extra information and, in the IcebergSource, use it to customize the scan.
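Concretely, the key-value route means the caller sets DataSource options and the IcebergSource reads them back out of the options map to configure the scan. A sketch of the source-side parsing, assuming hypothetical option names (start-snapshot-id / end-snapshot-id are illustrative, not a committed API):

```scala
// Hypothetical sketch: parse a snapshot range out of the key-value options a
// Spark reader would pass (e.g. .option("start-snapshot-id", "2")).
// Option names are assumptions, not the final API.
def snapshotRange(options: Map[String, String]): Option[(Long, Long)] =
  for {
    start <- options.get("start-snapshot-id")
    end   <- options.get("end-snapshot-id")
  } yield (start.toLong, end.toLong)
```

When both options are present the source would build an incremental scan over that range; when they are absent it would fall back to a normal full-table scan.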

Scans don't currently support filtering by what is "new" in a snapshot (or multiple snapshots) but that should be easy to add by extending the TableScan interface and the underlying BaseTableScan implementation.
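The scan-side extension described above could be sketched like this, using stand-in types rather than Iceberg's actual TableScan/BaseTableScan classes (SnapshotInfo, filesAppendedBetween and the exclusive-from/inclusive-to convention are all assumptions for illustration):

```scala
// Hypothetical model of the incremental-scan logic: restrict a scan to the
// files appended by snapshots after fromId, up to and including toId.
// Snapshots are assumed to be in commit order.
case class SnapshotInfo(id: Long, addedFiles: Seq[String])

def filesAppendedBetween(snapshots: Seq[SnapshotInfo],
                         fromId: Long, toId: Long): Seq[String] = {
  // Everything committed after fromId...
  val afterFrom = snapshots.dropWhile(_.id != fromId).drop(1)
  // ...truncated at toId (inclusive); an unknown toId yields an empty range.
  val inRange = afterFrom.take(afterFrom.indexWhere(_.id == toId) + 1)
  inRange.flatMap(_.addedFiles)
}
```

In a real implementation this selection would live behind a TableScan method so that planning, filtering, and task creation in BaseTableScan stay unchanged and only the set of candidate files shrinks.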