pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs
Other
29.33k stars 1.86k forks source link

write_parquet function in polars-u64-idx does not support large data frames #3120

Closed jnthnhss closed 2 years ago

jnthnhss commented 2 years ago

What language are you using?

Python

What version of polars are you using?

0.13.21

What operating system are you using polars on?

Ubuntu 20.04.1 LTS

What language version are you using

Python 3.8.5

Describe your bug.

I'm using the 64 bit version of Polars. However, the write_parquet function does not seem to support large data frames.

What are the steps to reproduce the behavior?

df = pl.select(pl.repeat(0,n=2**32).alias('col_1'))
df.write_parquet('tmp.parquet')

What is the actual behavior?

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.8/dist-packages/polars/internals/frame.py", line 1400, in write_parquet
    self._df.to_parquet(file, compression, statistics)
exceptions.ArrowErrorException: ExternalFormat("underlying snap error: snappy: input buffer (size = 34359738368) is larger than allowed (size = 4294967295)")
ritchie46 commented 2 years ago

I think we must split these large dataframes up in different row groups. I already open a discussion/issue for this upstream: https://github.com/jorgecarleitao/arrow2/issues/932

Maybe we should also default to another compression like lzo. :thinking:

ritchie46 commented 2 years ago

@jnthnhss I wan to change the default compression algorithm to "LZ4". Could you give that one a try? I don't have the memory capacity to test these numbers out.

jnthnhss commented 2 years ago

I tried out all the options for compression below. With no compression I also get an error. I should mentioned that it does work with use_pyarrow=True.

import polars as pl

df = pl.select(pl.repeat(0,n=2**32).alias('col_1'))

df.write_parquet('tmp.parquet',compression='uncompressed')

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.8/dist-packages/polars/internals/frame.py", line 1400, in write_parquet
    self._df.to_parquet(file, compression, statistics)
exceptions.ArrowErrorException: ExternalFormat("A page can only contain i32::MAX uncompressed bytes. This one contains 34896609285")

df.write_parquet('tmp.parquet',compression='snappy')

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.8/dist-packages/polars/internals/frame.py", line 1400, in write_parquet
    self._df.to_parquet(file, compression, statistics)
exceptions.ArrowErrorException: ExternalFormat("underlying snap error: snappy: input buffer (size = 34359738368) is larger than allowed (size = 4294967295)")

df.write_parquet('tmp.parquet',compression='gzip')

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.8/dist-packages/polars/internals/frame.py", line 1400, in write_parquet
    self._df.to_parquet(file, compression, statistics)
exceptions.ArrowErrorException: ExternalFormat("A page can only contain i32::MAX uncompressed bytes. This one contains 34896609285")

df.write_parquet('tmp.parquet',compression='lzo')

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.8/dist-packages/polars/internals/frame.py", line 1400, in write_parquet
    self._df.to_parquet(file, compression, statistics)
exceptions.ArrowErrorException: ExternalFormat("Compression Lzo is not supported")

df.write_parquet('tmp.parquet',compression='brotli')

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.8/dist-packages/polars/internals/frame.py", line 1400, in write_parquet
    self._df.to_parquet(file, compression, statistics)
exceptions.ArrowErrorException: ExternalFormat("A page can only contain i32::MAX uncompressed bytes. This one contains 34896609285")

df.write_parquet('tmp.parquet',compression='lz4')

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.8/dist-packages/polars/internals/frame.py", line 1400, in write_parquet
    self._df.to_parquet(file, compression, statistics)
exceptions.ArrowErrorException: ExternalFormat("A page can only contain i32::MAX uncompressed bytes. This one contains 34896609285")

df.write_parquet('tmp.parquet',compression='zstd')

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.8/dist-packages/polars/internals/frame.py", line 1400, in write_parquet
    self._df.to_parquet(file, compression, statistics)
exceptions.ArrowErrorException: ExternalFormat("A page can only contain i32::MAX uncompressed bytes. This one contains 34896609285")
ritchie46 commented 2 years ago

