apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0
5.91k stars 2.08k forks source link

[Feature Request] Support for change data capture #3941

Open NOD507 opened 2 years ago

NOD507 commented 2 years ago

A change data feed option like the one on delta lake is very usefull for incremental syncs between tables or systems.

CDC is one feature thats not on open source delta lake or iceberg. I dont think theres a format that supports CDC on spark open source at this moment.

Hidden partitioning is currently a plus for iceberg vs delta lake, and CDC could be another one.

https://docs.databricks.com/delta/delta-change-data-feed.html

flyrain commented 2 years ago

I'm working on a Iceberg CDC design doc. It will be like the CDF(change data feed) in the proprietary Delta. Will keep you posted. It'd be helpful if you can share your CDC use case.

SreeramGarlapati commented 2 years ago

@flyrain - we started this design doc, which scopes the problem down to the tables that are written with MoR, a while back - after a community sync up. pl. see if you wanna adapt it or even change the same doc.

flyrain commented 2 years ago

Thanks for sharing, @SreeramGarlapati! That is a good start and I'm glad you guys have started working on it. Can we have a discuss when my design doc is ready?

mohitgargk commented 2 years ago

+1

@flyrain We are looking into two possible approaches here.

  1. Materialised where, like DeltaLake Change Data Feed that is generated in the write path.
  2. Change Data Feed is evaluated at the read path by consumers.

At this point of time, we are grokking and spiking on approach #2. Would love to know your early thoughts on this.

flyrain commented 2 years ago

Hi @mohitgargk, we had multiple discussions internally and are working on a doc. Will share it later. Generally speaking, CDC on MOR is in a better position, we should target it first. CDC on COW is almost impossible without additional metadata column or extra logging. cc @aokolnychyi @RussellSpitzer

mohitgargk commented 2 years ago

@flyrain Agree on that. With CoW, without any row-identifier, it is not possible to establish link with the new state and old state of the records without ambiguities.

For the CDF record structure, we grokked a bit on the design and came up with this.

Above proposal is based on our use case where we are interested in change of a certain column but also want the values of other columns. If the whole state is part of CDF record, the consumer can avoid query/lookup/join with the table to get rest of the state.

cc - @SreeramGarlapati @rawataaryan9 @dharamshoo

NOD507 commented 2 years ago

I'm working on a Iceberg CDC design doc. It will be like the CDF(change data feed) in the proprietary Delta. Will keep you posted. It'd be helpful if you can share your CDC use case.

We are interested in CDC to be able to incrementally sync changes from a datalake to a datawarehouse. To be able to do ETL in the datalake and use the dwh for end user consumption.

NOD507 commented 2 years ago

@flyrain Agree on that. With CoW, without any row-identifier, it is not possible to establish link with the new state and old state of the records without ambiguities.

For the CDF record structure, we grokked a bit on the design and came up with this.

Above proposal is based on our use case where we are interested in change of a certain column but also want the values of other columns. If the whole state is part of CDF record, the consumer can avoid query/lookup/join with the table to get rest of the state.

cc - @SreeramGarlapati @rawataaryan9 @dharamshoo

This design would work great for our use case, is exactly what we need. Thanks for sharing

rajarshisarkar commented 2 years ago

Hi @flyrain Please let us know once the design doc is ready.

flyrain commented 2 years ago

We've discussed the solution we are working on in today's community sync-up. The solution can support both COW and MOR. We think it is the good first step to approach CDC. Will post the design doc soon.

flyrain commented 2 years ago

The design doc is here, https://docs.google.com/document/d/1bN6rdLNcYOHnT3xVBfB33BoiPO06aKBo56SZmuU9pnY/edit?usp=sharing.

Reo-LEI commented 2 years ago

@flyrain We are looking into two possible approaches here.

  1. Materialised where, like DeltaLake Change Data Feed that is generated in the write path.
  2. Change Data Feed is evaluated at the read path by consumers.

I drafted a design document for case 1 base on write-time logging.

But as @flyrain mentioned in email, the approach based on write-time logging would require changing the table spec and changing all engines to fit the new table spec. Therefore, I would tend to take the approach of @flyrain and try to integrate it into flink.

Just paste my approach here so we can find it later when we need it.

donatobarone commented 2 years ago

+1 we have started to investigate iceberg internally and we realised that the CDC available today was just for append operations, which is definitely not ideal as processes will have to be built around the point in time functionalities to be able to get the changes. Great that the community is looking into this

