memiiso / debezium-server-iceberg

Replicates any database (CDC events) to Apache Iceberg (To Cloud Storage)
Apache License 2.0

Correctness issue when handling UPDATE events #403

Closed: maximethebault closed this issue 1 month ago

maximethebault commented 2 months ago

As stated in the documentation, when running in upsert mode, it is recommended to use Event Flattening.

In case of an UPDATE, the Event Flattening will discard the "before" payload and only keep the "after" payload, which is what's needed to write the updated record.

However, when writing the equality delete files, the full record is also written into them, in its "after" version. This is not what Iceberg expects.

Excerpt of the spec (emphasis is mine):

Equality delete files identify deleted rows in a collection of data files by one or more column values, and may optionally contain additional columns of the deleted row.

Another excerpt of the spec about delete file stats:

Manifests hold the same statistics for delete files and data files. For delete files, the metrics describe the values that were deleted.

Because of this, correctness issues will appear for some queries that filter on updated columns. For example, if column A was changed from 0 to 1, the equality delete file's statistics describe the new value (A=1), so a SELECT query with WHERE A=0 will prune that delete file and return the old record because of Iceberg's metadata pruning.

In its sink implementation, Tabular uses the following solution: only the column(s) holding the primary key are written into the equality delete file, instead of the whole record.

ismailsimsek commented 2 months ago

Thank you @maximethebault for reporting this, I will look into it. And feel free to submit a fix if you would like to contribute.

maximethebault commented 2 months ago

I have two options in mind:

  1. apply the same fix as what Tabular does, by just passing the key column(s). It is a good enough fix, but a highly selective query like the example I posted above would not leverage delete file pruning, leading to suboptimal query performance (and equality deletes are very costly on the read path in Iceberg's current engine implementations)
  2. tweak the Debezium SMT to extract both the before and the after payload, pass the before payload into the equality deletes, and pass the after payload into the regular data files (see the sketch after this list). But it's going to require much more work, for at least two reasons: first, there are various edge cases to address (a partially or completely empty before payload, for which we need to fall back to the after payload); second, the changes are going to be more widespread. Plus, I'm using the Kafka Connect variant of this project, where the usage of the SMT is "hardcoded" (see there), and I'm going to work from this code. I don't know if all changes would be compatible with this project.
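
To make option 2 concrete, here is a very rough sketch of the idea, with plain maps standing in for the Debezium before/after payloads. Everything here (class and method names included) is illustrative only, not actual project code:

```java
import java.util.Map;

// Illustrative sketch: decide which image of an UPDATE feeds the equality
// delete file and which one feeds the data file.
public class UpdateSplitSketch {

  // Row to write as an equality delete plus row to write as data.
  record RowPair(Map<String, Object> deleteRow, Map<String, Object> dataRow) {}

  static RowPair splitUpdate(Map<String, Object> before, Map<String, Object> after) {
    // Edge case mentioned above: when the before image is missing or empty
    // (e.g. the source connector does not emit a full before image),
    // fall back to the after image.
    Map<String, Object> deleteRow = (before == null || before.isEmpty()) ? after : before;
    return new RowPair(deleteRow, after);
  }

  public static void main(String[] args) {
    // Column A changed from 0 to 1, as in the example above.
    Map<String, Object> before = Map.of("id", 1, "a", 0);
    Map<String, Object> after = Map.of("id", 1, "a", 1);

    RowPair pair = splitUpdate(before, after);
    System.out.println("equality delete row: " + pair.deleteRow()); // before image (a=0)
    System.out.println("data file row: " + pair.dataRow());         // after image (a=1)
  }
}
```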

Before choosing between approach 1 and 2, there is a performance aspect that I didn't factor in: as I said, equality deletes are very costly on the read path. The biggest cost is memory: all equality deletes that apply to the current query are kept in memory to be applied. But I don't know if it keeps whole records in memory, or just the equality delete column(s). If it currently keeps whole records, approach 1 is probably also the best from a performance standpoint. I'll look into this and tell you my findings.

ismailsimsek commented 2 months ago

@maximethebault I'm leaning more towards option 1; it seems like Flink is also using the same solution as Tabular.

I'm thinking: use the core Iceberg StructProjection class, and extend it if needed, like Tabular is doing. Both classes (tabular-kafka, iceberg) actually seem identical, so it seems the core class doesn't need more logic; it is sufficient.
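
For what it's worth, a rough sketch of how that projection could be used, assuming the Iceberg libraries (including iceberg-data for GenericRecord) are on the classpath. The schema and field names are made up for illustration, not taken from the sink's code:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;

public class KeyProjectionSketch {
  public static void main(String[] args) {
    // Full table schema; "id" plays the role of the primary key column.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "a", Types.IntegerType.get()));

    // Schema restricted to the key column(s) used for equality deletes.
    Schema keySchema = schema.select("id");

    // The flattened "after" record produced by Event Flattening.
    GenericRecord row = GenericRecord.create(schema);
    row.setField("id", 1L);
    row.setField("a", 1);

    // StructProjection exposes only the key column(s) of the wrapped row,
    // so only those values would end up in the equality delete file.
    StructProjection keyOnly = StructProjection.create(schema, keySchema).wrap(row);
    System.out.println("id written to the delete file: " + keyOnly.get(0, Long.class));
  }
}
```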

maximethebault commented 2 months ago

> Before choosing between approach 1 and 2, there is a performance aspect that I didn't factor in: as I said, equality deletes are very costly on the read path. The biggest cost is memory: all equality deletes that apply to the current query are kept in memory to be applied. But I don't know if it keeps whole records in memory, or just the equality delete column(s). If it currently keeps whole records, approach 1 is probably also the best from a performance standpoint. I'll look into this and tell you my findings.

Regarding this, Iceberg is keeping just the equality delete column(s) in memory, so all in all, performance-wise, option 2 sounds better. Complexity-wise, option 1 sounds better.

I think I'll give option 2 a shot to get a grasp of how complex it is, and fall back to option 1 if it's too complex.

ismailsimsek commented 2 months ago

@maximethebault I created a PR with option 1; I'm currently investigating the error.

Regarding delete file performance: I believe it's a matter of using writer.deleteKey(row) vs writer.delete(row), right? It seems like writer.deleteKey only writes the key fields instead of writing the entire row as an equality delete.

maximethebault commented 1 month ago

Sorry for getting back to you late, and thanks for the fix.

> Regarding delete file performance: I believe it's a matter of using writer.deleteKey(row) vs writer.delete(row), right? It seems like writer.deleteKey only writes the key fields instead of writing the entire row as an equality delete.

Yes. Performance can be improved by writing the whole "before" record into the equality delete files, because Iceberg is then able to generate file statistics (upper bound, lower bound, null value count, etc.), which help prune equality delete files.

These stats wouldn't be used in basic select queries (like SELECT * FROM table), but become useful when the queries are highly selective (multiple WHERE conditions), because these filters can take advantage of file statistics to avoid reading some equality delete files.

Equality deletes are resource-intensive, especially memory-intensive, and when they start piling up because of a frequently updated table, not reading useless ones is a big performance gain.
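
To illustrate why those stats matter, here is a toy sketch of the kind of bounds check a query engine can do before reading a delete file. This is not Iceberg code, just the idea:

```java
// Illustrative sketch: per-column lower/upper bounds on an equality delete
// file let a selective scan skip reading that file entirely.
public class DeleteFilePruningSketch {

  // Simplified stand-in for the lower/upper bound metrics of one column.
  record ColumnBounds(long lower, long upper) {}

  // Returns true if the delete file may contain rows matching "A = value" and
  // therefore has to be read; null means column A is absent from the delete
  // file (key-only deletes), so no pruning is possible on it.
  static boolean mustRead(ColumnBounds boundsForA, long value) {
    if (boundsForA == null) {
      return true;
    }
    return value >= boundsForA.lower() && value <= boundsForA.upper();
  }

  public static void main(String[] args) {
    // Query: SELECT ... WHERE A = 5
    ColumnBounds keyOnlyDeletes = null;                       // option 1: only key columns written, no stats for A
    ColumnBounds beforeImageDeletes = new ColumnBounds(0, 0); // option 2: full before image, A was 0

    System.out.println(mustRead(keyOnlyDeletes, 5));     // true: the file can never be skipped
    System.out.println(mustRead(beforeImageDeletes, 5)); // false: bounds exclude 5, the file is skipped
  }
}
```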

We still plan on providing option 2 as a fork of the Kafka Connect variant of this project.

Other sink projects like Tabular's (it might be the same for Flink) are currently unable to do such an optimization unless they change their required input format to something that can hold both a before and an after payload.

This is, however, possible for this sink because it relies on Debezium-formatted messages, which hold both a before and an after payload.

I'm closing this issue but will keep you updated on our progress if you're interested!