This should be fixed by #3181. We now split large pages before writing.

jnthnhss commented 2 years ago

Thank you so much. I will give it a try today.

sammysheep commented 2 years ago

Same error implementing in the Rust API using 0.20.0 yesterday. Is the mentioned patch only available on a pre-release version?

Amazing work, by the way.

For completeness:

Error: ArrowError(ExternalFormat("A page can only contain i32::MAX uncompressed bytes. This one contains 4913383091"))
GZIP ^^

Error: ArrowError(ExternalFormat("underlying snap error: snappy: input buffer (size = 4929419665) is larger than allowed (size = 4294967295)"))
SNAPPY ^^

FWIW, Snappy > Lzo as a default. At least Cloudera thinks so and they helped develop parquet.

sammysheep commented 2 years ago

Also, is there an option to control the page size?

ritchie46 commented 2 years ago

Also, is there an option to control the page size?

No, there is not. The fix upstream now halves the page size when it is estimated to hit the limit.

ritchie46 commented 2 years ago

Did you took the PR branch? It is not yet on master.

sammysheep commented 2 years ago

Apologies, that was a noob error. I have re-built using the PR branch this time. Used Snappy with this test on a 43GB TSV file being converted to parquet. I get a new error:

thread '<unknown>' has overflowed its stack
fatal runtime error: stack overflow
Aborted

Full code is just based on your example:

use polars::prelude::*;

fn main() -> Result<()> {
    let input_tsv = std::env::args().nth(1).expect("no input tsv given");
    let output_parquet = std::env::args().nth(2).expect("no output parquet given");

    let mut df = LazyCsvReader::new(input_tsv)
        .has_header(false)
        .with_delimiter(b'\t')
        .with_null_values(Some(NullValues::AllColumns("\\N".to_owned())))
        .finish()?
        .select([all()])
        .collect()?;

    dbg!(&df);

    if std::fs::metadata(&output_parquet).is_err() {
        let f = std::fs::File::create(&output_parquet).unwrap();
        ParquetWriter::new(f)
            .with_statistics(true)
            .with_compression(ParquetCompression::Snappy)
            .finish(&mut df)?;
    }

    Ok(())
}

Using CentOS Linux release 7.9.2009 (Core)

ritchie46 commented 2 years ago

Thanks for giving it another try. I will see if I can try to reproduce this in a few days.

ritchie46 commented 2 years ago

Hi @sammysheep, could you give it another try? There might still be a problem, but that would then mean you have a single value that is huge. Could that be the case?

sammysheep commented 2 years ago

I'll give it a shot and let you know. This file has been converted before via Spark, which I think also uses arrow, albeit a different implementation. The largest single values are about 32k bytes or so.

sammysheep commented 2 years ago

This time it does something different. It just says Killed. No SO on this test. :)

However, I watched the process and it died after it reached the memory limit of the server, which is about 187G. The file size this time was 56G, which was my mistake that I did not save the earlier version of the file. I can try to do something closer to the original test on Monday.

I converted the same file yesterday on a similar server using local mode and coalescing to one partition. This is the closest to what I am doing with polars. The peak memory in the history are much lower for the Spark job, about 5G. The jobs aren't exactly the same because the Spark one does a distinct() call, though most of the data if not all should be distinct. In any case, I think Spark may target block sizes similar to the HDFS block size, about 128 MB.

https://medium.com/analytics-vidhya/spark-parquet-file-cac4af92981d#:~:text=Parquet%3A%20Data%20Organization&text=Horizontal%20Partitioning(Row%20Groups)%20is,of%20128%20MB%20(default).

More on Monday with different sizes.

cbilot commented 2 years ago

df = pl.select(pl.repeat(0,n=2**32,eager=True).alias('col_1')) df.write_parquet('tmp.parquet')

I installed polars-u64-idx (0.13.26), and ran the above code. No errors. (And reading the resultant parquet file showed no problems.)