aokolnychyi commented 2 years ago

Based on Yufei’s design doc and what we discussed during the sync, I’d like to share my thoughts on what can be efficiently supported right now.

Assumptions

Iceberg does not distinguish the order of records within a snapshot. Users may manually deduce this using a data column. However, there is nothing in the metadata to support this at the moment. I believe it is sufficient to have a summary of what changed per snapshot for most use cases (except the audit use case). Specifically, if we add a record and immediately delete it in the same snapshot (Flink CDC), it is okay to skip this record and not report it in the CDC log.

From what I see, just knowing what records to remove and to add should be enough to support most use cases. I have an example below even if the target system is not Iceberg. In the future, we can add pre/post images but computing that with the current table metadata would require a join and would be expensive.

Whenever we apply a delta log to a target table, most CDC use cases rely on a primary key or a set of identity columns. I think it is reasonable to assume Iceberg tables should have identity columns defined to support generation of CDC records.

It is not required to output the entire deleted record if other columns are not stored in equality delete files. If values for other columns are not persisted, this would require an expensive join to reconstruct the entire row.

Unsupported

The CDC record can include:

_record_type, _commit_snapshot_id, _commit_timestamp, _commit_order, _identity_columns, col1, col2, ...

Algorithm (per snapshot)

The algorithm would only require one join to find identity columns for position deletes. However, it should be fairly efficient as position deletes are scoped to partitions and we know the exact file names to read. We don’t have to read any other files or do a full table scan.

I think it should be sufficient to have only delete/insert record types in most cases. A generic MERGE statement can be used to apply changes for a single snapshot to both Iceberg and non-Iceberg tables.

MERGE INTO target t USING source s
ON t.id_col = s.id_col AND s._record_type = 'DELETE'
WHEN MATCHED
  DELETE
WHEN NOT MATCHED
  INSERT …

If the destination table is another Iceberg table, we may skip the MERGE statement and write equality deletes and data files directly without querying the destination table.

Here are a few examples (inspired by what Yufei has put together) I went through to see if this approach is going to work (double check it, please).

image

dharamsahoo commented 2 years ago

Hi @aokolnychyi , thanks for the well illustrated writeup. I have a few comments related to the assumptions.

Output only delete and insert record types

Some consumers might want to directly consume the CDC records and generate events/notifications. It would be useful for such consumers to have CDC records relate to the actual operation on Iceberg table record. So having an Update record type would help to segregate it from Insert and Delete records in more convenient way.

Delete records may only include values for identity columns

For consumers wanting to generate notifications which include old & new values like sending email for customer preference update OR generate events when an attribute value crosses a threshold, the inclusion of non-identity column(s) in the Delete record (related to an Update operation on Iceberg table) will be very useful.

Also, I noticed that in example 2 above, CDC records are generated for unchanged records (id=106). For Copy-On-Write tables, would this be the behaviour of CDC? If so, for snapshots where a small fraction of records are added/modified, there would be too many CDC records.

aokolnychyi commented 2 years ago

So having an Update record type would help to segregate it from Insert and Delete records in more convenient way.

The problem right now is that there is no update in Iceberg. There are only inserts and deletes. An update is represented as a delete followed by an insert. That being said, there may be a way to construct an update record given delete and insert records. For example, we can shuffle the delete/insert records so that all record types for the same identity columns are next to each other.

delete, s1, 100, null
insert, s1, 100, 1

