pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Support `chunk_size` argument in `write_ipc` and `sink_ipc`. #2639

Open ghuls opened 2 years ago

ghuls commented 2 years ago

Are you using Python or Rust?

Python.

What version of polars are you using?

0.13.0

What operating system are you using polars on?

CentOS 7

Describe your bug.

Writing to an IPC file seems to first buffer the whole output in memory before writing it to disk.

What are the steps to reproduce the behavior?

In [1]: import numpy as np

In [2]: import polars as pl

# Create numpy array of around 3G in size.
In [3]: a = np.ones((40000, 10000), dtype=np.float64)

# Create polars dataframe from it.
In [4]: df = pl.DataFrame(a)

# Release numpy array memory.
In [5]: del a

# Check memory usage in htop while running the next commands:

# Temporary memory increase of 3G.
In [6]: df.to_ipc("test.uncompressed.feather", compression="uncompressed")

# No noticeable memory increase (as dataframe with all ones compresses very well).
In [7]: df.to_ipc("test.lz4.feather", compression="lz4")

# No noticeable memory increase (as dataframe with all ones compresses very well).
In [9]: df.to_ipc("test.zstd.feather", compression="zstd")

# File sizes.
In [11]: ! ls -lh test.*.feather
-rw-r--r-- 1 user domain users  16M Feb 13 21:55 test.lz4.feather
-rw-r--r-- 1 user domain users 3,0G Feb 13 21:54 test.uncompressed.feather
-rw-r--r-- 1 user domain users 2,5M Feb 13 21:55 test.zstd.feather

@jorgecarleitao It seems that arrow2 creates Feather files that pyarrow (I used pyarrow 7.0.0) does not like (if compressed with lz4 or zstd):

# Reading with the arrow2 IPC reader:
In [12]: df_tmp = pl.read_ipc("test.uncompressed.feather", use_pyarrow=False)

In [18]: df_tmp = pl.read_ipc("test.lz4.feather", use_pyarrow=False)

In [19]: df_tmp = pl.read_ipc("test.zstd.feather", use_pyarrow=False)

# Reading with the pyarrow IPC reader wrapped by polars:
In [21]: df_tmp = pl.read_ipc("test.uncompressed.feather", use_pyarrow=True)

In [22]: df_tmp = pl.read_ipc("test.lz4.feather", use_pyarrow=True)
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-22-ec7cb7acbdf0> in <module>
----> 1 df_tmp = pl.read_ipc("test.lz4.feather", use_pyarrow=True)

~/software/polars/py-polars/polars/io.py in read_ipc(file, columns, n_rows, use_pyarrow, memory_map, storage_options, row_count_name, row_count_offset, **kwargs)
    783                 )
    784
--> 785             tbl = pa.feather.read_table(data, memory_map=memory_map, columns=columns)
    786             return DataFrame._from_arrow(tbl)
    787

~/software/anaconda3/envs/polars_test/lib/python3.9/site-packages/pyarrow/feather.py in read_table(source, columns, memory_map, use_threads)
    246
    247     if columns is None:
--> 248         return reader.read()
    249
    250     column_types = [type(column) for column in columns]

~/software/anaconda3/envs/polars_test/lib/python3.9/site-packages/pyarrow/_feather.pyx in pyarrow._feather.FeatherReader.read()

~/software/anaconda3/envs/polars_test/lib/python3.9/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

OSError: Lz4 compressed input contains more than one frame

In [23]: df_tmp = pl.read_ipc("test.zstd.feather", use_pyarrow=True)
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-23-23f53479fddc> in <module>
----> 1 df_tmp = pl.read_ipc("test.zstd.feather", use_pyarrow=True)

~/software/polars/py-polars/polars/io.py in read_ipc(file, columns, n_rows, use_pyarrow, memory_map, storage_options, row_count_name, row_count_offset, **kwargs)
    783                 )
    784
--> 785             tbl = pa.feather.read_table(data, memory_map=memory_map, columns=columns)
    786             return DataFrame._from_arrow(tbl)
    787

~/software/anaconda3/envs/polars_test/lib/python3.9/site-packages/pyarrow/feather.py in read_table(source, columns, memory_map, use_threads)
    246
    247     if columns is None:
--> 248         return reader.read()
    249
    250     column_types = [type(column) for column in columns]

~/software/anaconda3/envs/polars_test/lib/python3.9/site-packages/pyarrow/_feather.pyx in pyarrow._feather.FeatherReader.read()

~/software/anaconda3/envs/polars_test/lib/python3.9/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

OSError: ZSTD decompression failed: Src size is incorrect

# Reading with the pyarrow IPC reader directly:
In [27]: pa_table = pf.read_table("test.uncompressed.feather")

In [28]: pa_table = pf.read_table("test.lz4.feather")
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-28-6579c49191df> in <module>
----> 1 pa_table = pf.read_table("test.lz4.feather")

~/software/anaconda3/envs/polars_test/lib/python3.9/site-packages/pyarrow/feather.py in read_table(source, columns, memory_map, use_threads)
    246
    247     if columns is None:
--> 248         return reader.read()
    249
    250     column_types = [type(column) for column in columns]

~/software/anaconda3/envs/polars_test/lib/python3.9/site-packages/pyarrow/_feather.pyx in pyarrow._feather.FeatherReader.read()

~/software/anaconda3/envs/polars_test/lib/python3.9/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

OSError: Lz4 compressed input contains more than one frame

In [29]: pa_table = pf.read_table("test.zstd.feather")
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-29-60c57a9ccc7f> in <module>
----> 1 pa_table = pf.read_table("test.zstd.feather")

~/software/anaconda3/envs/polars_test/lib/python3.9/site-packages/pyarrow/feather.py in read_table(source, columns, memory_map, use_threads)
    246
    247     if columns is None:
--> 248         return reader.read()
    249
    250     column_types = [type(column) for column in columns]

~/software/anaconda3/envs/polars_test/lib/python3.9/site-packages/pyarrow/_feather.pyx in pyarrow._feather.FeatherReader.read()

~/software/anaconda3/envs/polars_test/lib/python3.9/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

OSError: ZSTD decompression failed: Src size is incorrect
ritchie46 commented 2 years ago

I have double-checked whether we accidentally create a memory buffer on the Python side, and we don't: we open a Rust file handle and dispatch to arrow2.

I've skimmed a bit through the IPC writing source and I can tell we write the data to an in-memory buffer. I think we must expose a chunk_size argument on the IPC writer so that we can influence how much memory is used before the data is written out.

ghuls commented 2 years ago

When writing with pyarrow I can see the memory usage going up and down by 1 GB (or in rare cases 2 GB) while it is writing to a Feather file.
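For reference, those htop observations can be reproduced programmatically; a sketch using the stdlib `resource` module (Unix-only; on Linux `ru_maxrss` is reported in kilobytes, and note that peak RSS only ever grows, so compare before/after deltas):

```python
import resource

def peak_rss_gb() -> float:
    """Peak resident set size of this process so far, in GB (Linux: KB units)."""
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024 ** 2

before = peak_rss_gb()
# df.to_ipc("test.feather")  # the write under test would go here
print(f"peak RSS grew by ~{peak_rss_gb() - before:.2f} GB")
```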

jorgecarleitao commented 2 years ago

Yeap, arrow2 currently has an intermediary write buffer. I have been trying to fix it but haven't been able to yet. Good to know that pyarrow does not use one - it means that it is possible :p

ghuls commented 2 years ago

@jorgecarleitao it might have a "small" intermediate buffer of 1 GB (or at least I see allocations and deallocations of 1 GB). Also, arrow2 generates compressed IPC files that pyarrow can't read, although arrow2 itself reads them fine (see the end of the first post).

I am trying to write Feather files from a dataframe that is 135 GB, so it would be nice if writing it to a file did not require another 135 GB.

jorgecarleitao commented 2 years ago

Yeap, I am also investigating that one. It seems that pyarrow has more requirements than simply "zstd" or "lz4" encoding, but because the arrow project has no integration tests for these, we can't prove roundtrip compatibility.

I am working on the apache/arrow directly to try to improve this situation.

I agree that we should not require an extra buffer here.

ghuls commented 2 years ago

For LZ4 the go implementation has this comment: https://github.com/apache/arrow/blob/bcf3d3e5a2ae5e70034b104ce69f774b78bbb4de/go/arrow/ipc/compression.go#L65-L80

arrow-rs hit the same "Lz4 compressed input contains more than one frame" problem: https://github.com/apache/arrow/pull/9137
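That "more than one frame" error means the writer emitted a concatenation of LZ4 frames where pyarrow expects exactly one per buffer. A rough heuristic for spotting this in a raw compressed buffer (a sketch; 0x184D2204 is the LZ4 frame magic number, and false positives inside compressed payload bytes are possible):

```python
# LZ4 frame files/streams start with the little-endian magic 0x184D2204.
LZ4_FRAME_MAGIC = (0x184D2204).to_bytes(4, "little")

def count_lz4_frame_magics(buf: bytes) -> int:
    """Count occurrences of the LZ4 frame magic; more than one suggests
    concatenated frames. Heuristic only: the 4-byte pattern can also
    appear by chance inside compressed data."""
    count, pos = 0, buf.find(LZ4_FRAME_MAGIC)
    while pos != -1:
        count += 1
        pos = buf.find(LZ4_FRAME_MAGIC, pos + 1)
    return count
```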

jorgecarleitao commented 2 years ago

I finally found the root cause! Fixed in https://github.com/jorgecarleitao/arrow2/pull/840

ghuls commented 2 years ago

@jorgecarleitao thanks a lot for all the bug fixes lately.

But it looks like it still isn't fixed completely (file created with polars' to_ipc(..., compression="lz4")):

In [5]: import pyarrow.feather as pf

In [6]: %time a = pf.read_table('tests.v2_lz4.feather')
---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
<timed exec> in <module>

/staging/leuven/stg_00002/lcb/ghuls/software/miniconda3/envs/create_cistarget_databases/lib/python3.8/site-packages/pyarrow/feather.py in read_table(source, columns, memory_map, use_threads)
    246
    247     if columns is None:
--> 248         return reader.read()
    249
    250     column_types = [type(column) for column in columns]

/software/miniconda3/envs/create_cistarget_databases/lib/python3.8/site-packages/pyarrow/_feather.pyx in pyarrow._feather.FeatherReader.read()

/software/miniconda3/envs/create_cistarget_databases/lib/python3.8/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

ArrowInvalid: Buffer 6 did not start on 8-byte aligned offset: 3963187

In [7]: %time df.to_ipc('test.v2_zstd.feather', 'zstd')
CPU times: user 1min 54s, sys: 16.2 s, total: 2min 10s
Wall time: 3min

In [8]: %time a = pf.read_table('test.v2_zstd.feather')
---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
<timed exec> in <module>

/software/miniconda3/envs/create_cistarget_databases/lib/python3.8/site-packages/pyarrow/feather.py in read_table(source, columns, memory_map, use_threads)
    246
    247     if columns is None:
--> 248         return reader.read()
    249
    250     column_types = [type(column) for column in columns]

/software/miniconda3/envs/create_cistarget_databases/lib/python3.8/site-packages/pyarrow/_feather.pyx in pyarrow._feather.FeatherReader.read()

/software/miniconda3/envs/create_cistarget_databases/lib/python3.8/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

ArrowInvalid: Buffer 4 did not start on 8-byte aligned offset: 513398

In [9]: pl.__version__
Out[9]: '0.13.2'
jorgecarleitao commented 2 years ago

Do you have a minimal example? Asking because, as far as I understand, pyarrow is writing un-aligned offsets, but apparently it can still read them. Thus, I am misunderstanding the Arrow spec here.

ghuls commented 2 years ago

At the moment I don't have a minimal example (file is 32GB) but I can try to reproduce it with a smaller file.

ghuls commented 2 years ago

Small Feather files created by arrow2 (also uncompressed, in case you want to generate a compressed one with pyarrow), test.feather_v2.zip:


In [19]: df_head100.to_ipc("test.lz4_v2.feather", compression="lz4")

In [20]: df_head100.to_ipc("test.zstd_v2.feather", compression="zstd")

In [21]: a = pf.read_table("test.lz4_v2.feather")
---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
<ipython-input-21-ebc079c6397a> in <module>
----> 1 a = pf.read_table("test.lz4_v2.feather")
/software/miniconda3/envs/create_cistarget_databases/lib/python3.8/site-packages/pyarrow/feather.py in read_table(source, columns, memory_map, use_threads)
    246
    247     if columns is None:
--> 248         return reader.read()
    249
    250     column_types = [type(column) for column in columns]

/software/miniconda3/envs/create_cistarget_databases/lib/python3.8/site-packages/pyarrow/_feather.pyx in pyarrow._feather.FeatherReader.read()

/software/miniconda3/envs/create_cistarget_databases/lib/python3.8/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

ArrowInvalid: Buffer 4 did not start on 8-byte aligned offset: 242

In [22]: a = pf.read_table("test.zstd_v2.feather")
---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
<ipython-input-22-456a8d615b60> in <module>
----> 1 a = pf.read_table("test.zstd_v2.feather")
/software/miniconda3/envs/create_cistarget_databases/lib/python3.8/site-packages/pyarrow/feather.py in read_table(source, columns, memory_map, use_threads)
    246
    247     if columns is None:
--> 248         return reader.read()
    249
    250     column_types = [type(column) for column in columns]

/software/miniconda3/envs/create_cistarget_databases/lib/python3.8/site-packages/pyarrow/_feather.pyx in pyarrow._feather.FeatherReader.read()

/software/miniconda3/envs/create_cistarget_databases/lib/python3.8/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

ArrowInvalid: Buffer 4 did not start on 8-byte aligned offset: 213

In [23]: df_head100.to_ipc("test.uncompressed_v2.feather", compression="uncompressed")
ghuls commented 2 years ago

@ritchie46 Could you update arrow2 when you make a new release?

zundertj commented 2 years ago

Closing this, as https://github.com/jorgecarleitao/arrow2/pull/840 fixes the issue; the fix was made available in arrow2 release v0.10.0 (see https://github.com/jorgecarleitao/arrow2/releases), which polars in turn incorporated in release 0.20.0 (https://github.com/pola-rs/polars/pull/2888).

ghuls commented 2 years ago

@zundertj The original issue is not fixed (the whole file is still written to an in-memory buffer first).

zundertj commented 2 years ago

My apologies, I thought it was fixed given this conversation and the releases.

jorgecarleitao commented 2 years ago

Filed upstream: https://github.com/jorgecarleitao/arrow2/issues/928