If there's anything I can do to test/benchmark/whatever, please let me know. Just point me to the files. I have only 0.5TB of RAM, so my machine may not be able to read the absolute largest of files.

sammysheep commented 2 years ago

Sorry for the delay. I chunked the data by lines equally and it ran successfully*:

-rw-rw----. 1 vfn4 vfn4 56G May  1 07:36 CDS-202204.txt
-rw-rw----. 1 vfn4 vfn4 28G May  4 11:39 CDS-202204_chunkaa
-rw-rw----. 1 vfn4 vfn4 28G May  4 11:39 CDS-202204_chunkab
-rw-rw----. 1 vfn4 vfn4 65G May  4 11:57 CDS-202204_chunkab.parquet
-rw-rw----. 1 vfn4 vfn4 65G May  4 12:12 CDS-202204_chunkaa.parquet

The runtime stats show it is using around 150 to 160G memory for each file:

# First
        Command being timed: "cargo run --release CDS-202204_chunkaa CDS-202204_chunkaa.parquet"
        User time (seconds): 268.11
        System time (seconds): 552.91
        Percent of CPU this job got: 126%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 10:47.14
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 161270116
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 128
        Minor (reclaiming a frame) page faults: 119604642
        Voluntary context switches: 10822679
        Involuntary context switches: 2236
        Swaps: 0
        File system inputs: 58113944
        File system outputs: 134753416
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0
# Then:
        Command being timed: "cargo run --release CDS-202204_chunkab CDS-202204_chunkab.parquet"
        User time (seconds): 274.96
        System time (seconds): 556.81
        Percent of CPU this job got: 124%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 11:06.36
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 157404136
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 125
        Minor (reclaiming a frame) page faults: 124380936
        Voluntary context switches: 11137327
        Involuntary context switches: 1992
        Swaps: 0
        File system inputs: 57980432
        File system outputs: 134989248
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0

My system didn't like the parquet files CDS-202204_chunkab.parquet is of an unsupported version. file version: 2. This is likely a setting I missed. But parquet-tools was able to decode it. I couldn't verify the parquet data further than that.

*Some oddities include that the compressed parquet is much larger than the original file (over 2x). I don't know if the behavior I am showing is expected or not, but if not expected they likely fit into separate issues. Thanks for everything and let me know if you have other stuff I can test down the road.

ritchie46 commented 2 years ago

I will ping @jorgecarleitao in here as he has more understanding of this.

What compression algorithm did you use? And do you see a difference with other ones?

sammysheep commented 2 years ago

I used Snappy because that is the cluster default we use. Happy to try another one. What do you recommend?

jorgecarleitao commented 2 years ago

I am sorry this slipped through the cracks on my triage. @ritchie46 din't you fixed this by with the split of the array in multiple pages?

ritchie46 commented 2 years ago

I am sorry this slipped through the cracks on my triage. @ritchie46 din't you fixed this by with the split of the array in multiple pages?

I believe so. My machine is only a mere 16GB, so I am not able to test this.

But @cbilot's successful run gives me trust that it is fixed now.

cbilot commented 2 years ago

Hmmm, when I ran that code, I was using polars-u64-idx (0.13.26), which was actually regular polars, due to #3279, a bug in the build CI.

So just to re-confirm the result, today, using polars-u64-idx (0.13.38), I created the following dataset of 10 billion records, wrote it as a parquet file, and then read it back in:

import polars as pl
import numpy as np

df = pl.DataFrame({
    'col1': np.random.normal(0, 1, 10_000_000_000),
}).with_row_count()
df
>>> df
shape: (10000000000, 2)
┌────────────┬───────────┐
│ row_nr     ┆ col1      │
│ ---        ┆ ---       │
│ u64        ┆ f64       │
╞════════════╪═══════════╡
│ 0          ┆ -0.212407 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 1          ┆ 0.923423  │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 2          ┆ -0.199929 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 3          ┆ -0.358613 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ ...        ┆ ...       │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 9999999996 ┆ 2.465729  │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 9999999997 ┆ -0.498744 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 9999999998 ┆ -0.841243 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 9999999999 ┆ -0.303019 │
└────────────┴───────────┘
df.write_parquet("tmp.parquet")
$ ls -lh *.parquet
-rw-rw-r-- 1 corey corey 115G May 26 23:01 tmp.parquet
df2 = pl.read_parquet('tmp.parquet')
df2
shape: (10000000000, 2)
┌────────────┬───────────┐
│ row_nr     ┆ col1      │
│ ---        ┆ ---       │
│ u64        ┆ f64       │
╞════════════╪═══════════╡
│ 0          ┆ -0.212407 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 1          ┆ 0.923423  │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 2          ┆ -0.199929 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 3          ┆ -0.358613 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ ...        ┆ ...       │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 9999999996 ┆ 2.465729  │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 9999999997 ┆ -0.498744 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 9999999998 ┆ -0.841243 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┤
│ 9999999999 ┆ -0.303019 │
└────────────┴───────────┘
>>> pl.__version__
'0.13.38'

I can't compare the two files using frame_equal due to #3511. But the the number of rows, and the head & tail of the dataset look equal.

sammysheep commented 2 years ago

Hi @jorgecarleitao ! Would you mind taking a look at the example run I posted above?

Questions:

@ritchie46 I think it is "fixed" in terms of it running now but if you have memory like yours (16GB) or too large a file even on a server then when you try to convert a large file you will get a failure when the OS kills your process due to OOM.

For the alternative I am trying to replace, my hunch is that Spark is chunking the data as it is read and writing to a fixed page/block size that typically won't blow up memory (more than the JVM's usual). In Polars am I basically slurping everything to memory at once and trying to do the conversion in a couple (2?) pages?

In terms of large parquet being splittable in distributed systems (Cloud, Hadoop) it looks like you'd want to be able to set a smallish (128MB) page/block size in parquet, no? If it is something that is exposed in pyarrow2 it would be great to have available in polars.

Use-case / scenarios:

ritchie46 commented 2 years ago

Is there a way to specify parquet version for compatibility reasons in pyarrow2 that might not be exposed in polars?

There is. We can/should expose that in polars.

is the memory usage for file conversion something that has to be tweaked at the pyarrow2 level, polars level, or something in the application code?

What do you mean by conversion? Polars is in-memory. So it will always read a file completely into memory (save for memory mapped ipc/feather files).

In terms of large parquet being splittable in distributed systems (Cloud, Hadoop) it looks like you'd want to be able to set a smallish (128MB) page/block size in parquet, no? If it is something that is exposed in pyarrow2 it would be great to have available in polars.

This possible for polars datasets that don't have struct data. Polars will write the chunks of your dataframe to parquet. If the dataframe is contiguous, it will write a single chunk.

I will close this issue as it has been resolved. Discussion may continue.

sammysheep commented 2 years ago

There is. We can/should expose that in polars.

Thanks, looking forward to this!

What do you mean by conversion? Polars is in-memory. So it will always read a file completely into memory (save for memory mapped ipc/feather files).

Conversion from CSV to parquet using POLARS.

I was wondering where one could set the parquet page size functionality that you added recently. If it exists, I'd play with the setting to try to get less peak memory allocated (assuming it re-uses a buffer or de-allocs after writing a page). My earlier analysis indicated that trying to convert a 28G CSV to parquet needed 150G+ of memory, which is OOM for most machines.

This possible for polars datasets that don't have struct data. Polars will write the chunks of your dataframe to parquet. If the dataframe is contiguous, it will write a single chunk.

So if I am understanding you, I have to chunk the data frame to possibly lower the page sizes being written?

Note: earlier I meant "arrow2" not "pyarrow2".

sammysheep commented 2 years ago

By the bye, I really enjoyed your talk at the Data and AI Summit 2022 this year. It was definitely one of the better ones, at least to me.

Thanks so much for your wonderful contributions to the community!