delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

`write_deltalake` with predicate throws index out of bounds #2867

Closed Cpaulyz closed 3 weeks ago

Cpaulyz commented 3 weeks ago

Environment

Delta-rs version: 0.19.2

Binding: Python


Bug

What happened:

stack backtrace:
   0:     0x7fb4ba025ef5 - <std::sys::backtrace::BacktraceLock::print::DisplayBacktrace as core::fmt::Display>::fmt::h1b9dad2a88e955ff
   1:     0x7fb4ba0541eb - core::fmt::write::h4b5a1270214bc4a7
   2:     0x7fb4ba02243f - std::io::Write::write_fmt::hd04af345a50c312d
   3:     0x7fb4ba0271f1 - std::panicking::default_hook::{{closure}}::h96ab15e9936be7ed
   4:     0x7fb4ba026ecc - std::panicking::default_hook::h3cacb9c27561ad33
   5:     0x7fb4ba027851 - std::panicking::rust_panic_with_hook::hfe205f6954b2c97b
   6:     0x7fb4ba0276b7 - std::panicking::begin_panic_handler::{{closure}}::h6cb44b3a50f28c44
   7:     0x7fb4ba0263b9 - std::sys::backtrace::__rust_end_short_backtrace::hf1c1f2a92799bb0e
   8:     0x7fb4ba027344 - rust_begin_unwind
   9:     0x7fb4b639d213 - core::panicking::panic_fmt::h3d8fc78294164da7
  10:     0x7fb4b639d407 - core::panicking::panic_bounds_check::h9397cb495d89a72d
  11:     0x7fb4b77bf0e4 - datafusion::datasource::physical_plan::file_scan_config::FileScanConfig::project::h4f85f8b0f1192f9d
  12:     0x7fb4b787e3a7 - datafusion::datasource::physical_plan::parquet::ParquetExecBuilder::build::hdee8b3243baa6147
  13:     0x7fb4b787dc9b - datafusion::datasource::physical_plan::parquet::ParquetExecBuilder::build_arc::h01ac3b224ad94879
  14:     0x7fb4b70665dd - deltalake_core::delta_datafusion::DeltaScanBuilder::build::{{closure}}::hc316dc8826884201
  15:     0x7fb4b7051f5c - deltalake_core::operations::write::prepare_predicate_actions::{{closure}}::h820b67c53313f52f
  16:     0x7fb4b705a121 - <deltalake_core::operations::write::WriteBuilder as core::future::into_future::IntoFuture>::into_future::{{closure}}::hae27ffe1a4cb6ccc
  17:     0x7fb4b65915d6 - tokio::runtime::park::CachedParkThread::block_on::h702d530dbbfc5cd6
  18:     0x7fb4b64979f8 - tokio::runtime::context::runtime::enter_runtime::hc00a8f641a2a999d
  19:     0x7fb4b66588cf - tokio::runtime::runtime::Runtime::block_on::h450db0e1c9d19b77
  20:     0x7fb4b65bc486 - pyo3::marker::Python::allow_threads::hd2c2bbfc84ab486f
  21:     0x7fb4b66f0451 - deltalake::write_to_deltalake::h383cbb97c9bb9288
  22:     0x7fb4b66f1d7d - deltalake::__pyfunction_write_to_deltalake::h6299c7779de4961f
  23:     0x7fb4b66c1b2c - pyo3::impl_::trampoline::trampoline::h957c000108d60843
  24:     0x7fb4b66f0621 - deltalake::<impl deltalake::write_to_deltalake::MakeDef>::_PYO3_DEF::trampoline::h5a6543fde0672b00
  25:     0x7fb4c95c7e0b - <unknown>
  26:     0x7fb4c95b35bc - _PyObject_MakeTpCall
  27:     0x7fb4c9676781 - _PyEval_EvalFrameDefault
  28:     0x7fb4c96617d5 - _PyFunction_Vectorcall
  29:     0x7fb4c9671fba - _PyEval_EvalFrameDefault
  30:     0x7fb4c965be73 - <unknown>
  31:     0x7fb4c965ce6f - _PyEval_EvalCodeWithName
  32:     0x7fb4c965cea3 - PyEval_EvalCode
  33:     0x7fb4c96f013d - <unknown>
  34:     0x7fb4c96fa12a - <unknown>
  35:     0x7fb4c9589cea - <unknown>
  36:     0x7fb4c96fa945 - PyRun_SimpleFileExFlags
  37:     0x7fb4c96fadf2 - Py_RunMain
  38:     0x7fb4c96faf39 - Py_BytesMain
  39:     0x7fb4c84fd7e5 - __libc_start_main
  40:     0x555f329c075e - _start
  41:                0x0 - <unknown>

pyo3_runtime.PanicException: index out of bounds: the len is 5 but the index is 5

What you expected to happen:

Overwrite partition data, merge schema.

How to reproduce it:

from deltalake import write_deltalake
from datetime import datetime
import pandas as pd
import boto3

table_path = "S3PATH"
session = boto3.Session()
credentials = session.get_credentials().get_frozen_credentials()
storage_options = {
    "AWS_ACCESS_KEY_ID": credentials.access_key,
    "AWS_SECRET_ACCESS_KEY": credentials.secret_key,
    "AWS_REGION": session.region_name,
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true"
}

# First write: creates the partition with value columns f1 and f2.
data = [
    (datetime(2024, 7, 31, 9, 30, 0), "AAPL", "20240731", 100, 11.1),
    (datetime(2024, 7, 31, 9, 30, 0), "GOOG", "20240731", 200, 11.1),
]
columns = ["ts", "ins", "date", "f1", "f2"]
df = pd.DataFrame(data, columns=columns)

predicate = 'date == 20240731'
write_deltalake(
    table_or_uri=table_path,
    data=df,
    partition_by="date",
    mode="overwrite",
    schema_mode="merge",
    predicate=predicate,
    storage_options=storage_options,
)

# Second write to the same partition, with new value columns fb and fc in place of f1 and f2.
data = [
    (datetime(2024, 7, 31, 9, 30, 0), "AAPL", "20240731", 666, 666),
    (datetime(2024, 7, 31, 9, 30, 0), "GOOG", "20240731", 777, 777),
]
columns = ["ts", "ins", "date", "fb", "fc"]
df = pd.DataFrame(data, columns=columns)
write_deltalake(
    table_or_uri=table_path,
    data=df,
    partition_by="date",
    mode="overwrite",
    schema_mode="merge",
    predicate=predicate,
    storage_options=storage_options,
)

More details:

  1. It works without the predicate.
  2. It works if the second data frame adds only one new column:
    data = [
    (datetime(2024, 7, 31, 9, 30, 0), "AAPL", "20240731", 666),
    (datetime(2024, 7, 31, 9, 30, 0), "GOOG", "20240731", 777),
    ]
    columns = ["ts", "ins", "date", "fb"]
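
To make the difference between the failing and the working case concrete, here is a minimal sketch of the column sets a schema merge would be expected to produce. It assumes that merging keeps the existing columns in order and appends new ones at the end; `merged_columns` is a hypothetical helper for illustration, not part of deltalake, and this does not explain the root cause of the panic.

```python
def merged_columns(existing, incoming):
    """Return existing columns followed by any new incoming columns, in order.

    Assumption: schema merge appends unseen columns after the existing ones.
    """
    merged = list(existing)
    for name in incoming:
        if name not in merged:
            merged.append(name)
    return merged


first_write = ["ts", "ins", "date", "f1", "f2"]
failing_write = ["ts", "ins", "date", "fb", "fc"]   # two new columns: panics
working_write = ["ts", "ins", "date", "fb"]         # one new column: works

print(len(first_write), first_write)
print(len(merged_columns(first_write, failing_write)),
      merged_columns(first_write, failing_write))
print(len(merged_columns(first_write, working_write)),
      merged_columns(first_write, working_write))
```

The table has five columns after the first write, the same number as the `len` in the panic message, though whether that is the length being indexed is not confirmed here.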

sherlockbeard commented 3 weeks ago

The same code runs perfectly in 0.18.2, so it's bisect time.
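
For anyone unfamiliar with bisecting, the idea can be sketched as a binary search between a known-good and a known-bad point (here that would be 0.18.2 and 0.19.2). The `first_bad` helper and the `c0`..`c7` labels below are hypothetical stand-ins; a real check would build each candidate and run the repro above, and the sketch assumes the regression stays broken once introduced.

```python
from typing import Callable, Sequence


def first_bad(candidates: Sequence[str], is_bad: Callable[[str], bool]) -> str:
    """Binary search for the first bad candidate.

    Assumes candidates[0] is good, candidates[-1] is bad, and that every
    candidate after the first bad one is also bad.
    """
    lo, hi = 0, len(candidates) - 1  # lo is known good, hi is known bad
    while hi - lo > 1:
        mid = (lo + hi) // 2
        if is_bad(candidates[mid]):
            hi = mid
        else:
            lo = mid
    return candidates[hi]


# Placeholder history: c0 is the good end, c7 is the bad end.
commits = [f"c{i}" for i in range(8)]

# Placeholder check for illustration only: pretend the break landed in c5.
checked = []


def pretend_is_bad(commit: str) -> bool:
    checked.append(commit)
    return int(commit[1:]) >= 5


print(first_bad(commits, pretend_is_bad), "after checking", checked)
```

With eight candidates this needs three checks rather than six, which matters when each check means rebuilding the Python binding.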

ion-elgreco commented 3 weeks ago

@sherlockbeard 👨‍💻🕵️