Concurrent writers return error despite successfully writing to the file #2609

Open Aiden-Frost opened 1 week ago

Aiden-Frost commented 1 week ago

Environment

Delta-rs version: 0.18.1

Binding: rust

Environment:


Bug

What happened: I have writers running as separate Python processes, all writing to the same table location. Each writer creates a pandas dataframe and writes it to that exact location. There are four different scenarios for these writes:

  1. Wrote to the location and returned no error.
  2. Wrote to the location and returned an error:
     2.1. Delta transaction failed, version 0 already exists
     2.2. Generic error: A Delta Lake table already exists at that location
  3. Did not write to the location and returned an error:
     3.1. Delta table already exists, write mode set to error

When a process throws error 2.1 or 2.2, I expect its write to fail. But when inspecting the table I observe that that writer's data has still been appended.

For the reproducible example below, suppose Process 1 and Process 2 hit error 2.2. I then expected only Process 3 to be present in the table, but instead it contained:

pd.read_parquet('s3a://test-bucket/file-4')
           x
0  Process 2
1  Process 1
2  Process 3

What you expected to happen: When the writer threw an error, I expected the write to fail. When multiple writers write to the same location, only one should succeed; the others should fail and return an error.

How to reproduce it:

import multiprocessing
import pandas as pd
from deltalake import write_deltalake

# DynamoDB locking provider so concurrent S3 writers can coordinate commits
storage_options = {'AWS_S3_LOCKING_PROVIDER': 'dynamodb', 'DELTA_DYNAMO_TABLE_NAME': 'delta_log'}

def print_process_name(process_name):
    for i in range(10):
        try:
            df = pd.DataFrame({'x': [process_name]})
            print(f"WRITING process: {process_name}, i: {i}")
            # write_deltalake defaults to mode="error": it should fail if a table already exists
            write_deltalake(f's3a://test-bucket/file-{i}', df, storage_options=storage_options)
        except Exception as e:
            print(f"Error: {e} process: {process_name}, i: {i}")
        else:
            print(f"SUCCESS Process: {process_name}, i: {i}")

if __name__ == "__main__":

    process_names = ["Process 1", "Process 2", "Process 3"]

    processes = []
    for name in process_names:
        process = multiprocessing.Process(target=print_process_name, args=(name,))
        processes.append(process)

    for process in processes:
        process.start()

    for process in processes:
        process.join()

More details:

Regarding the Generic error: A Delta Lake table already exists at that location error, I believe this is handled by this part of crates/core/src/logstore/mod.rs:

    async fn is_delta_table_location(&self) -> DeltaResult<bool> {
        // TODO We should really be using HEAD here, but this fails in windows tests
        let object_store = self.object_store();
        let mut stream = object_store.list(Some(self.log_path()));
        if let Some(res) = stream.next().await {
            match res {
                Ok(_) => Ok(true),
                Err(ObjectStoreError::NotFound { .. }) => Ok(false),
                Err(err) => Err(err)?,
            }
        } else {
            Ok(false)
        }
    }

This function is called from crates/core/src/operations/create.rs:

            let table_state = if log_store.is_delta_table_location().await? {
                match mode {
                    SaveMode::ErrorIfExists => return Err(CreateError::TableAlreadyExists.into()),

For the Delta table already exists, write mode set to error error, this is handled on the Python side in python/deltalake/writer.py:

def write_deltalake(...):
    ...
            if mode == "error":
                raise FileExistsError(
                    "Delta table already exists, write mode set to error."
                )

wjones127 commented 1 week ago

When the writer threw an error, I expected the write to fail. When multiple writers are writing to the same location then only one should succeed and the other writes should fail and return error.

I think the information you are missing is that in a Delta table, writes happen in two stages: (1) write the data files (Parquet), then (2) commit to the transaction log. It detects that the table already exists and fails at step (2). The files created in step (1) as part of the failed transaction can be cleaned up with the VACUUM operation.
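
To illustrate (a sketch reusing the path and storage_options from the reproduction above; files() and to_pandas() are documented DeltaTable methods): reading the table through the Delta log only sees committed files, while pd.read_parquet on the directory picks up every Parquet file, including orphans from failed transactions.

from deltalake import DeltaTable

dt = DeltaTable("s3a://test-bucket/file-4", storage_options=storage_options)

# Only the Parquet files referenced by committed "add" actions in _delta_log:
print(dt.files())

# Reading through the Delta log therefore returns only the committed rows,
# whereas pd.read_parquet on the directory also reads the orphaned files
# left behind by the failed writers.
print(dt.to_pandas())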

Aiden-Frost commented 1 week ago

Thank you for the clarification. I have gone through the transaction and vacuum documentation. Following the example, when I try to vacuum:

dt = DeltaTable("s3a://test-bucket/file-4")
dt.vacuum()
[]

It returns an empty list. Is there something I am missing to tell it to clean up the files from the failed transactions?

wjones127 commented 1 week ago

You should read the documentation for the vacuum method, particularly the retention_hours and enforce_retention_duration parameters.

https://delta-io.github.io/delta-rs/api/delta_table/#deltalake.DeltaTable.vacuum
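
For example, a sketch based on those parameters (vacuum defaults to a dry run, and by default only files older than the 7-day retention window are eligible):

from deltalake import DeltaTable

dt = DeltaTable("s3a://test-bucket/file-4", storage_options=storage_options)

# Dry run (the default): list files that would be removed, without deleting anything.
# retention_hours=0 together with enforce_retention_duration=False allows removing
# files younger than the default 7-day retention period.
to_delete = dt.vacuum(retention_hours=0, dry_run=True, enforce_retention_duration=False)
print(to_delete)

# Once the list looks right, delete for real:
# dt.vacuum(retention_hours=0, dry_run=False, enforce_retention_duration=False)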

Aiden-Frost commented 1 week ago

After going through the documentation I set retention_hours=0 and enforce_retention_duration=False, but even after this I still get an empty list from vacuum.

There are 3 Parquet files in total generated by the sample program at one path. I observed that the 00000000000000000000.json file references one Parquet file under the add action, while the other two Parquet files appear under add only in the tmp commit files.
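
A quick way to confirm which data files the committed version actually references is to read the add actions out of the commit JSON. This is only a sketch; it assumes s3fs is available (any fsspec-compatible filesystem works) and reuses the bucket/path from the example:

import json
import s3fs  # assumed here for direct S3 access

fs = s3fs.S3FileSystem()

# Each line of a commit file is one JSON action; only "add" actions reference data files.
with fs.open("test-bucket/file-4/_delta_log/00000000000000000000.json", "r") as f:
    for line in f:
        if not line.strip():
            continue
        action = json.loads(line)
        if "add" in action:
            print(action["add"]["path"])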