delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/

Delete appears to be single threaded. #2574

Open adamfaulkner-at opened 3 weeks ago

adamfaulkner-at commented 3 weeks ago

Environment

Delta-rs version: 0.17.3

Binding: Rust

Environment:


Bug

What happened:

I'm trying to delete 18 rows from a Delta Lake table that has 100M rows in it. The table is stored in Amazon S3 as 38 parquet files of roughly 100 MB each. To do this, I've built a predicate that uses an IN list on "rideid", the column this table is sorted by.

    let session_config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.pushdown_filters", true)
        .set_bool("datafusion.execution.parquet.reorder_filters", true)
        .set_usize("datafusion.execution.target_partitions", 32);
    let ctx = SessionContext::new_with_config(session_config);
...
            let delete_predicate = col("rideid").in_list(
                overlap_results
                    .iter()
                    .flat_map(|overlap_result_batch| {
                        let rideid_column: &StringArray = overlap_result_batch
                            .column_by_name("rideid")
                            .unwrap()
                            .as_string();
                        (0..rideid_column.len()).map(|i| lit(rideid_column.value(i)))
                    })
                    .collect(),
                false,
            );
            let (delete_table, delete_metrics) = DeltaOps(table)
                .delete()
                .with_predicate(delete_predicate)
                .with_session_state(SessionState::new_with_config_rt(
                    ctx.copied_config(),
                    ctx.runtime_env(),
                ))
                .with_writer_properties(
                    WriterProperties::builder()
                        .set_compression(Compression::ZSTD(ZstdLevel::default()))
                        .build(),
                )
                .await?;
            println!(
                "Overlapping records deleted. Elapsed Time: {} {:?}",
                batch_start_time.elapsed().as_secs_f64(),
                delete_metrics
            );
Overlapping records deleted. Elapsed Time: 55.433372588 DeleteMetrics { num_added_files: 12, num_removed_files: 12, num_deleted_rows: Some(18), num_copied_rows: Some(32170369), execution_time_ms: 52076, scan_time_ms: 243, rewrite_time_ms: 51832 }

(overlap_result_batch is just a RecordBatch that contains the rideids I'd like to delete; I don't think it's important for understanding here.)

I don't expect this to be fast, since it does need to rewrite half of the data. What I'm observing, though, is that it only uses 1 core for 50 seconds or so, because it rewrites the affected files one at a time.

What you expected to happen: I would expect delete to rewrite all of the files that need rewriting concurrently instead of doing them one at a time, and I'd expect that concurrency to be exposed in configuration somewhere (see the sketch below).
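For illustration only, this is roughly the shape of concurrency I'd expect, sketched with futures::stream. rewrite_file and files_to_rewrite are hypothetical placeholders for the per-file scan/filter/write that delete currently performs sequentially, not real delta-rs APIs, and the concurrency limit would presumably come from existing configuration such as datafusion.execution.target_partitions.

    use futures::stream::{self, StreamExt};

    // Hypothetical sketch: rewrite the affected files with bounded concurrency
    // instead of one at a time.
    async fn rewrite_all(files_to_rewrite: Vec<String>, concurrency: usize) -> Vec<String> {
        stream::iter(files_to_rewrite)
            .map(|file| async move { rewrite_file(&file).await })
            // Run up to `concurrency` per-file rewrites at once.
            .buffer_unordered(concurrency)
            .collect()
            .await
    }

    // Placeholder for "scan the file, drop rows matching the delete predicate,
    // write the remaining rows to a new parquet file". Not a real delta-rs API.
    async fn rewrite_file(path: &str) -> String {
        format!("{path}.rewritten")
    }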

How to reproduce it: Perform a delete on a table that contains multiple parquet files; a minimal sketch follows.
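This is a minimal repro sketch, assuming the deltalake (0.17.x) and a matching datafusion crate version; the table path and rideid values are placeholders, and any S3 credential setup is omitted.

    use std::time::Instant;

    use datafusion::prelude::{col, lit};
    use deltalake::DeltaOps;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Placeholder path: any table backed by several parquet files shows the effect.
        let table = deltalake::open_table("/tmp/my-delta-table").await?;

        let start = Instant::now();
        let (_table, metrics) = DeltaOps(table)
            .delete()
            // Delete a handful of rows so that several files must be rewritten.
            .with_predicate(col("rideid").in_list(vec![lit("ride-001"), lit("ride-002")], false))
            .await?;

        // Watch CPU usage while this runs: only one core is busy during the rewrite phase.
        println!("elapsed: {:?}, metrics: {:?}", start.elapsed(), metrics);
        Ok(())
    }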

More details: