apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

`COPY ... PARTITIONED BY` with parquet causes "out of bounds" panic #10712

Open samuelcolvin opened 1 month ago

samuelcolvin commented 1 month ago

Describe the bug

While investigating #10709, I tried using the DataFusion CLI to rewrite parquet files to a better size.

But I got a panic:

thread 'tokio-runtime-worker' panicked at /Users/samuel/.cargo/registry/src/index.crates.io-6f17d22bba15001f/datafusion-38.0.0/src/datasource/file_format/write/demux.rs:381:31:
index out of bounds: the len is 1 but the index is 1

To Reproduce

I can't share the file, but we have some parquet data with `project_id` and `day` columns (here both are interpreted as strings). I ran the following:

RUST_BACKTRACE=1 datafusion-cli
DataFusion CLI v38.0.0
> CREATE EXTERNAL TABLE records
PARTITIONED BY (project_id, day) STORED AS PARQUET
LOCATION 'path/to/records/';
0 row(s) fetched. 
Elapsed 0.015 seconds.

> COPY records to 'path/to/records-big/' partitioned by (project_id, day) stored as parquet;
thread 'tokio-runtime-worker' panicked at /Users/samuel/.cargo/registry/src/index.crates.io-6f17d22bba15001f/datafusion-38.0.0/src/datasource/file_format/write/demux.rs:381:31:
index out of bounds: the len is 1 but the index is 1
stack backtrace:
   0: _rust_begin_unwind
   1: core::panicking::panic_fmt
   2: core::panicking::panic_bounds_check
   3: datafusion::datasource::file_format::write::demux::compute_take_arrays
   4: datafusion::datasource::file_format::write::demux::start_demuxer_task::{{closure}}
   5: tokio::runtime::task::core::Core<T,S>::poll
   6: tokio::runtime::task::harness::Harness<T,S>::poll
   7: tokio::runtime::scheduler::multi_thread::worker::Context::run_task
   8: tokio::runtime::scheduler::multi_thread::worker::Context::run
   9: tokio::runtime::context::scoped::Scoped<T>::set
  10: tokio::runtime::context::runtime::enter_runtime
  11: tokio::runtime::scheduler::multi_thread::worker::run
  12: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll
  13: tokio::runtime::task::core::Core<T,S>::poll
  14: tokio::runtime::task::harness::Harness<T,S>::poll
  15: tokio::runtime::blocking::pool::Inner::run
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

If I remove `partitioned by (project_id, day)`, it finishes fine.

Expected behavior

No response

Additional context

I also tried with v37.0.0 and got the same panic.

devinjdangelo commented 1 month ago

Thank you for the report @samuelcolvin. I will look into this and try to make a self-contained reproducer. This panic should not be possible. I suspect a bug in how dictionary-encoded arrays are handled in the relevant code, since that could cause this panic, but I have yet to confirm it.

devinjdangelo commented 1 month ago

I was unable to reproduce this panic on main or v38. @samuelcolvin, if you are able to provide more details about the parquet files that trigger the issue (schema, sanitized values, directory structure), that may help me reproduce it.

Here is what I tried to reproduce:

datafusion-cli
DataFusion CLI v38.0.0
> COPY (values ('1', 'a', 'x'), ('2', 'b', 'y'), ('3', 'c', 'z')) TO 'test_files/scratch/copy/partitioned_table3/' STORED AS parquet PARTITIONED BY (column1, column3)
;
+-------+
| count |
+-------+
| 3     |
+-------+
1 row(s) fetched. 
Elapsed 0.003 seconds.

> CREATE EXTERNAL TABLE validate_partitioned_parquet3 STORED AS PARQUET 
LOCATION 'test_files/scratch/copy/partitioned_table3/' PARTITIONED BY (column1, column3);
0 row(s) fetched. 
Elapsed 0.001 seconds.

> COPY validate_partitioned_parquet3
TO 'test_files/scratch/copy/partitioned_table3_rewrite'
PARTITIONED BY (column1, column3)
STORED AS PARQUET;
+-------+
| count |
+-------+
| 3     |
+-------+
1 row(s) fetched. 
Elapsed 0.003 seconds.

Trying to force dictionary encoding where possible also runs without a panic:

DataFusion CLI v38.0.0
> COPY (values 
('c', arrow_cast('foo', 'Dictionary(Int32, Utf8)'), arrow_cast('foo2', 'Dictionary(Int32, Utf8)')), 
('d', arrow_cast('bar', 'Dictionary(Int32, Utf8)'), arrow_cast('bar2', 'Dictionary(Int32, Utf8)'))) 
to 'test_files/scratch/copy/part_dict_test' STORED AS PARQUET PARTITIONED BY (column2, column3);
+-------+
| count |
+-------+
| 2     |
+-------+
1 row(s) fetched. 
Elapsed 0.004 seconds.

> CREATE EXTERNAL TABLE dict_partitioned_test STORED AS PARQUET 
LOCATION 'test_files/scratch/copy/part_dict_test/' PARTITIONED BY (column2, column3);
0 row(s) fetched. 
Elapsed 0.001 seconds.

> select * from dict_partitioned_test;
+---------+---------+---------+
| column1 | column2 | column3 |
+---------+---------+---------+
| d       | bar     | bar2    |
| c       | foo     | foo2    |
+---------+---------+---------+
2 row(s) fetched. 
Elapsed 0.002 seconds.

> COPY (select column1, arrow_cast(column2, 'Dictionary(Int32, Utf8)') as column2, 
arrow_cast(column3, 'Dictionary(Int32, Utf8)') as column3 from dict_partitioned_test)
TO 'test_files/scratch/copy/part_dict_test_rewrite'
PARTITIONED BY (column2, column3)
STORED AS PARQUET;
+-------+
| count |
+-------+
| 2     |
+-------+
1 row(s) fetched. 
Elapsed 0.003 seconds.

The index out-of-bounds panic on that line could only happen if one of the partition arrays extracted from the RecordBatch had fewer values than `RecordBatch::num_rows()`.

For plain Utf8 arrays, this seems completely impossible, since the following code executes first (`array.value(i)` would panic first):

https://github.com/apache/datafusion/blob/c775e4d6ea6dfe9c26a772b676552b9711004a3d/datafusion/core/src/datasource/file_format/write/demux.rs#L339-L341

For dictionary-encoded arrays, I could imagine something in the downcast / iteration code here producing fewer values. I thought that iterating over the downcast array should always produce exactly `RecordBatch::num_rows()` values, but perhaps there is a case where this is wrong.

https://github.com/apache/datafusion/blob/c775e4d6ea6dfe9c26a772b676552b9711004a3d/datafusion/core/src/datasource/file_format/write/demux.rs#L346-L353
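To make the "fewer values than rows" scenario concrete, here is a minimal sketch of one way it could arise in principle. It is purely illustrative and not confirmed as the cause: `ToyDictColumn`, `distinct_values`, `per_row_values`, and `sample_column` are hypothetical names, and the type is a plain-Rust stand-in, not the arrow-rs `DictionaryArray`. The idea is that a dictionary column stores one key per row plus a list of distinct values, so walking the distinct values yields fewer items than walking the rows whenever a value repeats.

```rust
/// Toy stand-in for a dictionary-encoded string column (hypothetical type,
/// not the arrow-rs `DictionaryArray`): each row stores a key that indexes
/// into a list of distinct values.
struct ToyDictColumn {
    keys: Vec<usize>,
    values: Vec<String>,
}

impl ToyDictColumn {
    /// Number of rows in the column (one key per row).
    fn num_rows(&self) -> usize {
        self.keys.len()
    }

    /// Walks only the distinct dictionary values: yields `values.len()` items.
    fn distinct_values(&self) -> Vec<&str> {
        self.values.iter().map(|v| v.as_str()).collect()
    }

    /// Resolves every row's key: yields exactly `num_rows()` items.
    fn per_row_values(&self) -> Vec<&str> {
        self.keys.iter().map(|&k| self.values[k].as_str()).collect()
    }
}

/// Three rows, two distinct values: the first value is repeated.
fn sample_column() -> ToyDictColumn {
    ToyDictColumn {
        keys: vec![0, 0, 1],
        values: vec!["2024-05-01".to_string(), "2024-05-02".to_string()],
    }
}
```

With `sample_column()`, `per_row_values()` has one entry per row while `distinct_values()` has one entry per distinct value. In this toy model, a reproducer in which every row carries a different partition value would not show any difference between the two.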

Finally, the arrays constructed above are accessed like this:

https://github.com/apache/datafusion/blob/c775e4d6ea6dfe9c26a772b676552b9711004a3d/datafusion/core/src/datasource/file_format/write/demux.rs#L378-L385
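As a rough sketch of the access pattern described above (a simplified model, not the code in `demux.rs`), the function below groups row indices by their partition values. The name `group_rows_by_partition` and the plain `Vec<&str>` columns are assumptions made for illustration. It checks column lengths up front and returns an error; without that check, indexing row 1 of a one-element column would panic with the same "the len is 1 but the index is 1" message as in the report.

```rust
use std::collections::HashMap;

/// Groups row indices by partition key (one value per partition column).
/// `partition_columns[c][r]` is the value of partition column `c` at row `r`.
/// Returns an error instead of panicking if a column's length differs from `num_rows`.
fn group_rows_by_partition(
    num_rows: usize,
    partition_columns: &[Vec<&str>],
) -> Result<HashMap<Vec<String>, Vec<usize>>, String> {
    // Guard: every partition column must supply exactly one value per row.
    for (c, col) in partition_columns.iter().enumerate() {
        if col.len() != num_rows {
            return Err(format!(
                "partition column {c} has {} values but the batch has {num_rows} rows",
                col.len()
            ));
        }
    }

    let mut groups: HashMap<Vec<String>, Vec<usize>> = HashMap::new();
    for row in 0..num_rows {
        // Build the key from this row's value in each partition column.
        let key: Vec<String> = partition_columns
            .iter()
            .map(|col| col[row].to_string())
            .collect();
        groups.entry(key).or_default().push(row);
    }
    Ok(groups)
}
```

Each map entry holds the row indices that belong to one output partition. A length mismatch surfaces as an `Err` naming the offending column rather than as a panic inside a worker thread.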

alamb commented 1 week ago

Has this been fixed, @samuelcolvin?