pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

reading 10GB parquet with polars.read_parquet crashes python with infinite recursion (0xC00000FD) error #7237

Open bjpinegar opened 1 year ago

bjpinegar commented 1 year ago

Polars version checks

Issue description

Reading a very large (10 GB) Parquet file consistently crashes Python with "Process finished with exit code -1073741571 (0xC00000FD)" in version 0.16.9, but not in 0.16.1.

Reproducible example

import hashlib
import numpy as np
import polars as pl

df = pl.DataFrame({
    'id': [x for x in range(500_000_000)],
    'md5': [hashlib.md5(str(x).encode('UTF-8')).hexdigest() for x in range(500_000_000)],
    'temp_id': np.random.randint(0, 100_000, 500_000_000),
    'sect_id': np.random.randint(0, 500_000, 500_000_000),
    'revenue': np.random.randint(0, 525, 500_000_000),
})

df.write_parquet('test.parquet')

df = pl.read_parquet('test.parquet')

Expected behavior

Python does not crash.

Installed versions

```
Polars: 0.16.9
Index type: UInt32
Platform: Windows-10-10.0.22621-SP0
Python: 3.11.2 (tags/v3.11.2:878ead1, Feb 7 2023, 16:38:35) [MSC v.1934 64 bit (AMD64)]
---Optional dependencies---
pyarrow: 11.0.0
pandas: 1.5.3
numpy: 1.24.2
fsspec:
connectorx:
xlsx2csv:
deltalake:
matplotlib: 3.7.0
```
ritchie46 commented 1 year ago

I am very tight on memory. Can you prune all columns that are not needed to trigger this, and find the minimal dataset size at which it happens?

ritchie46 commented 1 year ago

Can somebody help me with this one? I cannot replicate this due to memory issues.
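A reduced reproducer might make this testable on machines with less memory. Below is a minimal sketch, assuming the large `md5` string column is what matters (that assumption, the `build_and_roundtrip` helper, and the trial sizes are mine, not the reporter's):

```python
import hashlib

import polars as pl


def build_and_roundtrip(n_rows: int, path: str = "repro.parquet") -> None:
    """Write a single column of n_rows MD5 hex digests and read it back."""
    df = pl.DataFrame(
        {"md5": [hashlib.md5(str(x).encode("UTF-8")).hexdigest() for x in range(n_rows)]}
    )
    df.write_parquet(path)
    # If the bug reproduces at this size, this call takes the interpreter down.
    pl.read_parquet(path)


# Try progressively larger sizes until the crash appears (if it does at all).
for n in (50_000_000, 100_000_000, 250_000_000):
    print(f"trying {n:_} rows")
    build_and_roundtrip(n)
```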

ghuls commented 1 year ago
In [1]: import hashlib
   ...: import numpy as np
   ...: import polars as pl
   ...: 
   ...: df = pl.DataFrame({'id': [x for x in range(500_000_000)],
   ...:                    'md5': [hashlib.md5(str(x).encode('UTF-8')).hexdigest()
   ...: for x in range(500_000_000)],
   ...:                        'temp_id':np.random.randint(0, 100_000, 500_000_000),
   ...:                        'sect_id': np.random.randint(0, 500_000, 500_000_000),
   ...:                        'revenue': np.random.randint(0, 525, 500_000_000)})
   ...: 
   ...: df.write_parquet('test.parquet')

# Reading 'test.parquet' with polars will crash python, so try writing some other files first.

In [3]: df.write_parquet('test.64.parquet', row_group_size=64 * 1024 * 1024)

In [4]: df2 = pl.read_parquet("test.64.parquet")

In [5]: df.write_parquet('test.640.parquet', row_group_size=640 * 1024 * 1024)

In [6]: df2 = pl.read_parquet("test.640.parquet")

In [7]: df.write_parquet('test.640.parquet', row_group_size=64000 * 1024 * 1024)

In [8]: df2 = pl.read_parquet("test.640.parquet")

In [9]: df.write_parquet('test.640.parquet', row_group_size=6400000 * 1024 * 1024)

In [10]: df2 = pl.read_parquet("test.640.parquet")

# Reading the original parquet file, crashes python.
In [11]: df2 = pl.read_parquet("test.parquet")
Segmentation fault

PyArrow seems to be able to read that Parquet file just fine, so it is probably not corrupt:

In [1]: import polars as pl

In [2]: import pyarrow as pa

In [3]: import pyarrow.parquet as pf

In [5]: pa_table = pf.read_table("test.parquet")

In [6]: pa_table
Out[6]: 
pyarrow.Table
id: int64
md5: large_string
temp_id: int64
sect_id: int64
revenue: int64
----
id: [[0,1,2,3,4,...,131067,131068,131069,131070,131071],[131072,131073,131074,131075,131076,...,262139,262140,262141,262142,262143],...,[499777529,499777530,499777531,499777532,499777533,...,499908596,499908597,499908598,499908599,499908600],[499908601,499908602,499908603,499908604,499908605,...,499999995,499999996,499999997,499999998,499999999]]
md5: [["cfcd208495d565ef66e7dff9f98764da","c4ca4238a0b923820dcc509a6f75849b","c81e728d9d4c2f636f067f89cc14862c","eccbc87e4b5ce2fe28308fd9f2a7baf3","a87ff679a2f3e71d9181a67b7542122c",...,"2232fa8580a3d4fb210acf7f47d19431","75861df61ecd4e6b108d8f6f18f736fe","e3aef13f99d51ef539344bd6d06e60d9","e83a0812cede8fa1f0efd82374b27ea6","aa963bfc51c8bfcfdb6d0bc1dcb402f4"],["f7de594e1d11e06450f211067665ff17","8f377f673be1da3ecd6cd7c846ae80d9","871cc4c57f21320216fd23b2b5d83eb9","2f41366afe9d070a6c0fb766f97b50a9","984f1377e3fdecd71a107fcd3a8681f2",...,"c1e9ff8ead0d93306f08d558f2979376","5694aa9aea6a913d6d18c686d459b687","5995b51c9ed07b0155ed896489a84b95","e49285990aca7f784004e8c22fc5d4b7","fb99fdd57688e0dd7b4e0ac064b2fef9"],...,["bb824d1b4ed53a7841da9735a94dea82","84856175057f66757d1cb0f31d4e9af9","0392675012fd4cf157351ea1909ebe22","e1650244e6f0a7266c9ade0996e3875f","0b07e7b813f643c79e13f9f05b0331bc",...,"013aa0ecbab6cea2cfaa912dedc19f74","8d235a25af2c5ced8783cee5a562f850","0cd93a5614590124d29107ac4c276b50","5f5f198fc37a2ba326931831912443c6","d8750358a76bc117465b0942ba6c5613"],["5d4a11bfa372f433fe124f1599c795f2","d4593a1909102ad87cb84201fb7be36a","51b1b5cfcdad3e045cc0de132de9cc05","7be81f734790978a2eaf1d3d956501b4","d80e00a07b6723cdbcf47815980fe171",...,"9ebf34ad4ce536cdf949942bcfc86f93","27dd883c1cc4afaadbce053cc92d691f","f70eab87e8a7dc456be1642308f395e8","5d0e61f6fb38a42dc546a18771c1d10e","cd51fb92a0467cbc9f1e7cba40fce616"]]
temp_id: [[58253,47801,82163,52286,26690,...,9534,29918,64926,18833,55018],[53936,10805,87494,56940,55759,...,77960,34627,95020,10809,5156],...,[79444,97878,60122,75746,95217,...,39021,45443,83774,37015,59047],[96000,51485,14007,90160,25814,...,42445,75065,78411,45446,98548]]
sect_id: [[239811,480625,41409,150869,271728,...,106728,325362,460691,284431,48880],[473711,317603,267847,382833,228937,...,247242,208313,99435,161064,286396],...,[54136,56948,235626,353737,408695,...,60762,182216,255883,159269,62167],[458865,378666,227774,333216,191161,...,241150,397135,192310,263018,25043]]
revenue: [[338,21,377,201,14,...,497,105,354,156,215],[369,401,109,162,3,...,511,14,283,403,502],...,[170,441,436,432,515,...,215,90,106,439,379],[481,316,3,60,218,...,512,77,122,125,418]]

# Get number of chunks and chunk lengths.
In [4]: id_array = pa_table["id"]                                                                                                                                                             

In [5]: chunk_sizes = [ len(c) for c in id_array.chunks ]

# Number of chunks.
In [17]: len(chunk_sizes)
Out[17]: 3815

In [14]: chunk_sizes_count = {}

In [15]: for c in chunk_sizes:
    ...:     chunk_sizes_count[c] = chunk_sizes_count.get(c, 0) + 1
    ...: 

# Chunk sizes and occurrence.
In [16]: chunk_sizes_count
Out[16]: {131072: 3807, 131071: 7, 91399: 1}

# Create Polars DataFrames with same chunks as PyArrow from non working parquet file.
In [20]: df_no_rechunk = pl.from_arrow(pa_table, rechunk=False)

# Write without rechunking.
In [21]: df_no_rechunk.write_parquet("test.no_rechunk.parquet", row_group_size=None)

# Reading the parquet again works.
In [22]: df = pl.read_parquet("test.no_rechunk.parquet")
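Based on the rechunking experiment above, a possible user-side workaround (just a sketch, not confirmed as a general fix) is to bypass the oversized row group: read the file with PyArrow, convert without rechunking, and optionally re-write it with an explicit, smaller `row_group_size`:

```python
import polars as pl
import pyarrow.parquet as pq

# Read the problematic file with PyArrow and convert without rechunking, so Polars
# keeps PyArrow's ~131k-row chunks instead of one huge contiguous buffer.
pa_table = pq.read_table("test.parquet")
df = pl.from_arrow(pa_table, rechunk=False)

# Re-write with a smaller row_group_size (the 64 * 1024 * 1024 value mirrors the
# experiment above) so a plain pl.read_parquet() works on the new file.
df.write_parquet("test.fixed.parquet", row_group_size=64 * 1024 * 1024)
df2 = pl.read_parquet("test.fixed.parquet")
```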
ritchie46 commented 1 year ago

Thanks @ghuls. Could you try to find the minimal size at which this happens?

ghuls commented 1 year ago

I managed to use n_rows to trigger it:

In [2]: import polars as pl

In [3]: df2 = pl.read_parquet("test.parquet", n_rows=250_000_000)

In [4]: df2 = pl.read_parquet("test.parquet", n_rows=255_000_000)
Segmentation fault
ghuls commented 1 year ago

Narrowed it down a bit more. It starts segfaulting when n_rows=250_000_001:

In [2]: df2 = pl.read_parquet("test.parquet", n_rows=250_000_001)
Segmentation fault
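Since the crash takes the whole interpreter down, narrowing the threshold by hand is tedious. A sketch of how the bisection could be automated by running each attempt in a subprocess (the bounds and file name come from the session above; the script itself is my own):

```python
import subprocess
import sys

SNIPPET = "import polars as pl; pl.read_parquet('test.parquet', n_rows={n})"


def read_survives(n: int) -> bool:
    """Run the read in a child process; a segfault only kills the child."""
    result = subprocess.run([sys.executable, "-c", SNIPPET.format(n=n)])
    return result.returncode == 0


# Bisect between a known-good and a known-bad row count.
lo, hi = 250_000_000, 255_000_000  # lo survives, hi segfaults
while hi - lo > 1:
    mid = (lo + hi) // 2
    if read_survives(mid):
        lo = mid
    else:
        hi = mid

print(f"first failing n_rows: {hi}")
```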
ritchie46 commented 1 year ago

zstd has a stack overflow on this huge row group:

I don't really know what to do about this. We could document this in the docstrings, or print a warning when users want to write huge row groups?

Thread 5 "memcheck" received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7ffff7676640 (LWP 46704)]
0x00005555562b44d1 in FSE_decompress_wksp_bmi2 (dst=0x7fffe009c56c, dstCapacity=255, cSrc=0x7fffe00756cf, cSrcSize=cSrcSize@entry=14, maxLog=maxLog@entry=6, workSpace=workSpace@entry=0x7fffe009c66c, wkspSize=876, bmi2=1) at zstd/lib/common/fse_decompress.c:301
301 {
(gdb) bt
#0  0x00005555562b44d1 in FSE_decompress_wksp_bmi2 (dst=0x7fffe009c56c, dstCapacity=255, 
    cSrc=0x7fffe00756cf, cSrcSize=cSrcSize@entry=14, maxLog=maxLog@entry=6, 
    workSpace=workSpace@entry=0x7fffe009c66c, wkspSize=876, bmi2=1)
    at zstd/lib/common/fse_decompress.c:301
#1  0x00005555562b2ad5 in HUF_readStats_body (bmi2=1, wkspSize=876, workSpace=0x7fffe009c66c, 
    srcSize=48956, src=<optimized out>, tableLogPtr=0x7ffff7477180, nbSymbolsPtr=0x7ffff7477184, 
    rankStats=0x7fffe009c3fc, hwSize=<optimized out>, huffWeight=huffWeight@entry=0x7fffe009c56c "")
    at zstd/lib/common/entropy_common.c:272
#2  HUF_readStats_body_bmi2 (huffWeight=huffWeight@entry=0x7fffe009c56c "", hwSize=<optimized out>, 
    rankStats=0x7fffe009c3fc, nbSymbolsPtr=0x7ffff7477184, tableLogPtr=0x7ffff7477180, 
    src=<optimized out>, srcSize=48956, workSpace=0x7fffe009c66c, wkspSize=876)
    at zstd/lib/common/entropy_common.c:323
#3  0x00005555562b3406 in HUF_readStats_wksp (huffWeight=huffWeight@entry=0x7fffe009c56c "", 
    hwSize=hwSize@entry=256, rankStats=rankStats@entry=0x7fffe009c3fc, 
    nbSymbolsPtr=nbSymbolsPtr@entry=0x7ffff7477184, tableLogPtr=tableLogPtr@entry=0x7ffff7477180, 
    src=src@entry=0x7fffe00756ce, srcSize=48956, workSpace=0x36c, wkspSize=1, flags=-536228936)
    at zstd/lib/common/entropy_common.c:335
#4  0x00005555562c24f1 in HUF_readDTableX2_wksp (DTable=DTable@entry=0x7fffe0097f08, 
    src=src@entry=0x7fffe00756ce, srcSize=srcSize@entry=48956, workSpace=0x7fffe009c18c, 
    wkspSize=wkspSize@entry=2560, flags=flags@entry=1) at zstd/lib/decompress/huf_decompress.c:1152
#5  0x00005555562c574f in HUF_decompress4X2_DCtx_wksp (flags=1, wkspSize=2560, 
    workSpace=<optimized out>, cSrcSize=48956, cSrc=0x7fffe00756ce, dstSize=96377, 
    dst=0x7fffe00d5557, dctx=0x7fffe0097f08) at zstd/lib/decompress/huf_decompress.c:1713
#6  HUF_decompress4X_hufOnly_wksp (dctx=0x7fffe0097f08, dst=0x7fffe00d5557, 
    dstSize=dstSize@entry=96377, cSrc=cSrc@entry=0x7fffe00756ce, cSrcSize=48956, 
    workSpace=<optimized out>, wkspSize=2560, flags=1) at zstd/lib/decompress/huf_decompress.c:1878
#7  0x00005555562d79df in ZSTD_decodeLiteralsBlock (dctx=dctx@entry=0x7fffe00956d0, 
    src=src@entry=0x7fffe00756c9, srcSize=srcSize@entry=65854, dst=dst@entry=0x7fffe00ccdd0, 
    dstCapacity=dstCapacity@entry=2359360, streaming=streaming@entry=is_streaming)
    at zstd/lib/decompress/zstd_decompress_block.c:209
#8  0x00005555562d821f in ZSTD_decompressBlock_internal (streaming=is_streaming, frame=1, 
    srcSize=65854, src=0x7fffe00756c9, dstCapacity=<optimized out>, dst=0x7fffe00ccdd0, 
    dctx=0x7fffe00956d0) at zstd/lib/decompress/zstd_decompress_block.c:2082
#9  ZSTD_decompressBlock_internal (dctx=dctx@entry=0x7fffe00956d0, dst=dst@entry=0x7fffe00ccdd0, 
    dstCapacity=2359360, src=src@entry=0x7fffe00756c9, srcSize=srcSize@entry=65854, 
    frame=frame@entry=1, streaming=is_streaming) at zstd/lib/decompress/zstd_decompress_block.c:2064
#10 0x00005555562c73fd in ZSTD_decompressContinue (dctx=dctx@entry=0x7fffe00956d0, 
    dst=0x7fffe00ccdd0, dstCapacity=2359360, src=0x7fffe00756c9, srcSize=65854)
    at zstd/lib/decompress/zstd_decompress.c:1319
#11 0x00005555562c7942 in ZSTD_decompressContinue (srcSize=<optimized out>, src=<optimized out>, 
    dstCapacity=<optimized out>, dst=<optimized out>, dctx=<optimized out>)
    at zstd/lib/decompress/zstd_decompress.c:2010
#12 ZSTD_decompressContinueStream (zds=zds@entry=0x7fffe00956d0, op=op@entry=0x7ffff74774d0, 
    oend=oend@entry=0x7ffff4f7790b "", src=src@entry=0x7fffe00756c9, srcSize=srcSize@entry=65854)
    at zstd/lib/decompress/zstd_decompress.c:2009
#13 0x00005555562c9ee3 in ZSTD_decompressStream (zds=0x7fffe00956d0, output=0x7ffff7477548, 
    input=0x7ffff7477568) at zstd/lib/decompress/zstd_decompress.c:2225
#14 0x000055555629f00e in zstd_safe::DCtx::decompress_stream<[u8]> (self=0x7ffff7477708, 
    output=<optimized out>, input=0x7ffff7477600)
    at /home/ritchie46/.cargo/registry/src/github.com-1ecc6299db9ec823/zstd-safe-5.0.2+zstd.1.5.2/src/lib.rs:907
#15 0x00005555562aeef1 in zstd::stream::raw::{impl#3}::run<[u8]> (self=0x7ffff7477708, 
    input=0x7ffff7477600, output=0x7ffff74775d0)
    at /home/ritchie46/.cargo/registry/src/github.com-1ecc6299db9ec823/zstd-0.11.2+zstd.1.5.2/src/str...
(gdb) thread apply all bt

Thread 17 (Thread 0x7ffff5e4c640 (LWP 46716) "memcheck"):
#0  syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1  0x00005555563493e0 in std::sys::unix::futex::futex_wait () at library/std/src/sys/unix/futex.rs:62
#2  std::sys::unix::locks::futex_condvar::Condvar::wait_optional_timeout () at library/std/src/sys/unix/locks/futex_condvar.rs:49
#3  std::sys::unix::locks::futex_condvar::Condvar::wait () at library/std/src/sys/unix/locks/futex_condvar.rs:33
#4  0x0000555555f9a5b3 in std::sync::condvar::Condvar::wait<bool> (self=0x5555563c1f88, guard=...) at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/std/src/sync/condvar.rs:191
#5  rayon_core::sleep::Sleep::sleep<rayon_core::registry::{impl#10}::wait_until_cold::{closure_env#0}> (self=<optimized out>, idle_state=0x7ffff5e4ae50, latch=0x7ffff5e4af40, has_injected_jobs=...) at src/sleep/mod.rs:228
#6  0x0000555555f98eb7 in rayon_core::sleep::Sleep::no_work_found<rayon_core::registry::{impl#10}::wait_until_cold::{closure_env#0}> (self=0xfffffffffffffe00, idle_state=0x7ffff5e4ae50, latch=0x7ffff5e4af40, has_injected_jobs=...) at src/sleep/mod.rs:122
#7  rayon_core::registry::WorkerThread::wait_until_cold (self=0x7ffff5e4b980, latch=0x7ffff5e4af40) at src/registry.rs:823
#8  0x0000555555944c16 in rayon_core::registry::WorkerThread::wait_until<rayon_core::latch::SpinLatch> (self=0x7ffff5e4b980, latch=<optimized out>) at /home/ritchie46/.cargo/registry/src/github.com-1ecc6299db9ec823/rayon-core-1.11.0/src/registry.rs:803
#9  rayon_core::join::join_context::{closure#0}<rayon::iter::plumbing::bridge_producer_consumer::helper::{closure_env#0}<rayon::slice::IterProducer<usize>, rayon::iter::map::MapConsumer<rayon::iter::map::MapConsumer<rayon::iter::while_some::WhileSomeConsumer<rayon::iter::extend::ListVecConsumer>, rayon::result::{impl#4}::from_par_iter::ok::{closure_env#0}<polars_core::series::Series, polars_error::PolarsError>>, polars_io::parquet::read_impl::rg_to_dfs::{closure#0}::{closure_env#0}>>, rayon::iter::plumbing::bridge_producer_consumer::helper::{closure_env#1}<rayon::slice::IterProducer<usize>, rayon::iter::map::MapConsumer<rayon::iter::map::MapConsumer<rayon::iter::while_some::WhileSomeConsumer<rayon::iter::extend::ListVecConsumer>, rayon::result::{impl#4}::from_par_iter::ok::{closure_env#0}<polars_core::series::Series, polars_error::PolarsError>>, polars_io::parquet::read_impl::rg_to_dfs::{closure#0}::{closure_env#0}>>, alloc::collections::linked_list::LinkedList<alloc::vec::Vec<polars_core::series::Series, alloc::alloc::Global>>, alloc::collections::linked_list::LinkedList<alloc::vec::Vec<polars_core::series::Series, alloc::alloc::Global>>> (worker_thread=0x7ffff5e4b980, injected=false) at /home/ritchie46/.cargo/registry/src/github.com-1ecc6299db9ec823/rayon-core-1.11.0/src/join/mod.rs:167
#10 0x0000555555944571 in rayon_core::registry::in_worker<rayon_core::join::join_context::{closure_env#0}<rayon::iter::plumbing::bridge_producer_consumer::helper::{closure_env#0}<rayon::slice::IterProducer<usize>, rayon::iter::map::MapConsumer<rayon::iter::map::MapConsumer<rayon::iter::while_some::WhileSomeConsumer<rayon::iter::extend::ListVecConsumer>, rayon::result::{impl#4}::from_par_iter::ok::{closure_env#0}<polars_core::series::Series, polars_error::PolarsError>>, polars_io::parquet::read_impl::rg_to_dfs::{closure#0}::{closure_env#0}>>, rayon::iter::plumbing::bridge_producer_consumer::helper::{closure_env#1}<rayon::slice::IterProducer<usize>, rayon::iter::map::MapConsumer<rayon::iter::map::MapConsumer<rayon::iter::while_some::WhileSomeConsumer<rayon::iter::extend::ListVecConsumer>, rayon::result::{impl#4}::from_par_iter::ok::{closure_env#0}<polars_core::series::Series, polars_error::PolarsError>>, polars_io::parquet::read_impl::rg_to_dfs::{closure#0}::{closure_env#0}>>, alloc::collections::linked_list::LinkedList<alloc::vec::Vec<polars_core::series::Series, alloc::alloc::Global>>, alloc::collections::linked_list::LinkedList<alloc::vec::Vec<polars_core::series::Series, alloc::alloc::Global>>>, (alloc::collections::linked_list::LinkedList<alloc::vec::Vec<polars_core::series::Series, alloc::alloc::Global>>, alloc::collections::linked_list::LinkedList<alloc::vec::Vec<polars_core::series::Series, alloc::alloc::Global>>)> (op=...) at /home/ritchie46/.cargo/registry/src/github.com-1ecc6299db9ec823/rayon-core-1.11.0/src/registry.rs:521
#11 rayon_core::join::join_context<rayon::iter::plumbing::bridge_producer_consumer::helper::{closure_env#0}<rayon::slice::IterProducer<usize>, rayon::iter::map::MapConsumer<rayon::iter::map::MapConsumer<rayon::iter::while_some::WhileSomeConsumer<rayon::iter::extend::ListVecConsumer>, rayon::result::{im...
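Regarding the "print a warning" idea above, here is a purely hypothetical sketch of what a user-side guard could look like (`write_parquet_checked` and the threshold are illustrative inventions, not Polars APIs, and the assumption that an unset `row_group_size` can yield one frame-sized row group is mine):

```python
import warnings

import polars as pl

# Roughly where the segfault starts in the experiments above.
ROW_GROUP_WARN_THRESHOLD = 250_000_000  # rows


def write_parquet_checked(df: pl.DataFrame, path: str, row_group_size: int | None = None) -> None:
    # Assume the writer may emit one row group spanning the whole frame when
    # row_group_size is left unset.
    effective = row_group_size if row_group_size is not None else df.height
    if effective > ROW_GROUP_WARN_THRESHOLD:
        warnings.warn(
            f"writing a row group of ~{effective} rows may trigger a zstd stack "
            "overflow on read; consider passing a smaller row_group_size",
            stacklevel=2,
        )
    df.write_parquet(path, row_group_size=row_group_size)
```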
ghuls commented 1 year ago

@ritchie46 There is a new zstd version out, 1.5.4 (the Rust bindings have not been updated yet).

https://github.com/gyscos/zstd-rs/issues/213

ritchie46 commented 1 year ago

Opened an issue to get some more info on this: https://github.com/gyscos/zstd-rs/issues/225