delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Segmentation Fault during DeltaTable merge #2546

Open nfoerster2 opened 1 month ago

nfoerster2 commented 1 month ago

Environment

Delta-rs version: 0.17.4

Binding: Python 3.12

Environment:

K8s resources: requests: memory: 25Gi, cpu: "3"; limits: memory: 60Gi, cpu: "4"

Environment: adlfs==2024.2.0 agate==1.7.1 aiohttp==3.9.5 aiosignal==1.3.1 annotated-types==0.7.0 attrs==23.2.0 azure-core==1.30.1 azure-datalake-store==0.0.53 azure-identity==1.15.0 azure-storage-blob==12.20.0 Babel==2.15.0 certifi==2024.2.2 cffi==1.16.0 charset-normalizer==3.3.2 click==8.1.7 colorama==0.4.6 cryptography==42.0.7 dbt-core==1.7.15 dbt-duckdb==1.7.4 dbt-extractor==0.5.1 dbt-semantic-interfaces==0.4.4 deltalake==0.17.4 duckdb==0.10.1 duckdb_deltalake_dbt==0.2.3rc1 frozenlist==1.4.1 fsspec==2024.3.1 idna==3.7 importlib-metadata==6.11.0 isodate==0.6.1 Jinja2==3.1.4 jsonschema==4.22.0 jsonschema-specifications==2023.12.1 leather==0.4.0 Logbook==1.5.3 loguru==0.7.2 MarkupSafe==2.1.5 mashumaro==3.13 minimal-snowplow-tracker==0.0.2 more-itertools==10.2.0 msal==1.28.0 msal-extensions==1.1.0 msgpack==1.0.8 multidict==6.0.5 networkx==3.3 numpy==1.26.4 packaging==24.0 pandas==2.2.2 parsedatetime==2.6 pathspec==0.11.2 polars==0.20.29 portalocker==2.8.2 protobuf==4.25.3 pyarrow==15.0.2 pyarrow-hotfix==0.6 pycparser==2.22 pydantic==2.7.1 pydantic_core==2.18.2 PyJWT==2.8.0 python-dateutil==2.9.0.post0 python-dotenv==1.0.1 python-slugify==8.0.4 pytimeparse==1.1.8 pytz==2024.1 PyYAML==6.0.1 referencing==0.35.1 requests==2.32.2 rpds-py==0.18.1 setuptools==69.5.1 six==1.16.0 sqlparse==0.5.0 text-unidecode==1.3 typing_extensions==4.12.0 tzdata==2024.1 urllib3==1.26.18 wheel==0.43.0 yarl==1.9.4 zipp==3.19.0


Bug

What happened: I have a script that does data munging with duckdb and DeltaTable. After the duckdb query finishes, I call the arrow() function and pass the result to a merge function on a DeltaTable stored on Azure Blob Storage. In the cloud I get a segmentation fault during the merge operation; locally, on my MacBook Pro with an M2 and about 8 GB of RAM, it processes all 15M rows in 5 minutes. The local test uses the same storage location on Azure Blob.

During debugging I tried using a batch reader, even though the cloud resources are much bigger than the MacBook's. A chunk_size of 10k rows succeeds but is incredibly slow; a chunk_size of 100k or more results in a segmentation fault. The table has 17 columns, so 100k rows should fit in very little memory. I watched memory in top in parallel; usage is around 10% when the segmentation fault happens.

What you expected to happen: The same behavior in the cloud as locally. If anything, the local DeltaTable run should be the much slower one.

How to reproduce it: Here is my code:

dt_curated_errors = DeltaTable(
    "abfs://featurestore/curated_errors", version=dt_curated_errors_last_version
)

df = quack.sql(f"select * from vw_errors where 1=1 {where_statement}").fetch_arrow_reader(chunk_size)

dt_curated_errors_last_version = merge(
    dt_curated_errors,
    df,
    ["serialnumber", "ts", "state", "label", "id", "bucket", "partition_year_month"],
)

def merge(dt, results, merge_keys, columns=None, update=True, insert=True):
    merge_stmts = []
    for merge_key in merge_keys:
        merge_stmts.append(f"target.{merge_key} = source.{merge_key}")

    while True:
        try:
            batch = results.read_next_batch()
            if update:
                merger = dt.merge(
                    source=batch,
                    predicate=" and ".join(merge_stmts),
                    source_alias="source",
                    target_alias="target",
                )
                if columns is None:
                    log.info(f"update: {merger.when_matched_update_all().execute()}")
                else:
                    log.info(f"update: {merger.when_matched_update(columns).execute()}")

            if insert:
                merger = dt.merge(
                    source=batch,
                    predicate=" and ".join(merge_stmts),
                    source_alias="source",
                    target_alias="target",
                )
                log.info(f"insert: {merger.when_not_matched_insert_all().execute()}")
        except StopIteration:
            log.info("Finished all batches")
            break

...

More details: Local run:

2024-05-28 13:56:55.033 | INFO     | __main__:merge:121 - update: {'num_source_rows': 14021956, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 0, 'num_target_files_added': 0, 'num_target_files_removed': 0, 'execution_time_ms': 12624, 'scan_time_ms': 0, 'rewrite_time_ms': 3527}
2024-05-28 13:59:48.201 | INFO     | __main__:merge:132 - insert: {'num_source_rows': 14021956, 'num_target_rows_inserted': 12994925, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 12994925, 'num_target_files_added': 1371, 'num_target_files_removed': 0, 'execution_time_ms': 172053, 'scan_time_ms': 0, 'rewrite_time_ms': 162790}

100k batch on cluster:

Fatal Python error: Segmentation fault

Thread 0x00007f6a9ae4d700 (most recent call first):
  File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 89 in _worker
  File "/usr/local/lib/python3.12/threading.py", line 1010 in run
  File "/usr/local/lib/python3.12/threading.py", line 1073 in _bootstrap_inner
  File "/usr/local/lib/python3.12/threading.py", line 1030 in _bootstrap

Thread 0x00007f6a9b64e700 (most recent call first):
  File "/usr/local/lib/python3.12/selectors.py", line 468 in select
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 1949 in _run_once
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 641 in run_forever
  File "/usr/local/lib/python3.12/threading.py", line 1010 in run
  File "/usr/local/lib/python3.12/threading.py", line 1073 in _bootstrap_inner
  File "/usr/local/lib/python3.12/threading.py", line 1030 in _bootstrap

Current thread 0x00007f6ac63a2740 (most recent call first):
  File "/usr/local/lib/python3.12/site-packages/deltalake/table.py", line 1800 in execute
  File "/tmp/cpm_prediction_duckdb2.py", line 134 in merge
  File "/tmp/cpm_prediction_duckdb2.py", line 385 in <module>

Extension modules: numpy.core._multiarray_umath, numpy.core._multiarray_tests, numpy.linalg._umath_linalg, numpy.fft._pocketfft_internal, numpy.random._common, numpy.random.bit_generator, numpy.random._bounded_integers, numpy.random._mt19937, numpy.random.mtrand, numpy.random._philox, numpy.random._pcg64, numpy.random._sfc64, numpy.random._generator, pyarrow.lib, pyarrow._hdfsio, pyarrow._compute, pyarrow._acero, pyarrow._fs, pyarrow._csv, pyarrow._json, pandas._libs.tslibs.ccalendar, pandas._libs.tslibs.np_datetime, pandas._libs.tslibs.dtypes, pandas._libs.tslibs.base, pandas._libs.tslibs.nattype, pandas._libs.tslibs.timezones, pandas._libs.tslibs.fields, pandas._libs.tslibs.timedeltas, pandas._libs.tslibs.tzconversion, pandas._libs.tslibs.timestamps, pandas._libs.properties, pandas._libs.tslibs.offsets, pandas._libs.tslibs.strptime, pandas._libs.tslibs.parsing, pandas._libs.tslibs.conversion, pandas._libs.tslibs.period, pandas._libs.tslibs.vectorized, pandas._libs.ops_dispatch, pandas._libs.missing, pandas._libs.hashtable, pandas._libs.algos, pandas._libs.interval, pandas._libs.lib, pandas._libs.ops, pandas._libs.hashing, pandas._libs.arrays, pandas._libs.tslib, pandas._libs.sparse, pandas._libs.internals, pandas._libs.indexing, pandas._libs.index, pandas._libs.writers, pandas._libs.join, pandas._libs.window.aggregations, pandas._libs.window.indexers, pandas._libs.reshape, pandas._libs.groupby, pandas._libs.json, pandas._libs.parsers, pandas._libs.testing, pyarrow._dataset, pyarrow._dataset_orc, pyarrow._parquet, pyarrow._parquet_encryption, pyarrow._dataset_parquet_encryption, pyarrow._dataset_parquet, pyarrow._hdfs, pyarrow._gcsfs, pyarrow._s3fs, yaml._yaml, _cffi_backend, charset_normalizer.md, requests.packages.charset_normalizer.md, requests.packages.chardet.md, markupsafe._speedups, google._upb._message, msgpack._cmsgpack, multidict._multidict, yarl._quoting_c, aiohttp._helpers, aiohttp._http_writer, aiohttp._http_parser, aiohttp._websocket, frozenlist._frozenlist (total: 84)
Segmentation fault (core dumped)

10k batch on cluster:

2024-05-28 11:48:36.520 | INFO     | __main__:merge:134 - update: {'num_source_rows': 10000, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 0, 'num_target_files_added': 0, 'num_target_files_removed': 0, 'execution_time_ms': 3905, 'scan_time_ms': 0, 'rewrite_time_ms': 16}
2024-05-28 11:48:55.301 | INFO     | __main__:merge:145 - insert: {'num_source_rows': 10000, 'num_target_rows_inserted': 1415, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 1415, 'num_target_files_added': 107, 'num_target_files_removed': 0, 'execution_time_ms': 18672, 'scan_time_ms': 0, 'rewrite_time_ms': 14681}
2024-05-28 11:49:14.429 | INFO     | __main__:merge:134 - update: {'num_source_rows': 10000, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 8449, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 873709, 'num_output_rows': 882158, 'num_target_files_added': 846, 'num_target_files_removed': 847, 'execution_time_ms': 18944, 'scan_time_ms': 0, 'rewrite_time_ms': 14578}
2024-05-28 11:49:30.345 | INFO     | __main__:merge:145 - insert: {'num_source_rows': 10000, 'num_target_rows_inserted': 1623, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 0, 'num_output_rows': 1623, 'num_target_files_added': 114, 'num_target_files_removed': 0, 'execution_time_ms': 15823, 'scan_time_ms': 0, 'rewrite_time_ms': 11333}

ion-elgreco commented 1 month ago

Why is the one on the cluster showing logs with asyncio and threading?

nfoerster2 commented 1 month ago

I spent more time investigating; it seems to be a problem on amd64 with multiple partition levels that are also part of the merge predicate. The reproduce.py script attached below runs fine on arm64. A rough sketch of the pattern is shown after the attachments.

I attached:

Core dump: https://file.io/3K1NXipK9eLy

Reproduce python script: reproduce.py.zip

Working script with just one partition: unittest_single_partition.py.zip

Testdata: test_first_16000.parquet.zip
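
For illustration only, here is a minimal sketch of the pattern I mean (two partition columns that both appear in the merge predicate). The attached reproduce.py is the authoritative reproduction; the path, column names, and values below are made up:

import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

data = pa.table(
    {
        "id": [1, 2, 3],
        "bucket": ["a", "a", "b"],
        "partition_year_month": ["2024-05", "2024-05", "2024-06"],
        "value": [1.0, 2.0, 3.0],
    }
)

# table with two partition levels
write_deltalake("/tmp/two_partitions", data, partition_by=["bucket", "partition_year_month"])
dt = DeltaTable("/tmp/two_partitions")

# both partition columns also appear in the merge predicate
(
    dt.merge(
        source=data,
        predicate="target.id = source.id"
        " and target.bucket = source.bucket"
        " and target.partition_year_month = source.partition_year_month",
        source_alias="source",
        target_alias="target",
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)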

nfoerster2 commented 1 month ago

Why is the one on the cluster showing logs with asyncio and threading?

I think it's crashing in the Rust code; the attached core dump may give more information about that.

nfoerster2 commented 3 weeks ago

Did anyone have the chance to check the core dump?

BR