apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[Python] Parquet reader cannot read large strings #27341

Open asfimport opened 3 years ago

asfimport commented 3 years ago

When reading or writing a large Parquet file, I get this error:


    df: Final = pd.read_parquet(input_file_uri, engine="pyarrow")
  File "/opt/conda/envs/condaenv/lib/python3.8/site-packages/pandas/io/parquet.py", line 459, in read_parquet
    return impl.read(
  File "/opt/conda/envs/condaenv/lib/python3.8/site-packages/pandas/io/parquet.py", line 221, in read
    return self.api.parquet.read_table(
  File "/opt/conda/envs/condaenv/lib/python3.8/site-packages/pyarrow/parquet.py", line 1638, in read_table
    return dataset.read(columns=columns, use_threads=use_threads,
  File "/opt/conda/envs/condaenv/lib/python3.8/site-packages/pyarrow/parquet.py", line 327, in read
    return self.reader.read_all(column_indices=column_indices,
  File "pyarrow/_parquet.pyx", line 1126, in pyarrow._parquet.ParquetReader.read_all
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
OSError: Capacity error: BinaryBuilder cannot reserve space for more than 2147483646 child elements, got 2147483648

Isn't pyarrow supposed to support large Parquet files? It let me write this file, but now it won't let me read it back. I don't understand why Arrow uses 31-bit limits; it's not even 32-bit, given that sizes are non-negative.

This problem started after I added a string column with 2.5 billion unique rows. Each value is effectively a unique base64-encoded string of length 24. Below is code to reproduce the issue:


from base64 import urlsafe_b64encode

import numpy as np
import pandas as pd
import pyarrow as pa
import smart_open

def num_to_b64(num: int) -> str:
    # 16 little-endian bytes encode to a 24-character URL-safe base64 string.
    return urlsafe_b64encode(num.to_bytes(16, "little")).decode()

# 2.5 billion unique strings in a single column "s".
df = pd.Series(np.arange(2_500_000_000)).apply(num_to_b64).astype("string").to_frame("s")

with smart_open.open("s3://mybucket/mydata.parquet", "wb") as output_file:
    df.to_parquet(output_file, engine="pyarrow", compression="gzip", index=False)

The dataframe is created correctly. When attempting to write it as a parquet file, the last line of the above code leads to the error:


pyarrow.lib.ArrowCapacityError: BinaryBuilder cannot reserve space for more than 2147483646 child elements, got 2500000000

Environment: pyarrow 3.0.0 / 2.0.0, pandas 1.1.5 / 1.2.1, smart_open 4.1.2, Python 3.8.6
Reporter: Pac A. He

Note: This issue was originally created as ARROW-11456. Please see the migration documentation for further details.

asfimport commented 3 years ago

Antoine Pitrou / @pitrou: cc @jorisvandenbossche

asfimport commented 3 years ago

Joris Van den Bossche / @jorisvandenbossche: [~apacman] would you be able to provide a reproducible example? (e.g. some code that writes the parquet file)

asfimport commented 3 years ago

Pac A. He: @jorisvandenbossche This is difficult in this case because the parquet is so large. What I can say is that this issue started after I added a string column with 1.5 billion unique rows. Each value was effectively a unique base64-encoded string of length 22. I hope this helps. If you still need code, I can write a function to generate it.

asfimport commented 3 years ago

Antoine Pitrou / @pitrou: Was the Parquet file generated with Arrow?

asfimport commented 3 years ago

Pac A. He: @pitrou Yes, absolutely. I had used pandas 1.2.1 with pyarrow 2.0.0 to write it as:

df.to_parquet(output_file, engine="pyarrow", compression="gzip", index=False)

asfimport commented 3 years ago

Pac A. He: For what it's worth, fastparquet v0.5.0 had no trouble at all reading such files. That's a workaround for now, if only for Python, until this issue is resolved.
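
For reference, a minimal sketch of that workaround (the file path is a placeholder, not from the thread):

import pandas as pd

# Read the same Parquet file with the fastparquet engine instead of pyarrow.
df = pd.read_parquet("mydata.parquet", engine="fastparquet")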

asfimport commented 3 years ago

Joris Van den Bossche / @jorisvandenbossche:

If you still need code, I can write a function to generate it.

That would help, yes.

asfimport commented 3 years ago

Pac A. He: Unfortunately I have not been able to produce a reproducible result in a simple example despite multiple attempts and experiments. I read a dataframe with 10 string columns and 2 billion rows without issue. The issue reproduces only over my actual real-world data. Nevertheless, the exception traceback and the error message could still be indicative of what's causing it. Having a 31-bit limit makes no sense to me.

asfimport commented 3 years ago

Weston Pace / @westonpace: The 31-bit limit you are referencing is not the one at play here and is not really relevant.  There is another 31-bit limit that has to do with how Arrow stores strings.  Parquet does not need to support random access to strings: the way it stores byte arrays and byte-array lengths does not allow it, so you could not fetch the i-th string of a Parquet-encoded UTF-8 byte array.

Arrow does need to support this use case.  It stores strings using two arrays: the first is an array of offsets, the second an array of bytes.  To fetch the i-th string, Arrow looks up offsets[i] and offsets[i+1] to determine the range that needs to be fetched from the array of bytes.

There are two string types in Arrow, "string" and "large_string".  The "string" data type uses 4-byte signed integer offsets while the "large_string" data type uses 8-byte signed integer offsets.  So it is not possible to create a single "string" array whose data contains more than about 2 billion (2^31 - 1) bytes.
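
A minimal sketch (not part of the original comment) of the two offset widths described above:

import pyarrow as pa

# Compare the 32-bit and 64-bit offset string types.
s = pa.array(["foo", "barbaz"], type=pa.string())   # "string": 4-byte signed offsets
ls = s.cast(pa.large_string())                      # "large_string": 8-byte signed offsets

print(s.type, ls.type)  # string large_string
# With 32-bit offsets a single "string" chunk can hold at most 2**31 - 1 bytes of
# character data; "large_string" raises that per-chunk limit to 2**63 - 1 bytes.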

Now, this is not normally a problem.  Arrow can fall back to a chunked array (which is why the 31-bit limit you reference isn't always such an issue).


>>> import pyarrow as pa
>>> x = '0' * 1024
>>> y = [x] * (1024 * 1024 * 2)
>>> len(y)
2097152  # number of strings
>>> len(y) * 1024
2147483648  # number of bytes
>>> a = pa.array(y)
>>> len(a.chunks)
2
>>> len(a.chunks[0])
2097151
>>> len(a.chunks[1])
1

However, it does seem that, if there are 2 billion strings (as opposed to just 2 billion bytes), the chunked-array fallback does not apply.


>>> x = '0' * 8
>>> y = [x] * (1024 * 1024 * 1024 * 2)
>>> len(y)
2147483648
>>> len(y) * 8
17179869184
>>> a = pa.array(y)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow\array.pxi", line 296, in pyarrow.lib.array
  File "pyarrow\array.pxi", line 39, in pyarrow.lib._sequence_to_array
  File "pyarrow\error.pxi", line 122, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow\error.pxi", line 109, in pyarrow.lib.check_status
pyarrow.lib.ArrowCapacityError: BinaryBuilder cannot reserve space for more than 2147483646 child elements, got 2147483648

This "should" be representable using a chunked array with two chunks.  It is possible this is the source of your issue.  Or maybe when reading from parquet the "fallback to chunked array" logic simply doesn't apply.  I don't know the parquet code well enough.  That is one of the reasons it would be helpful to have a reproducible test.

It also might be easier to just write your parquet out to multiple files or multiple row groups.  Both of these approaches should not only avoid this issue but also reduce the memory pressure when you are converting to pandas.
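
A minimal sketch of the multiple-row-group suggestion (the table contents and file name are placeholders, not from the thread):

import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical small table; in practice this would be the large string column.
table = pa.table({"s": pa.array(["abc", "def", "ghi"], type=pa.large_string())})

# Smaller row groups keep each column chunk well under the 2**31 - 1 limit, and
# the reader can materialize the column as several Arrow chunks.
pq.write_table(table, "mydata.parquet", row_group_size=50_000_000, compression="gzip")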

asfimport commented 3 years ago

Pac A. He: I see. I have now added code to reproduce the issue. Basically, when I attempt to write a parquet file from a pandas dataframe having 2.5 billion unique string rows in a column, I get the error. Due to the large size of the dataframe, it will be memory- and time-intensive to test.

asfimport commented 3 years ago

Antoine Pitrou / @pitrou: Thanks for the reproducer [~apacman]. Unfortunately, even 48 GB of RAM is not enough to run it.

I tried to write another reproducer:


import numpy as np
import pandas as pd

import pyarrow as pa

# A single string value of 2.5 billion characters in one column "s".
df = pd.Series(["x" * 2_500_000_000]).astype("string").to_frame("s")

out = pa.BufferOutputStream()
df.to_parquet(out, engine="pyarrow", compression="lz4", index=False)

However, it fails a bit differently, so I'm not sure it's the same issue:


Traceback (most recent call last):
  File "../bug_11456.py", line 15, in <module>
    df.to_parquet(out, engine="pyarrow", compression="lz4", index=False)
[...]
  File "/home/antoine/miniconda3/envs/pyarrow/lib/python3.7/site-packages/pandas/core/arrays/string_.py", line 250, in __arrow_array__
    return pa.array(values, type=type, from_pandas=True)
  File "pyarrow/array.pxi", line 301, in pyarrow.lib.array
    return _ndarray_to_array(values, mask, type, c_from_pandas, safe,
  File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
    check_status(NdarrayToArrow(pool, values, mask, from_pandas,
  File "pyarrow/error.pxi", line 109, in pyarrow.lib.check_status
    raise ArrowCapacityError(message)
pyarrow.lib.ArrowCapacityError: array cannot contain more than 2147483646 bytes, have 2500000000

asfimport commented 3 years ago

Pac A. He: We have seen that there are one or more pyarrow limits at 2147483646 (about 2^31) bytes and rows per column. As a user I request that this limit be raised to something closer to 2^64, so that downstream packages, e.g. pandas, work transparently. It is unreasonable to ask me to write partitioned Parquet files given that fastparquet has no trouble writing a large Parquet file, so it is clearly technically feasible.

asfimport commented 3 years ago

Antoine Pitrou / @pitrou: Yeah, well, the first question is at which layer the error occurs. According to my reproducer, it may be during Pandas->Arrow conversion. But your reproducer is different...

Note that you may be able to do the conversion manually and force an Arrow large_string type, though I'm not sure Pandas allows that. I'll let @jorisvandenbossche comment on this.

asfimport commented 3 years ago

Joris Van den Bossche / @jorisvandenbossche:

Note that you may be able to do the conversion manually and force an Arrow large_string type, though I'm not sure Pandas allows that.

Yes, pandas allows that by specifying a pyarrow schema manually (instead of letting pyarrow infer that from the dataframe).

For the example above, that would look like:


df.to_parquet(out, engine="pyarrow", compression="lz4", index=False, schema=pa.schema([("s", pa.large_string())]))

[~apacman] does that help as a work-around?