Netflix / iceberg

Iceberg is a table format for large, slow-moving tabular data

Promoting Idempotency Through Metadata #47

Open · omervk opened 6 years ago

omervk commented 6 years ago

Currently, implementing idempotent jobs over Iceberg is done via data-based predicates. For instance, if a job run is presumed to have written the data for 2018-08-05, you will write something like:

df.write(t).overwrite($"day" === "2018-08-05")

However, this approach may be:

  1. Slow - the filter must be pushed down to the right partitions on rewrite, and the boundary values for each write must be computed (which might be slow if done on demand)
  2. Incomplete (false negatives) - the predicate doesn't cover everything the job wrote, e.g. late-arriving data included in the previous output
  3. Overzealous (false positives) - the predicate covers data we don't want to overwrite, e.g. data for the same day that arrived before or after this job's run
  4. Leaky - domain knowledge is mixed into the operations: each job needs to know which field determines what it wrote and which value it wrote, and to preserve that value somewhere

To promote more complete idempotency, we can use the metadata Iceberg keeps about each snapshot to revert previous snapshots. If, for instance, snapshot P1 wrote file F1 and we want to re-run the job that produced it, we can commit a new snapshot P2 that deletes F1 and writes F2 with the new data, effectively reverting P1.

The benefits of this would be:

  1. Snapshot isolation is preserved
  2. No duplicate data can be read (F1+F2)
  3. No incomplete data can be read (neither F1 nor F2)
  4. We can revert a snapshot, regardless of how far back it happened

Note: This would only be usable where snapshots only append new data, so tables that are also regularly compacted or coalesced may not be supported.

To achieve this, we could:

  1. Use the com.netflix.iceberg.RewriteFiles operation, but this keeps us at a very low level, close to Iceberg, and forces us to manage the data files ourselves (see the sketch after this list).
  2. Use the com.netflix.iceberg.Rollback operation, but this only rolls back the most recent snapshot, which is a restriction we don't want to be tied to.
  3. Use the com.netflix.iceberg.DeleteFiles operation, but this creates a snapshot of its own, so between the delete and the subsequent write readers could see either duplicate or incomplete data.
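
For illustration, a rough sketch of what option 1 forces each job to do. `revertAndReplace` is a hypothetical helper of mine, and it assumes `Table.newRewrite()` returns a `RewriteFiles` and that `Snapshot.addedFiles()` lists the data files a snapshot added:

```scala
import java.util.{Set => JSet}

import com.netflix.iceberg.{DataFile, Table}
import scala.collection.JavaConverters._

// Hypothetical helper: revert an old snapshot by rewriting the files it
// added (F1) into the replacement files (F2) in a single commit.
def revertAndReplace(table: Table, snapshotId: Long, replacements: JSet[DataFile]): Unit = {
  val snapshot = table.snapshot(snapshotId)

  // Collect the files the old run added.
  val toDelete: JSet[DataFile] = snapshot.addedFiles().asScala.toSet.asJava

  // One atomic rewrite: readers see either the old files or the new ones,
  // never both and never neither.
  table.newRewrite()
    .rewriteFiles(toDelete, replacements)
    .commit()
}
```

This works, but the caller has to build the replacement `DataFile` entries by hand, which is exactly the low-level file management we'd like to avoid.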

What would be great is an API that gives us some sort of transaction spanning both the high-level (Spark) and low-level (Iceberg) APIs, so that we could delete the files written in a snapshot, write new data using Spark, and only then commit the transaction and create a new snapshot.

@rdblue I would love to hear what you think this kind of API would look like.

rdblue commented 6 years ago

I think we're talking about two different use cases that are both idempotent: jobs that overwrite specific partitions using a predicate, and incremental processing that replaces the output of an earlier run.

I think the right choice depends on the job and how it is written. Our data engineers write jobs all the time that overwrite specific partitions and are idempotent, but we also have a separate system for them to run incremental processing.

Also, the point about overwrite being slow doesn't really apply unless what you're trying to delete is mixed with other data you're not trying to delete. This isn't intended to support that case.

I think there is a use case for incremental processing, so I'd like to see support for writes that also revert a previous commit. I'm skeptical about an API that bridges both Spark and Iceberg, though. To do this, I'd just pass the snapshot to revert through Spark's API as a DataFrameWriter option to Iceberg. Then let Iceberg handle the revert and commit.
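
From the job's side, that could look something like this sketch (the option name `replace-snapshot-id` is hypothetical, not an existing option):

```scala
// Sketch: the job appends its re-run output and names the snapshot to
// revert; Iceberg would delete that snapshot's files and commit both
// changes at once. "replace-snapshot-id" is a hypothetical option name.
df.write
  .format("iceberg")
  .option("replace-snapshot-id", previousSnapshotId.toString)
  .mode("append")
  .save("db.table")
```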

I think this requires a way to identify the snapshot to revert in the write, and an Iceberg operation that can revert that snapshot and commit the new data atomically.

rdblue commented 6 years ago

@omervk, you might want to have a look at #52, which adds single-table transactions. That would enable you to revert a commit and replace its data in the same commit.
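
To make that concrete, here is a minimal sketch, assuming the transaction API in #52 exposes roughly the same operations as Table (method names may differ in the final API):

```scala
// f1 is the data file the previous run added, f2 its replacement
// (both DataFile instances).
val txn = table.newTransaction()

// Stage the removal of the previous run's output...
txn.newDelete()
  .deleteFile(f1)
  .commit()

// ...and stage the re-run's output.
txn.newAppend()
  .appendFile(f2)
  .commit()

// Both changes become visible atomically, so readers never see F1 and F2
// together, or neither.
txn.commitTransaction()
```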

Also, I've updated the Snapshots section of the spec with a set of operations to help identify what changed in a snapshot. It would be great to hear your take on the update and whether revert should be one of those actions. If so, maybe the operation info should be an object, instead of a string, so it can encode more information, like which previous snapshot was reverted or how many data files were added and removed.
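
As an illustration of the object idea, the operation info might take a shape like this (field names are hypothetical, not from the spec):

```json
{
  "operation": "replace",
  "replaced-snapshot-id": 1234567890123,
  "added-data-files": 2,
  "deleted-data-files": 2
}
```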