delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Optimizer Compact not running parallel to append writers #2700

Open Aiden-Frost opened 4 months ago

Aiden-Frost commented 4 months ago

Environment

Delta-rs version: 0.18.2

Binding: python

Environment: macOS (Python 3.9), S3-compatible object store with DynamoDB locking


Bug

What happened: When compaction runs in parallel with append writers, the compaction fails with a CommitFailedError.

Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "local_testing.py", line 505, in optimizers
    dt.optimize.compact()
  File "/Users/code/delta-rs/venv/lib/python3.9/site-packages/deltalake/table.py", line 1899, in compact
    metrics = self.table._table.compact_optimize(
_internal.CommitFailedError: Failed to commit transaction: 15

What you expected to happen: Compaction succeeds even while append writers are committing concurrently.
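A possible interim workaround (a minimal sketch only, not verified against this setup; compact_with_retry is a hypothetical helper) is to retry the compaction whenever the commit conflicts, catching CommitFailedError from deltalake.exceptions:

import time

from deltalake import DeltaTable
from deltalake.exceptions import CommitFailedError

def compact_with_retry(table_path, storage_options, attempts=5):
    # Hypothetical helper: retry count and backoff are arbitrary, and this only
    # works around the conflict instead of fixing the underlying behaviour.
    for attempt in range(attempts):
        # Reload the table so each commit attempt starts from the latest version.
        dt = DeltaTable(table_path, storage_options=storage_options)
        try:
            return dt.optimize.compact()
        except CommitFailedError:
            time.sleep(2 ** attempt)
    raise RuntimeError("compaction still failing with commit conflicts after retries")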

How to reproduce it:

import pandas as pd
import pyarrow as pa
from deltalake import write_deltalake, DeltaTable
import os
import random
import multiprocessing as mp
from time import sleep

# Creating a dataframe with 50 columns and 10000 rows
num_columns = 50
num_rows = 10000

data = {
    f'col{i + 1}': [f'row{random.randint(1, 2)}' for j in range(num_rows)] for i in range(num_columns)
}

df = pd.DataFrame(data)

table = pa.Table.from_pandas(df)
table_path = "s3a://deltalake_test"
os.environ["AWS_ACCESS_KEY_ID"] = "***"
os.environ["AWS_ENDPOINT_URL"] = "***"
os.environ["AWS_SECRET_ACCESS_KEY"] = "***"

partition_cols = ["col1", "col2", "col3", "col4"]

storage_options = {
    'AWS_S3_LOCKING_PROVIDER': 'dynamodb',
    'DELTA_DYNAMO_TABLE_NAME': 'delta_log',
}

def writers():
    for i in range(10):
        write_deltalake(table_path, table, partition_by=partition_cols,
                        mode="append", engine="rust", storage_options=storage_options)

def optimizers():
    print("RUNNING OPTIMIZERS")
    dt = DeltaTable(table_path, storage_options=storage_options)
    dt.optimize.compact()
    print("COMPLETED OPTIMIZERS")

if __name__ == '__main__':

    tc = DeltaTable.create(
        table_uri=table_path,
        schema=table.schema,
        mode="append",
        partition_by=partition_cols,
        storage_options=storage_options,
    )

    processes = []
    pn = [f"process{i}" for i in range(7)]

    for name in pn:
        process = mp.Process(target=writers, name=name)
        processes.append(process)

    optimizer = mp.Process(target=optimizers)

    for process in processes:
        process.start()

    sleep(10)  # let the writers commit some data before compaction starts
    optimizer.start()

    for process in processes:
        process.join()

    optimizer.join()
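To see which operations land around the failed compaction commit, the table history can be inspected after a run (a diagnostic sketch, not part of the original repro; it reuses the table_path and storage_options defined above):

from deltalake import DeltaTable

# Print recent commits so the concurrent appends around the failed
# compaction attempt are visible.
dt = DeltaTable(table_path, storage_options=storage_options)
for commit in dt.history(limit=20):
    print(commit.get("version"), commit.get("operation"))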

More details:

brurucy commented 1 day ago

I second this very annoying bug.