apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0
13.89k stars 3.38k forks source link

[C++] Take kernel can't handle ChunkedArrays that don't fit in an Array #25822

Open asfimport opened 3 years ago

asfimport commented 3 years ago

Take() currently concatenates ChunkedArrays first. However, this breaks down when calling Take() from a ChunkedArray or Table where concatenating the arrays would result in an array that's too large. While inconvenient to implement, it would be useful if this case were handled.

This could be done as a higher-level wrapper around Take(), perhaps.

Example in Python:


>>> import pyarrow as pa
>>> pa.__version__
'1.0.0'
>>> rb1 = pa.RecordBatch.from_arrays([["a" * 2**30]], names=["a"])
>>> rb2 = pa.RecordBatch.from_arrays([["b" * 2**30]], names=["a"])
>>> table = pa.Table.from_batches([rb1, rb2], schema=rb1.schema)
>>> table.take([1, 0])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow/table.pxi", line 1145, in pyarrow.lib.Table.take
  File "/home/lidavidm/Code/twosigma/arrow/venv/lib/python3.8/site-packages/pyarrow/compute.py", line 268, in take
    return call_function('take', [data, indices], options)
  File "pyarrow/_compute.pyx", line 298, in pyarrow._compute.call_function
  File "pyarrow/_compute.pyx", line 192, in pyarrow._compute.Function.call
  File "pyarrow/error.pxi", line 122, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 84, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays

In this example, it would be useful if Take() or a higher-level wrapper could generate multiple record batches as output.

Reporter: Will Jones / @wjones127 Assignee: Will Jones / @wjones127

Related issues:

PRs and other links:

Note: This issue was originally created as ARROW-9773. Please see the migration documentation for further details.

asfimport commented 3 years ago

Antoine Pitrou / @pitrou: Note that this can happen with regular arrays too:


>>> import pyarrow as pa
>>> arr = pa.array(["x" * (1<<20)])
>>> t = arr.take(pa.array([0]*((1<<12) + 1), type=pa.int8()))
>>> t.validate(full=True)
Traceback (most recent call last):
  [...]
ArrowInvalid: Offset invariant failure: non-monotonic offset at slot 2048: -2147483648 < 2146435072
asfimport commented 3 years ago

