delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0
1.98k stars 365 forks

pyarrow options for `write_delta` #2515

geekodour closed this issue 1 month ago

geekodour commented 1 month ago

I was trying to merge into a large table. Every time I merge, the entire table is loaded into memory, and given how Python + polars work and how Delta table merge works, we are already using a fair amount of memory. The Delta Lake documentation notes:

Delta Lake merge operations typically require two passes over the source data. If your source data contains nondeterministic expressions, multiple passes on the source data can produce different rows, causing incorrect results. Some common examples of nondeterministic expressions include the current_date and current_timestamp functions. In Delta Lake 2.2 and above, this issue is solved by automatically materializing the source data as part of the merge command, so that the source data is deterministic in multiple passes. In Delta Lake 2.1 and below, if you cannot avoid using non-deterministic functions, consider saving the source data to storage, for example as a temporary Delta table. Caching the source data may not address this issue, as cache invalidation can cause the source data to be recomputed partially or completely (for example when a cluster loses some of its executors when scaling down).
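To illustrate the workaround the docs describe: nondeterministic values can be snapshotted into the source rows once, before the merge reads them, so both passes see identical data. A pure-Python sketch (the record shape and the `updated_at` column are made-up examples, not part of any Delta API):

```python
import datetime

def materialize_source(records):
    # Evaluate the nondeterministic expression once, up front,
    # instead of letting each pass over the source recompute it.
    now = datetime.datetime.now(datetime.timezone.utc)
    return [{**r, "updated_at": now} for r in records]

rows = materialize_source([{"id": 1}, {"id": 2}])

# Two passes over `rows` now observe the same timestamp.
first_pass = [r["updated_at"] for r in rows]
second_pass = [r["updated_at"] for r in rows]
assert first_pass == second_pass
```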

I think the source-materialization logic mentioned here has not been implemented in delta-rs yet.

Apart from these optimizations, which concern memory allocation and release, I am facing an issue with loading data into polars from a Delta table created with delta-rs. The issue description follows:

I want to merge into a Delta table, but I can't find a way to specify the partition keys it should use to look up the target rows. Should they be part of the predicate (using DataFusion syntax) in delta_merge_options when using write_delta? When using polars, we can pass pyarrow_options to both read_delta and scan_delta.
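For reads, that looks roughly like the following sketch (`table_path` and the `block_range` partition column are assumptions; the `partitions` filter is forwarded by polars to deltalake's `to_pyarrow_dataset`):

```python
# Partition filters as (column, op, value) tuples, the format
# deltalake's to_pyarrow_dataset accepts:
partitions = [("block_range", "=", "x")]

# Assuming a polars environment (left commented so the sketch stays self-contained):
# import polars as pl
# lf = pl.scan_delta(table_path, pyarrow_options={"partitions": partitions})
# df = pl.read_delta(table_path, pyarrow_options={"partitions": partitions})
```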

I am posting more of a polars-related question here, but it translates directly to the delta-rs Python API as well, so I think the question is still relevant: I can't find a way to specify partitions in the TableMerger docs either: https://delta-io.github.io/delta-rs/api/delta_table/delta_table_merger/

In this discussion, @ion-elgreco mentions that the merge is done in Rust, so we can't pass pyarrow options to it.

I'll update this issue with more info related to this.

Please let me know if you need any more info here.

geekodour commented 1 month ago

For anyone who finds themselves here:

Here's what worked (my issue was that I was stuck on a typo for hours):

# In our use case, if these 3 columns together are unique, then we have a match for the merge
identity_columns = ["a", "b", "c"]

# Partition values to look up; these can be computed elsewhere based on your use case.
# "x" and "y" are string values of the partition column `block_range`.
lookup_partitions = ["x", "y"]

# These predicates follow DataFusion SQL syntax
merge_predicate = " AND ".join([f"s.{i} = t.{i}" for i in identity_columns])
lookup_predicate = " OR ".join([f"t.block_range='{v}'" for v in lookup_partitions])
predicate = f"({lookup_predicate}) AND ({merge_predicate})"

df.write_delta(
    table_path,
    mode="merge",
    storage_options=storage_options,
    delta_merge_options={
        "predicate": predicate,
        "source_alias": "s",
        "target_alias": "t",
    },
).when_matched_update_all().when_not_matched_insert_all().execute()
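Since polars hands the merge off to delta-rs anyway, the same predicate should also work directly with the deltalake Python package's DeltaTable.merge. A sketch under the same assumptions as above (`table_path` and `source_df` are placeholders; the merge call is left commented so the snippet stands alone):

```python
identity_columns = ["a", "b", "c"]
lookup_partitions = ["x", "y"]

# Same DataFusion-SQL predicate construction as in the polars version:
merge_predicate = " AND ".join(f"s.{c} = t.{c}" for c in identity_columns)
lookup_predicate = " OR ".join(f"t.block_range='{v}'" for v in lookup_partitions)
predicate = f"({lookup_predicate}) AND ({merge_predicate})"

# Assuming the deltalake package and an existing table/source DataFrame:
# from deltalake import DeltaTable
# (
#     DeltaTable(table_path)
#     .merge(source=source_df, predicate=predicate,
#            source_alias="s", target_alias="t")
#     .when_matched_update_all()
#     .when_not_matched_insert_all()
#     .execute()
# )
```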