delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Merge is slower than expected and loads more than expected into memory. #2573

Open adamfaulkner-at opened 3 weeks ago

adamfaulkner-at commented 3 weeks ago

Environment

Delta-rs version: 0.17.3

Binding: Rust

Environment:


Bug

What happened:

I have a source table with ~100M rows, stored as a Delta Lake table in S3 and sorted by "rideid" (this is ~38 Parquet files of about 100MB each). I'm trying to "upsert" 1 row using code that looks like this:

        // Merge the single-row source_df into the target table on rideid.
        (table, _) = DeltaOps(table)
            .merge(source_df, col("source.rideid").eq(col("target.rideid")))
            .with_source_alias("source")
            .with_target_alias("target")
            // Insert rows whose rideid does not exist in the target yet.
            .when_not_matched_insert(|insert| {
                COLUMNS.iter().fold(insert, |insert, &column| {
                    insert.set(column, col(format!("source.{}", column)))
                })
            })?
            // Overwrite every column for rideids that already exist.
            .when_matched_update(|update| {
                COLUMNS.iter().fold(update, |update, &column| {
                    update.update(column, col(format!("source.{}", column)))
                })
            })?
            .await?;

(COLUMNS is simply an array containing all 13 of the columns in the table.) This consumes all of my computer's memory and then crashes.

I've tried partitioning the data by a hash of the rideid, but this doesn't change anything: I still run out of memory and cannot complete the operation.

What you expected to happen:

This is pretty surprising, because I can run the join in DataFusion pretty efficiently:

        let overlap_results = ctx.sql("SELECT target.rideid FROM target LEFT JOIN source ON target.rideid = source.rideid WHERE source.rideid IS NOT NULL").await?.collect().await?;

This query takes about 2 seconds and consumes only ~500MB of memory. I can build my own upsert on top of this sort of DataFusion query plus a delete and a write, and that seems to work fine.
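
Roughly, the hand-rolled version looks like this. This is only a sketch: `delete_predicate` stands in for a predicate string built from the rideids returned by the join above, and the exact module paths may differ slightly between deltalake versions.

    use deltalake::arrow::record_batch::RecordBatch;
    use deltalake::protocol::SaveMode;
    use deltalake::{DeltaOps, DeltaTable, DeltaTableError};

    // Sketch of a hand-rolled upsert: delete the rows that will be replaced,
    // then append the incoming batches. `delete_predicate` is built from the
    // rideids returned by the join above, e.g. "rideid IN (42)".
    async fn manual_upsert(
        table: DeltaTable,
        incoming: Vec<RecordBatch>,
        delete_predicate: &str,
    ) -> Result<DeltaTable, DeltaTableError> {
        // Remove the rows whose rideid already exists in the target.
        let (table, _metrics) = DeltaOps(table)
            .delete()
            .with_predicate(delete_predicate)
            .await?;

        // Append all incoming rows (both updated and brand-new rideids).
        let table = DeltaOps(table)
            .write(incoming)
            .with_save_mode(SaveMode::Append)
            .await?;

        Ok(table)
    }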

How to reproduce it:

Implement an "upsert" operation using merge on a table with ~100M rows and observe how much memory it consumes.

More details:

ion-elgreco commented 3 weeks ago

Merge needs to scan the entire table if you don't use partitioning. If you do partition, you need to give an explicit partition predicate so it only reads the relevant partitions.
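
For example, something along these lines (a sketch only: "ride_bucket" is a hypothetical partition column, "fare" a hypothetical data column, and the bucket value would come from the incoming rows):

    use deltalake::datafusion::prelude::{col, lit, DataFrame};
    use deltalake::{DeltaOps, DeltaTable, DeltaTableError};

    // Sketch: assumes the table is partitioned by a hypothetical "ride_bucket"
    // column and that all incoming rows fall into bucket 7. Putting the
    // partition value into the merge predicate lets delta-rs prune files
    // before it scans the target.
    async fn merge_one_bucket(
        table: DeltaTable,
        source_df: DataFrame,
    ) -> Result<DeltaTable, DeltaTableError> {
        let predicate = col("target.ride_bucket")
            .eq(lit(7_i64))
            .and(col("source.rideid").eq(col("target.rideid")));

        let (table, _metrics) = DeltaOps(table)
            .merge(source_df, predicate)
            .with_source_alias("source")
            .with_target_alias("target")
            // "fare" stands in for whichever columns you actually update.
            .when_matched_update(|update| update.update("fare", col("source.fare")))?
            .await?;

        Ok(table)
    }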

adamfaulkner-at commented 3 weeks ago

Thanks for the reply @ion-elgreco , why does it need to scan the entire table into memory before it starts writing data? Is this just a lack of optimization, or is there something fundamental to what merge is doing that prevents this kind of optimization?

vegarsti commented 1 week ago

Thanks for the reply @ion-elgreco , why does it need to scan the entire table into memory before it starts writing data? Is this just a lack of optimization, or is there something fundamental to what merge is doing that prevents this kind of optimization?

It needs to scan the entire table because it needs to find out which rows the merge into condition applies to.

adamfaulkner-at commented 2 days ago

It needs to scan the entire table because it needs to find out which rows the merge into condition applies to.

I understand this. However, it doesn't need to hold the entire table in memory while it is performing the merge. It could do this in a streaming fashion, which is more or less what you get out of the box with DataFusion.
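
To make the "streaming fashion" point concrete, here is a rough DataFusion-only sketch (assuming "target" and "source" are already registered with the SessionContext, as in the query above): the join is executed as a stream of record batches, so only a batch at a time, plus the join's build side, has to be resident.

    use deltalake::datafusion::error::Result;
    use deltalake::datafusion::prelude::SessionContext;
    use futures::StreamExt;

    // Sketch: run the same join as above, but consume it as a stream of
    // RecordBatches instead of collect()-ing the whole result into memory.
    async fn stream_overlapping_rideids(ctx: &SessionContext) -> Result<()> {
        let df = ctx
            .sql(
                "SELECT target.rideid FROM target LEFT JOIN source \
                 ON target.rideid = source.rideid WHERE source.rideid IS NOT NULL",
            )
            .await?;

        // Batches arrive incrementally; process (or discard) each one as it comes.
        let mut stream = df.execute_stream().await?;
        while let Some(batch) = stream.next().await {
            let batch = batch?;
            println!("got {} overlapping rideids", batch.num_rows());
        }
        Ok(())
    }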