Leonard Lausen / @leezu: There is a similar issue with large tables (many rows) of medium size lists (~512 elements per list). When using pa.list_ type, take will fail due to offset overflow while concatenating arrays. Using pa.large_list works. (But in practice it doesn't help as .take performs 3 orders of magnitude (~1s vs ~1ms) slower than indexing operations on pandas..)

asfimport commented 2 years ago

Chris Fregly: Seeing this error through Ray 1.13 when I run the following code:

import ray

ray.init(address="auto")

df = ray.data.read_parquet(" [s3://amazon-reviews-pds/parquet/] ")

print(df.groupby("product_category").count())  

Here's the error: (_partition_and_combine_block pid=1933) 2022-05-06 20:51:29,275 INFO worker.py:431 – Task failed with retryable exception: TaskID(7f0166b85ffd7f1fffffffffffffffffffffffff01000000). (_partition_and_combine_block pid=1933) Traceback (most recent call last): (_partition_and_combine_block pid=1933) File "python/ray/_raylet.pyx", line 625, in ray._raylet.execute_task (_partition_and_combine_block pid=1933) File "python/ray/_raylet.pyx", line 629, in ray._raylet.execute_task (_partition_and_combine_block pid=1933) File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/grouped_dataset.py", line 436, in _partition_and_combine_block (_partition_and_combine_block pid=1933) descending=False) (_partition_and_combine_block pid=1933) File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/impl/arrow_block.py", line 308, in sort_and_partition (_partition_and_combine_block pid=1933) table = self._table.take(indices) (_partition_and_combine_block pid=1933) File "pyarrow/table.pxi", line 1382, in pyarrow.lib.Table.take (_partition_and_combine_block pid=1933) File "/home/ray/anaconda3/lib/python3.7/site-packages/pyarrow/compute.py", line 625, in take (_partition_and_combine_block pid=1933) return call_function('take', [data, indices], options, memory_pool) (_partition_and_combine_block pid=1933) File "pyarrow/_compute.pyx", line 528, in pyarrow._compute.call_function (_partition_and_combine_block pid=1933) File "pyarrow/_compute.pyx", line 327, in pyarrow._compute.Function.call (_partition_and_combine_block pid=1933) File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status (_partition_and_combine_block pid=1933) File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status (_partition_and_combine_block pid=1933) pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays GroupBy Map: 100%|████████████████████████████████| 200/200 [01:31<00:00, 2.18it/s] GroupBy Reduce: 100%|██████████████████████████| 200/200 [00:00<00:00, 19776.52it/s] Traceback (most recent call last): File "/home/ray/parquet-raydata.py", line 10, in print(df.groupby("product_category").count().sort()) File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/grouped_dataset.py", line 147, in count return self.aggregate(Count()) File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/grouped_dataset.py", line 114, in aggregate metadata = ray.get(metadata) File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper return func(*args, **kwargs) File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/worker.py", line 1713, in get raise value.as_instanceof_cause() ray.exceptions.RayTaskError(ArrowInvalid): ray::_aggregate_combined_blocks() (pid=27147, ip=172.31.14.160) At least one of the input arguments for this task could not be computed: ray.exceptions.RayTaskError: ray::_partition_and_combine_block() (pid=1930, ip=172.31.14.160) File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/grouped_dataset.py", line 436, in _partition_and_combine_block descending=False) File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/impl/arrow_block.py", line 308, in sort_and_partition table = self._table.take(indices) File "pyarrow/table.pxi", line 1382, in pyarrow.lib.Table.take File "/home/ray/anaconda3/lib/python3.7/site-packages/pyarrow/compute.py", line 625, in take return call_function('take', [data, indices], options, memory_pool) File "pyarrow/_compute.pyx", line 528, in pyarrow._compute.call_function File "pyarrow/_compute.pyx", line 327, in pyarrow._compute.Function.call File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays

asfimport commented 2 years ago

Mayur Srivastava / @mayuropensource: Hi @lidavidm , is there any progress on this jira? (This issue is blocking a few use cases we have.)

 

Thanks,

Mayur Srivastava

asfimport commented 2 years ago

David Li / @lidavidm: It needs someone motivated to sit down and work through the implementation. I can review/offer suggestions but probably don't have the time to implement this right now.

Note that I think the cases described in the comments above are fundamentally different from the original issue: they also require upgrading the output from Array to ChunkedArray (or from String/List to LargeString/LargeList) and so can't be done automatically.

asfimport commented 2 years ago

Will Jones / @wjones127: I'm interested in working on this soon. I'll look through the issue a little deeper and ping you @lidavidm to get some ideas on the design.

asfimport commented 2 years ago

David Li / @lidavidm: @wjones127 great! Looking forward to it.

asfimport commented 2 years ago

Will Jones / @wjones127: I've looked through the code and I think there are three related issues. I'll try to describe them here. If you think I am missing some case, let me know. Otherwise, I'll open three sub-tasks and start work on those.

Problem 1: We concatenate when we shouldn't need to

This fails:


arr = pa.chunked_array([["a" * 2**30]] * 2)
arr.take([0,1])
# Traceback (most recent call last):
#   File "<stdin>", line 1, in <module>
#   File "pyarrow/table.pxi", line 998, in pyarrow.lib.ChunkedArray.take
#   File "/Users/willjones/Documents/test-env/venv/lib/python3.9/site-packages/pyarrow/compute.py", line 457, in take
#     return call_function('take', [data, indices], options, memory_pool)
#   File "pyarrow/_compute.pyx", line 542, in pyarrow._compute.call_function
#   File "pyarrow/_compute.pyx", line 341, in pyarrow._compute.Function.call
#   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
#   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
# pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays

 because we concatenate input values here. If that were corrected, it would then fail on the concatenation here if the indices were a chunked array.

The first concatenation could be avoided somewhat easily in special cases (sorted / fall in same chunk), which was partially implement in R. For the general case, we'd need to address this within the kernel rather than within pre-processing (see Problem 3).

The second concatenation shouldn't always be eliminated, but we might want to add a check to validate that there is enough room in offset buffers of arrays to concatenate. TBD if there is an efficient way to test that.

Problem 2: take_array kernel doesn't handle case of offset overflow

This is what Antoine was pointing out:


import pyarrow as pa
arr = pa.array(["x" * (1<<20)])
t = arr.take(pa.array([0]*((1<<12) + 1), type=pa.int8()))
t.validate(full=True)
# Traceback (most recent call last):
#   [...]
# ArrowInvalid: Offset invariant failure: non-monotonic offset at slot 2048: -2147483648 < 2146435072

To solve this, I think we'd either have to:

  1. (optionally?) promote arrays to Large variants of type. Problem is we'd need to do this cast consistently across chunks.
  2. Switch to returning chunked arrays, and create new chunks as needed. (TBD: Could we do that in some cases (String, Binary, List types) and not others?)

    Problem 3: there isn't a take_array kernel that handles ChunkedArrays

    Finally, for sorting chunked arrays of type string/binary/list (that is, the case for take where the indices are out-of-order), I think we need to implement kernels specialized for chunked arrays. IIUC, everything but string/binary/list types could simply do the concatenation we do now; it's just those three types that need special logic to chunk as necessary to avoid offset overflows.

     

     

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: Ok, it's a bit unfortunate that several distinct issues have been amalgamated here :-)

I'd argue that this issue is primarily about fixing problem 3 (which would also fix problem 1). Besides correctness, concatenating is also a performance problem because we might be allocating a lot of temporary memory.

asfimport commented 2 years ago

David Li / @lidavidm: Agreed, we should focus on 1/3. Problem 2 is also interesting, but I'm not sure how best to handle it: right now the kernels infrastructure assumes a fixed output type and shape up front, and dynamically switching to ChunkedArray or promoting type would be a surprise.

I would think we could avoid concatenation for all types, even if it isn't strictly required, to avoid excessive allocation as Antoine mentioned.

liujiajun commented 6 months ago

Hi! Issue 3 is bothering us a lot when sorting huge tables. Do we have any updates on this?

felipecrv commented 2 months ago

Hi! Issue 3 is bothering us a lot when sorting huge tables. Do we have any updates on this?

I'm working on this.