delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

CDC support in deltalog when writing delta table #2720

Status: Closed (dsalv closed this issue 1 month ago)

dsalv commented 2 months ago

Description

The current delta-rs version does not support writing CDC data to the Delta log. It says here that CDC support will be added in Version 4.

What's the current timeline for this feature? How many months or years might it take to add CDC support?

Use Case

We need to read CDC data from the Delta log of a Delta table written with delta-rs.
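For context, the Delta protocol gates Change Data Feed behind writer protocol version 4 (or version 7 with the `changeDataFeed` table feature listed), and a table opts in through the `delta.enableChangeDataFeed` entry in its `metaData.configuration`. The stdlib-only sketch below checks both for a local table by replaying the JSON commits in `_delta_log`. It is an illustration, not delta-rs code: the function name `cdf_enabled` is hypothetical, and it ignores checkpoint files, so it only works on tables whose full history is still in JSON commits.

```python
import json
from pathlib import Path


def cdf_enabled(table_path: str) -> bool:
    """Hypothetical helper: does the latest protocol/metaData allow CDF?

    Sketch only: replays every JSON commit in _delta_log in version order
    and ignores checkpoint (.parquet) files.
    """
    protocol, metadata = None, None
    log_dir = Path(table_path) / "_delta_log"
    # Commit files are zero-padded version numbers, so lexical order == version order.
    for commit in sorted(log_dir.glob("*.json")):
        for line in commit.read_text().splitlines():
            if not line.strip():
                continue
            action = json.loads(line)
            # A later protocol/metaData action replaces the earlier one.
            protocol = action.get("protocol", protocol)
            metadata = action.get("metaData", metadata)
    if protocol is None or metadata is None:
        return False
    writer = protocol.get("minWriterVersion", 0)
    features = protocol.get("writerFeatures") or []
    # Legacy protocols 4-6 support CDF; version 7 needs the feature listed explicitly.
    protocol_ok = 4 <= writer < 7 or (writer >= 7 and "changeDataFeed" in features)
    config = metadata.get("configuration") or {}
    enabled = str(config.get("delta.enableChangeDataFeed", "")).lower() == "true"
    return protocol_ok and enabled
```

Pointing this at a table written with `delta.enableChangeDataFeed` set to `"true"` should return `True`; the real delta-rs reader also consults checkpoints, which this sketch deliberately skips.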

Related Issue(s)

ion-elgreco commented 2 months ago

We already have CDC write support for update operations, @rtyler started looking at MERGE.

dsalv commented 2 months ago

> We already have CDC write support for update operations, @rtyler started looking at MERGE.

That's awesome! Could you share some links?

mkp-jansen commented 2 months ago

> We already have CDC write support for update operations, @rtyler started looking at MERGE.

Would be awesome to have it for MERGE!

waddahAldrobi commented 1 month ago

@ion-elgreco @rtyler

According to the docs below, the change data for insert-only operations can be computed efficiently directly from the transaction log, so no separate change data files need to be written: https://docs.delta.io/latest/delta-change-data-feed.html#change-data-storage

At least, that's what Databricks says it can do: https://docs.databricks.com/en/delta/delta-change-data-feed.html#change-data-storage
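The reason insert-only writes need no extra files follows from the Delta protocol's change data rule: if a commit contains `cdc` actions, a reader scans those change files; otherwise, rows in `add` files with `dataChange: true` are reported as inserts and rows in `remove` files with `dataChange: true` as deletes. A minimal stdlib-only sketch of that per-commit decision is below. The function name `change_files_for_commit` is hypothetical, and it only decides which files to scan; it does not read any Parquet data.

```python
import json
from typing import List, Optional, Tuple


def change_files_for_commit(commit_json: str) -> List[Tuple[str, Optional[str]]]:
    """Hypothetical helper: (path, change_type) pairs a CDF reader would scan.

    Prefers `cdc` actions; otherwise derives inserts/deletes from
    `add`/`remove` actions whose dataChange flag is true.
    """
    actions = [json.loads(line) for line in commit_json.splitlines() if line.strip()]
    cdc = [a["cdc"] for a in actions if "cdc" in a]
    if cdc:
        # Change files store a _change_type column per row, so the type
        # is only known after reading the file; report None here.
        return [(c["path"], None) for c in cdc]
    changes: List[Tuple[str, Optional[str]]] = []
    for a in actions:
        if "add" in a and a["add"].get("dataChange", False):
            changes.append((a["add"]["path"], "insert"))
        elif "remove" in a and a["remove"].get("dataChange", False):
            changes.append((a["remove"]["path"], "delete"))
        # Files rewritten with dataChange=false (e.g. compaction) are skipped.
    return changes
```

An append commit therefore yields only `("<file>", "insert")` pairs taken straight from its `add` actions, which is the "computed from the transaction log" case the docs describe.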

I tried setting up a table with `delta.enableChangeDataFeed = true`, but the inserts were no longer being registered in the Delta table.

I'm not sure whether my experiment was flawed, but do you know if this is supported?

Just to clarify, I'm talking about rows with `_change_type = insert`.

ion-elgreco commented 1 month ago

@waddahAldrobi it's probably something on your side; this code works fine:

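```python
from deltalake import DeltaTable
import polars as pl

df = pl.DataFrame({"foo": [1, 2], "bar": ["1", "2"]})

# Append the same two rows twice to a table with the Change Data Feed enabled
for _ in range(2):
    df.write_delta(
        "test_table",
        mode="append",
        delta_write_options={
            "configuration": {"delta.enableChangeDataFeed": "true"},
            "engine": "rust",
        },
    )

# Read the change data feed back and convert it to a Polars DataFrame
dt = DeltaTable("test_table")
pl.from_arrow(dt.load_cdf())
```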

```
shape: (4, 5)
┌─────┬─────┬──────────────┬─────────────────┬─────────────────────────┐
│ foo ┆ bar ┆ _change_type ┆ _commit_version ┆ _commit_timestamp       │
│ --- ┆ --- ┆ ---          ┆ ---             ┆ ---                     │
│ i64 ┆ str ┆ str          ┆ i64             ┆ datetime[ms]            │
╞═════╪═════╪══════════════╪═════════════════╪═════════════════════════╡
│ 1   ┆ 1   ┆ insert       ┆ 1               ┆ 2024-08-10 17:42:19.836 │
│ 2   ┆ 2   ┆ insert       ┆ 1               ┆ 2024-08-10 17:42:19.836 │
│ 1   ┆ 1   ┆ insert       ┆ 0               ┆ 2024-08-10 17:42:15.639 │
│ 2   ┆ 2   ┆ insert       ┆ 0               ┆ 2024-08-10 17:42:15.639 │
└─────┴─────┴──────────────┴─────────────────┴─────────────────────────┘
```
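The four rows above come from two append commits (versions 0 and 1), each adding the same two rows, with every row tagged `insert` plus its commit version and timestamp. The plain-Python sketch below mimics that tagging so the metadata columns are easy to reason about. It is not how delta-rs builds the feed: `insert_only_cdf` and its `commits` input shape are hypothetical, and rows are given as dicts instead of being read from Parquet. The `starting_version`/`ending_version` names mirror the parameters of the same name on `DeltaTable.load_cdf`.

```python
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple


def insert_only_cdf(
    commits: Dict[int, Tuple[int, List[dict]]],
    starting_version: int = 0,
    ending_version: Optional[int] = None,
) -> List[dict]:
    """Hypothetical helper: tag appended rows with CDF metadata columns.

    `commits` maps commit version -> (commit timestamp in ms since the epoch,
    rows appended by that commit). Every appended row becomes an `insert`.
    """
    feed = []
    for version in sorted(commits):
        # Keep only commits inside the requested (inclusive) version range.
        if version < starting_version:
            continue
        if ending_version is not None and version > ending_version:
            continue
        ts_ms, rows = commits[version]
        ts = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
        for row in rows:
            feed.append({**row, "_change_type": "insert",
                         "_commit_version": version, "_commit_timestamp": ts})
    return feed
```

This sketch emits rows in ascending version order, while the output above happens to list version 1 first; if order matters, sort on `_commit_version` explicitly rather than relying on the reader's order.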

waddahAldrobi commented 1 month ago

Thanks @ion-elgreco this is what we needed! 🙏