In this case, we can construct a post update image. I am not sure how we can construct a pre update image without joining the delete record with the target table (that's going to be expensive). We can do that more or less efficiently for position deletes and copy-on-write but equality deletes may include only values for identity columns. We will have to scan a lot of data to reconstruct a pre update image for equality deletes.

This would only work if the identity columns are not modified.

Also, I noticed that in example 2 above, CDC records are generated for unchanged records (id=106). For Copy-On-Write tables, would this be the behaviour of CDC?

This one is a little bit easier. To start with, we can report unchanged rows as it is exactly what happens in the table. Whenever we rewrite a file in copy-on-write, we delete all rows from that file and add new records where some records can be simply copied over. In the future, we can use the above idea and co-locate entries for the same identity columns. Then we can remove pairs where a record is deleted and added without any changes. This won't require any joins with the target table so won't be that expensive. Maybe, the action can have an option to perform this deduplication. That way, rows that were copied over in copy-on-write won't be part of the output.

aokolnychyi commented 2 years ago

The algorithm above is not complete and does not cover some edge cases. However, I think it makes sense to explore it and check if we can modify/extend it to cover more use cases.

aokolnychyi commented 2 years ago

After thinking more, we may change the assumptions a bit.

Revisited Assumptions

Iceberg does not distinguish the order of records within a snapshot. That's why Iceberg can only report a summary of what changed in a snapshot.

By default, rows added and removed in the same snapshot (e.g. Flink CDC) are not shown in the output. In most cases, we just want to see a net delta per snapshot. If a use case requires all records to be seen, it is still doable. For example, if a snapshot adds a data file A with two records (pos 0, 1) and a position delete file D that removes pos 0, we can output something like this.

 _record_type | _commit_snapshot_id | _commit_order | col1 | col2
-------------------------------------------------------------------
insert, s1, 0, 100, a
insert, s1, 0, 101, a
delete, s1, 1, 100, null

That means records within the same snapshot may have different _commit_order. Seems a little bit odd but kind of represents what happens in the Iceberg table. Audit use cases may need this.

Iceberg does not have a notion of an update, which means constructing pre/post images will require some computation. In a lot of cases, this won't be needed (e.g. refresh of a materialized view, syncing changes with an external system). I think we should prefer a more efficient algorithm if possible.

There seems to be a way to build pre/post update images (see the comment above) but it will require joins and equality deletes will be the trickiest. For instance, a single equality delete may match a number of records and we have to report all of them.

Whenever we apply a delta log to a target table, most CDC use cases rely on a primary key or a set of identity columns. I think it is reasonable to assume Iceberg tables should have identity columns defined to support generation of CDC records.

It is not required to output the entire deleted record if other columns are not stored in equality delete files. If values for other columns are not persisted, this would require an expensive join to reconstruct the entire row.

Open questions

MVP

Future

coolderli commented 2 years ago

Just share my thought.

aokolnychyi commented 2 years ago

Constructing pre/post images may be necessary. If the downstream is an external system that can handle the insert as upsert, the update_before(delete) can be dropped.

I agree pre/post images may be necessary and I think there will be a way to build them (like I mention in the comment above). It will require more resources and will be tricky for equality deletes. That's why the proposal is not to include them by default. However, I am not sure I agree that pre/post images are required to apply changes to other systems.

For example, consider the following CDC records (I ignored some columns).

 _record_type | _commit_snapshot_id | _commit_order | id | value
-------------------------------------------------------------------
delete, s1, 0, 100, null
insert, s1, 0, 100, a
insert, s1, 0, 101, a

If the external system supports MERGE statements, we can issue the following command:

MERGE INTO target t USING source s
ON t.id = s.id AND s._record_type = 'delete'
WHEN MATCHED
  DELETE
WHEN NOT MATCHED
  INSERT …

Even though we only had deletes/inserts, we still managed to represent an update in the external system.

But if the downstream requires aggregation operation like sum, the update_before can not be dropped.

Can you elaborate a little bit more to make sure I have a good example to think through? When I explored such cases, I though it would be sufficient to know what partitions/keys changed so that aggregations affected by this can be recomputed. For instance, if you have an aggregation by day and you know a particular day has either deletes or new inserts, the aggregation must be recomputed.

Because in the current implementation, the records have no metadata like create_timestamp, we can't determine the time of deletion and insertion, so maybe we have to delete before inserting. But this is unacceptable, this will cause data to jitter, and users will see the data decrease, and then the data returns to normal.

Can you give an example here too? In my examples, I assumed deletes with the same _commit_order are applied before inserts. Changes from different snapshots will have different _commit_order.

 _record_type | _commit_snapshot_id | _commit_order | id | value
-------------------------------------------------------------------
insert, s1, 0, 100, a
delete, s2, 1, 100, null
insert, s2, 1, 100, b
insert, s3, 2, 101, a

The above example would be append in s1, update in s2, another append in s3.

aokolnychyi commented 2 years ago

@flyrain @RussellSpitzer and I chatted a little bit about the design and what a potential implementation can look like. There is a promising idea of using _deleted metadata column for building delete records.

I'll summarize my current way of thinking.

Algorithm (per snapshot)

The algorithm is simple with the only downside is that it may be expensive to resolve equality deletes as they may not be scoped to any partitions. If there is a global equality delete, resolving it and finding all records that were deleted will require a full table scan. In the future, we may consider exposing an option to include equality deletes as-is without finding actually deleted records. If set to false, a delete record will only contain whatever values are in the delete file. Outputting equality deletes as-is may be enough in many cases.

How to apply changes to Iceberg tables? We should be able to apply a CDC log from one Iceberg table to another by simply converting the log to data and equality delete files without doing a MERGE operation.

How to apply changes to non-Iceberg tables? Use a MERGE operation. Note: whenever a CDC log is fetched, consumers may need to collapse changes that happened across snapshots to not violate the cardinality check in MERGE.

Do we have to output _identity_columns if equality deletes are resolved? Probably, not. That’s why we may omit the requirement for having identity columns defined (at least, for now).

What should happen if a row is added and removed in the same snapshot? Skip such records by default. If configured to output such records, we may do so by having different _commit_order within the same snapshot.

Can we support pre/post images? I think pre/post images can be computed if equality delete resolution is enabled and identity columns are defined and never changed. Then we can distribute and order records in a way that will co-locate deletes and inserts for the same key next to each other. This should be sufficient to produce pre and post images.

Shall we output unchanged rows that were copied over during copy-on-write? I'd output copied over rows by default as it is technically correct: we remove and add back such records from the table format perspective. We may expose an option in the future to skip such records but it will require extra work. Similarly to to pre/post images, if we can co-locate deletes and inserts for the same key next to each other, we may skip delete and insert if no value has changed.

flyrain commented 2 years ago

The draft PR is here, https://github.com/apache/iceberg/pull/4539. Let me know if you have any feedback.

stevenzwu commented 2 years ago

@aokolnychyi great write-up on the algorithm. I think the per-snapshot algorithm is a good starting point that should cover a lot of use cases.

Build a predicate to use when looking for data records removed by position deletes.

is this needed? position deletes are already applied when building insert records.

In the future, we may consider exposing an option to include equality deletes as-is without finding actually deleted records. If set to false, a delete record will only contain whatever values are in the delete file. Outputting equality deletes as-is may be enough in many cases.

Why don't we start/default to this simpler behavior of emitting equality delete as it is? Do we need to join and emit the full deleted rows?

Regarding more complex types like pre/post images, I am wondering if Iceberg CDC is the right source.

Let's discuss two scenarios

  1. Iceberg table is populated by a streaming writer which may read from an upstream CDC stream, which has rich context of insert, delete, update (pre/post). Then we don't need to use Iceberg as CDC source
  2. Iceberg table is populated by periodical batch writers (like Spark). there is no upstream CDC source. In this case, since there is no pre/post update concept in Iceberg, this is not relevant here.
donatobarone commented 1 year ago

I see there is a draft PR for this change, but no updates since the 20th of May, is there any approximate timeline on when this will be released?

flyrain commented 1 year ago

We've made a lot of progress on that. Here is the page to trace the progress, https://github.com/apache/iceberg/projects/26. There are more PRs merged and PRs in the review. Unfortunately, I don't have the permission to update the page. We don't have a timeline. I'd guess the CDC feature for copy-on-write will be available in 2 month. cc @aokolnychyi @RussellSpitzer

donatobarone commented 1 year ago

Any news on this feature? :) Once again thanks for working on this!

