apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[C++][Python] Large strings cause ArrowInvalid: offset overflow while concatenating arrays #33049

Open asfimport opened 2 years ago

asfimport commented 2 years ago

When working with medium-sized datasets that have very long strings, Arrow fails when trying to operate on the strings. The root cause is the combine_chunks function.

Here is a minimal reproducible example:

import numpy as np
import pyarrow as pa

# Create a large string
x = str(np.random.randint(low=0,high=1000, size=(30000,)).tolist())
t = pa.chunked_array([x]*20_000)
# Combine the chunks into large string array - fails
combined = t.combine_chunks()

I get the following error

---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
/var/folders/x6/00594j4s2yv3swcn98bn8gxr0000gn/T/ipykernel_95780/4128956270.py in <module>
----> 1 z=t.combine_chunks()

~/.venv/lib/python3.7/site-packages/pyarrow/table.pxi in pyarrow.lib.ChunkedArray.combine_chunks()

~/.venv/lib/python3.7/site-packages/pyarrow/array.pxi in pyarrow.lib.concat_arrays()

~/Documents/Github/dataquality/.venv/lib/python3.7/site-packages/pyarrow/error.pxi in pyarrow.lib.pyarrow_internal_check_status()

~/.venv/lib/python3.7/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

ArrowInvalid: offset overflow while concatenating arrays

With smaller strings or smaller arrays this works fine.

x = str(np.random.randint(low=0,high=1000, size=(10,)).tolist())
t = pa.chunked_array([x]*1000)
combined = t.combine_chunks()

The first (failing) example takes a few minutes to run. If you'd like a faster example for experimentation, you can use vaex to generate the chunked array much more quickly. The following throws the identical error and runs in about 1 second.

import vaex
import numpy as np

n = 50_000
x = str(np.random.randint(low=0,high=1000, size=(30_000,)).tolist())
df = vaex.from_arrays(
    id=list(range(n)),
    y=np.random.randint(low=0,high=1000,size=n)
)
df["text"] = vaex.vconstant(x, len(df))
# text_chunk_array is now a pyarrow.lib.ChunkedArray
text_chunk_array = df.text.values
x = text_chunk_array.combine_chunks() 

Reporter: Ben Epstein

Note: This issue was originally created as ARROW-17828. Please see the migration documentation for further details.

asfimport commented 2 years ago

David Li / @lidavidm: It's because Arrow string arrays have a 2 GiB limit due to their representation (each element is addressed by 32-bit signed offsets into a backing buffer). You can cast to large_string first to avoid this (it uses 64-bit offsets), but I think the error here could be much improved (it should at least tell you what to do).
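
For reference, a minimal sketch of that workaround applied to the reproduction above (it assumes t is the chunked_array from the first example):

import pyarrow as pa

# Cast to large_string (64-bit offsets) before combining, so the concatenated
# offsets can exceed the 2 GiB limit of the default 32-bit string type.
t_large = t.cast(pa.large_string())
combined = t_large.combine_chunks()  # should no longer overflow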

asfimport commented 2 years ago

Ben Epstein: @lidavidm thanks for the reply!

Is there a way for me to check this before casting? I.e., instead of trying, catching the failure, casting, and trying again, is there a (fast) computation I can run to see if I'm over the size limit?

asfimport commented 2 years ago

David Li / @lidavidm: Hmm, off the top of my head there isn't such a helper; you could do it by adding up the lengths of the buffers, but it'd be a bit annoying to account for the offsets.

Array.nbytes is an overapproximation, but you could sum that up for all the chunks (it's an overapproximation because it includes the space taken up by the validity buffer and the offsets themselves).
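
A rough sketch of that pre-check (maybe_large is a hypothetical helper and t is the chunked_array from the original example; the estimate errs on the side of casting too early, since nbytes also counts the validity and offset buffers):

import pyarrow as pa

# Maximum value representable by a signed 32-bit string offset.
OFFSET_LIMIT = 2**31 - 1

def maybe_large(chunked: pa.ChunkedArray) -> pa.ChunkedArray:
    # Sum nbytes over all chunks; cast to large_string if the total is close
    # to or above the offset limit.
    total = sum(chunk.nbytes for chunk in chunked.chunks)
    if total >= OFFSET_LIMIT:
        return chunked.cast(pa.large_string())
    return chunked

combined = maybe_large(t).combine_chunks()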

asfimport commented 2 years ago

David Li / @lidavidm: For posterity: labeling this good-first-issue in case someone wants to improve the error message here; I also linked some related issues.

asfimport commented 2 years ago

David Li / @lidavidm: It's also possible we could convert from string to large_string for you; I'm not sure which behavior would be more surprising to people.

asfimport commented 2 years ago

Ben Epstein: I (personally) would prefer you convert to large_string and emit a warning ("converting string to large_string as column XX is over the arrow string size limit").

I'm using vaex for all my arrow work, so following your suggestion, here's how I'm handling it (in case others find themselves here):


import pyarrow as pa
import vaex
import numpy as np
from vaex.dataframe import DataFrame

n = 50_000
x = str(np.random.randint(low=0,high=1000, size=(30_000,)).tolist())
# Create a df with a string too large
df = vaex.from_arrays(
    id=list(range(n)), 
    y=np.random.randint(low=0,high=1000,size=n)
)
df["text"] = vaex.vconstant(x, len(df))

# byte limit for arrow string offsets (2 GiB)
# assuming 1 byte per character (ASCII), the total number of characters in
# the column in question must stay below size_limit
size_limit = 2 * 1e9
def validate_str_cols(df: DataFrame) -> DataFrame:
    for col, dtype in zip(df.get_column_names(), df.dtypes):
        if dtype == str and df[col].str.len().sum() >= size_limit:
            df[col] = df[col].to_arrow().cast(pa.large_string())
    return df

# text is type string
print(df.dtypes)
df = validate_str_cols(df)
# text is type large_string
print(df.dtypes)
# works!
y = df.text.values.combine_chunks()
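
A hedged sketch of the auto-convert-and-warn behaviour requested above, done on the Python side rather than inside Arrow (combine_chunks_safe is a hypothetical helper, not a pyarrow API):

import warnings
import pyarrow as pa

def combine_chunks_safe(chunked: pa.ChunkedArray):
    # Try the normal path first; fall back to large_string with a warning if
    # the 32-bit offsets would overflow.
    try:
        return chunked.combine_chunks()
    except pa.ArrowInvalid:
        warnings.warn("column exceeds the 2 GiB string offset limit; "
                      "casting string -> large_string")
        return chunked.cast(pa.large_string()).combine_chunks()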
asfimport commented 2 years ago

David Li / @lidavidm: That might be reasonable, though I'm not sure if we can emit a Python warning in C++ (would be an interesting thing to do, and would probably be useful elsewhere). Maybe @jorisvandenbossche has opinions.

In general the large_X vs X type handling could use some consideration, I think

pramodsatya commented 1 year ago

Hi @asfimport, I would like to work on this issue. Could you please assign it to me?

kou commented 1 year ago

Could you add a comment that contains only "take", like https://github.com/apache/arrow/issues/33849#issuecomment-1423372616, here? See also: https://arrow.apache.org/docs/dev/developers/bug_reports.html#issue-assignment

leprechaunt33 commented 1 year ago

Here's the current workaround I've developed for vaex in general for this pyarrow-related error, on dataframes where the technique mentioned above does not work (materialisation of a pandas array from a joined multi-file data frame where I was unable to set the arrow data type on the column):

pramodsatya commented 1 year ago

take

Ben-Epstein commented 1 year ago

@leprechaunt33 I was working on a PR in vaex to remove the use of .take because in fact that is the issue coming from arrow (huggingface datasets faced the same issue). Details are here https://github.com/vaexio/vaex/pull/2336

See the related issue referenced in the PR for more details

leprechaunt33 commented 1 year ago

@Ben-Epstein thanks for the heads-up. I'm still digesting the code. In my use case this workaround had been successful, but only fully after ensuring the iterator was always chunked.

Not the most efficient, but the only reason I mention this is: the bug for me arose with a single string column of a multi-file HDF5 dataset (33 files) with max length 32000 (but in reality much shorter). I observed the iterator above fail when attempting to combine the chunks in the single filtered dataset of fewer than 100 records. Given the iterator is on a filtered dataset which had had a df.take done on unfiltered indices, I wonder if there might be more going on than just a take issue (unless that memory explosion is really that bad). I'll have a closer look at where mine was failing some time this week and see if those changes fix things.

Ben-Epstein commented 1 year ago

@leprechaunt33 thanks for checking back in. Since this same issue happened in huggingface datasets, it makes me believe that it really is the .take explosion.

That's certainly an interesting workaround, but not one I can use in production :)

leprechaunt33 commented 1 year ago

@Ben-Epstein yup, it's really only a workaround that's applicable to cases where you need the data in memory after filtering, and since vaex is meant to be lazy-execution/memory-map-as-required, a lot of folks will be encountering the problem in other contexts. It's certainly possible it's a take explosion; the context in this case is that I'm working with a filter set resulting from a df.take on an already potentially filtered set of indices of rows identified as having matching report ids, due to the lack of indexing and a many-to-one record relationship between two tables that prevents a join from being used. The column that fails holds free-text reports relating to leiomyosarcoma cases, so there are fewer than 100 scattered throughout this table of >2M reports, filtered via a regex query on a MedDRA term. It's possible the take is being multiplied across different tables/arrays from the different hdf5 files in the dataset, multiplied by the separate chunks of those files, and just creating polynomial complexity, but I'm not familiar enough yet with the vaex internals to confirm that. As you figured out, the DataFrame take vs arrow take and the code complexity make it a little challenging to debug. I'll be able to look more under the hood of what's going on in a couple of days.

leprechaunt33 commented 1 year ago

I managed to get to this a bit earlier, and it definitely looks like it's a take issue. I printed out the indices and sizes of the data structures for calls to the pyarrow take; for the column in question I see the following (the details are sys.getsizeof of the data, len(indices), and the indices themselves):

2175981410 16 [0, 566984, 568042, 987100, 1021224, 1082097, 1097740, 1499272, 1505009, 1537374, 1598404, 1749420, 1818868, 1890281, 1890379, 1893484]
1444605469 10 [0, 566984, 568042, 987100, 1021224, 1082097, 1097740, 1499272, 1505009, 1537374]
600036404 6 [0, 151016, 220464, 291877, 291975, 295080]

The first of the three above is the column without using an iterator; the second and third are using an iterator with chunk size (number of rows) 10. If I'm reading things right, it is the fact that the data structure being taken from has hit the limit, not the size of the data being collated. To be clear, it's not the size of the filtered dataset but the size of the unfiltered dataset, or chunk thereof, that is being taken from to obtain the filtered rows. I also ran memory_profile on the same code and determined that both the iterator and to_pandas_df cause allocation of almost identical amounts of memory (around 165 MB, hardly a memory issue).

@Ben-Epstein I just did a double take on those numbers; that looks like a little more than garbage collection above the 2GB limit, so it could be some kind of segmentation issue with indexing the string chunks that close to the 2GB limit. Not sure whether it would be necessary to limit chunk size allocation in vaex to 2GB along with the slice to fully fix the problem... something to consult the code about, I guess :)

Will watch with interest.

Ben-Epstein commented 1 year ago

I'll ping @maartenbreddels to get a deeper opinion

leprechaunt33 commented 1 year ago

This at least hopefully gives a minimal reproducible example for testing that is database-independent, and I assume this would happen with any HDF5 file with chunks that big. Create an HDF5 file with a single text field of arbitrary lengths totalling at least 2GB, then try to combine chunks from a limited number of rows spaced evenly from the start to the end of that text field. That should be able to trigger the exception.
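
An untested sketch of that reproduction in vaex terms (the file name, row count, and string size are arbitrary; whether it actually triggers the error may depend on how vaex chunks the exported column):

import numpy as np
import vaex

n = 20_000
text = str(np.random.randint(low=0, high=1000, size=(30_000,)).tolist())  # ~150 kB per row
df = vaex.from_arrays(id=np.arange(n))
df["text"] = vaex.vconstant(text, len(df))
df.export_hdf5("big_strings.hdf5")  # roughly 3 GB of string data in one column

df2 = vaex.open("big_strings.hdf5")
# Take a handful of rows spread evenly from the start to the end of the column,
# then force the string data to be materialised.
subset = df2.take(list(range(0, n, n // 8)))
subset.text.values  # per the discussion above, expected to raise ArrowInvalid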

maartenbreddels commented 1 year ago

I think there are 2 issues being reported here, right?

  1. The one that matches the original topic: no automatic string->large_string casting in combine_chunks.
  2. .take on chunked arrays is very slow/memory consuming. IIRC arrow C++ does a combine_chunks first, which leads to a memory explosion in many vaex use cases (see the sketch after this comment).

I believe number 2 has already been reported in a different issue.
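
A rough pure-pyarrow sketch of number 2 (the sizes are arbitrary; on versions where take concatenates the source chunks first, the 32-bit offsets overflow even though only two rows are requested):

import pyarrow as pa

big = "x" * (1 << 20)                       # ~1 MiB per value
chunk = pa.array([big] * 1100)              # ~1.1 GiB of string data per chunk
chunked = pa.chunked_array([chunk, chunk])  # ~2.2 GiB of string data in total
chunked.take(pa.array([0, 1500]))           # may raise ArrowInvalid: offset overflow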

leprechaunt33 commented 1 year ago

@maartenbreddels it seems from my testing that this may be a third issue, also related to take, which only occurs when vaex is forced to do a df.take on rows which contain a string column whose unfiltered in-memory representation is larger than 2GB. For example, I have been able to consistently reproduce the bug with the leiomyosarcoma data set (16 indices above) but not for juvenile polymyositis, which generates indices at the start of the data set (indices 0, 44726, 225143, so the full 2GB of data is not memory mapped). Wherever I've been able to consistently reproduce the problem, vaex has been attempting a take (df.take in this case, which turns into a column take) on high row indices, which has caused vaex to read in more than 2GB of data, and the exception occurs precisely after the lazy execution attempts to take from that large memory-mapped column.

I have not noted a speed issue or memory explosion in these cases (testing on the 16-index case above only allocated 165MB), but the ArrowInvalid is consistently raised. It should be fairly simple to test this by generating an HDF5 dataset with arbitrary-length string columns of at least 2GB total combined length and doing a df.take on high indices, or on a small number of indices which span the column. The error is then raised when attempting to access the data in that column (for example for conversion to pandas).

Edit again: to be clear, the size of the data being combined does not seem to matter in these instances, but rather the index within the column. It could be tangentially related to the original issue, but attempting the explicit column type conversion does not resolve the problem; only using an iterator to ensure the filtered chunks are less than 2GB resolves it.

Edit again: on further reflection, the root cause is similar, as it's failing in the _compute/take function, and the cause is vaex being convinced to generate a chunked array of that size in order to take from it. But there's no large dataset involved; the chunked array is generated indirectly and is combining small (typically 0-5000 character) strings.

leprechaunt33 commented 1 year ago

It looks like the simplest way, looking at the API, of not triggering it to read in such large chunks in the meantime is going to be to nbytes() the unfiltered data frame and split the filtered one up into suitable chunk sizes (shl 31 plus one) to operate on, assuming sufficient rows; shl 30 plus one would probably be the most optimal. Or some other more clever use of the into parameter where the take indices and unfiltered length are known.

wjones127 commented 1 year ago

which only occurs when vaex is forced to do a df.take on rows which contain a string column whose unfiltered in memory representation is larger than 2GB.

That sounds like #25822

leprechaunt33 commented 1 year ago

which only occurs when vaex is forced to do a df.take on rows which contain a string column whose unfiltered in memory representation is larger than 2GB.

That sounds like #25822

@wjones127 yup, that looks a lot more like the bug in question! Perhaps my confusion here has come from the fact that the same exception is being thrown for multiple different reasons (all of them likely connected to the 32-bit limitations). Is this just a take kernel thing, as the title suggests, or would it also impact slicing (which is mentioned in the PR linked above as a zero-copy alternative)? It looked to me very much like a 32-bit pointer limitation issue.

wjones127 commented 1 year ago

It looked to me very much like a 32 bit pointer limitation issue.

Yes, it essentially is. We use 32-bit offsets for the default string and binary types. This means you always have to be careful when generating the arrays to chunk them if they get too large. Or otherwise use the LargeString variant, which uses 64-bit offsets. Polars, for example, takes the latter approach; they always use LargeString.

Basically the bug with take and concatenation is that we aren't automatically handling the chunking for you. But you are right that essentially the same issue shows up in multiple places.
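
To make the offset difference concrete, a small sketch (the byte counts are illustrative and assume no validity buffer is allocated):

import pyarrow as pa

a = pa.array(["x" * 10] * 3, type=pa.string())
b = a.cast(pa.large_string())
# Same 30 bytes of character data; the only difference is the offsets buffer,
# which holds n+1 = 4 offsets of 4 bytes (string) vs 8 bytes (large_string).
print(a.nbytes, b.nbytes)  # e.g. 46 vs 62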