flyrain commented 1 year ago

Hi @donatobarone, here is the last PR (hopefully) for CDC on COW, https://github.com/apache/iceberg/pull/6012. Any review or feedback is welcome.

jhchee commented 1 year ago

@flyrain Can I have your confirm that this effort will unblock https://github.com/apache/iceberg/issues/1949

flyrain commented 1 year ago

Hi @jhchee, yes, it covers the use case in #1949. Here is the usage doc, https://iceberg.apache.org/docs/latest/spark-procedures/#change-data-capture

jhchee commented 1 year ago

This looks good. Can this be integrated with Spark structured streaming? We wish to have a failover when reading from the table/view.

donatobarone commented 1 year ago

@flyrain I don't fully understand what are the latest on this effort, could you shed some light please?

flyrain commented 1 year ago

Hi @jhchee, CDC is presented as a view at this moment. You will need to hook the Spark structured streaming by yourself. Hi @donatobarone, CDC tool is released with Iceberg 1.2.0+. Here is the usage doc, https://iceberg.apache.org/docs/latest/spark-procedures/#change-data-capture. There are limits at this moment though.

  1. It supports COW. MOR is not supported yet.
  2. Vectorized reader is WIP.
Mohsendream commented 10 months ago

does CDC support streaming use case like delta when we add the option .option("readChangeFeed", "true")