apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

join_asof out-of-order error for big sorted tables #41706

Open bepec opened 1 month ago

bepec commented 1 month ago

Describe the usage question you have. Please include as many useful details as possible.

With pyarrow 16.0.0, I can't apply join_asof even though both input tables are sorted by the "on" key. I noticed this when trying to merge larger sorted tables. For example, it fails for tables with 1,061,753 and 994,046 rows, but succeeds if I reduce them to 1,048,178 and 975,257 rows.

I think this behavior can be reproduced with the example below:

import numpy as np
import pyarrow as pa
ts0 = 0
nticks = 2_000_000  # it's OK for nticks = 1_000_000
ncats = 10
ticks = np.arange(ts0, ts0 + nticks)
cats = np.arange(0, ncats).repeat(nticks // ncats)  # repeat count must be an int
t1 = pa.Table.from_pydict({"ts": ticks, "cats": cats})
t2 = pa.Table.from_pydict({"ts": ticks, "cats": cats})
t1.join_asof(t2, on="ts", tolerance=-10, by="cats")

# Last line fails with error:
---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
Cell In[273], line 10
      8 t1 = pa.Table.from_pydict({"ts": ticks, "cats": cats})
      9 t2 = pa.Table.from_pydict({"ts": ticks, "cats": cats})
---> 10 t1.join_asof(t2, on="ts", tolerance=-10, by="cats")

File /lib/python3.10/site-packages/pyarrow/table.pxi:5528, in pyarrow.lib.Table.join_asof()

File /lib/python3.10/site-packages/pyarrow/acero.py:333, in _perform_join_asof(left_operand, left_on, left_by, right_operand, right_on, right_by, tolerance, use_threads, output_type)
    326 join_opts = AsofJoinNodeOptions(
    327     left_on, left_by, right_on, right_by, tolerance
    328 )
    329 decl = Declaration(
    330     "asofjoin", options=join_opts, inputs=[left_source, right_source]
    331 )
--> 333 result_table = decl.to_table(use_threads=use_threads)
    335 if output_type == Table:
    336     return result_table

File /lib/python3.10/site-packages/pyarrow/_acero.pyx:590, in pyarrow._acero.Declaration.to_table()

File /lib/python3.10/site-packages/pyarrow/error.pxi:154, in pyarrow.lib.pyarrow_internal_check_status()

File /lib/python3.10/site-packages/pyarrow/error.pxi:91, in pyarrow.lib.check_status()

ArrowInvalid: AsofJoin does not allow out-of-order on-key values

So I suspect the issue has nothing to do with the order of the on-key values, and is instead related to the input size. Is this a bug that can be fixed, or a fundamental limitation? Is there any workaround other than limiting the input size?

Component(s)

Python

bepec commented 1 month ago

I suspected that pre-joining on the by-key might reorder the on-key values, as discussed here: https://lists.apache.org/list?user@arrow.apache.org:2024-4:join
However, the example above still fails even with an empty by-key: t1.join_asof(t2, on="ts", tolerance=-10, by=[])

0x26res commented 1 month ago

I have a similar issue with a smaller table.

It only happens if I have a lot of small chunks in the table.

Here's an example:

import pyarrow as pa
import pytest
from pandas import Timestamp

LEFT = [
    {"left_on": Timestamp("2023-09-07 12:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 12:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 12:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 12:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 13:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 13:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 13:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 13:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 14:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 14:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 14:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 14:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 15:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 15:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 15:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 15:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 16:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 16:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 16:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 16:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 17:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 17:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 17:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 17:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 18:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 18:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 18:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 18:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 19:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 19:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 19:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 19:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 20:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 20:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 20:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 20:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 21:00:00+0000", tz="UTC"), "left_by": "SYM1"},
]
RIGHT = [
    {
        "right_on": Timestamp("2023-09-07 15:00:00+0000", tz="UTC"),
        "right_by": "SYM1",
    }
]

def test_asofjoin_order():
    left: pa.Table = pa.Table.from_pylist(LEFT)
    right = pa.Table.from_pylist(RIGHT)

    # Re-chunk the table into one-row chunks (the error only appears with many small chunks)
    left = pa.concat_tables(left[i : i + 1] for i in range(left.num_rows))
    assert left[left.column_names[0]] == left[left.column_names[0]].sort()
    assert right[right.column_names[0]] == right[right.column_names[0]].sort()
    with pytest.raises(
        pa.ArrowInvalid, match="AsofJoin does not allow out-of-order on-key values"
    ):
        left.join_asof(
            right,
            on=left.column_names[0],
            by=left.column_names[1],
            right_on=right.column_names[0],
            right_by=right.column_names[1],
            tolerance=-9_223_372_036_854_775_808,
        )

It took a while to make a reproducible example, and I can't pin down exactly what is causing the issue.

gitmodimo commented 3 weeks ago

This also happens for me. I am using the Acero C++ API.

gitmodimo commented 1 week ago

I think I've narrowed down the problem: the as-of join node does not sequence incoming ExecBatches. Here is a patch I created. It is probably not the optimal solution. Maybe a sequencer should be incorporated into BackpressureConcurrentQueue?
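Here, "sequencing" means restoring the original order of batches that were produced in parallel before the join consumes them. The as-of join needs each input's on-key values to arrive in order. The Python sketch below only illustrates the idea with a hypothetical `Sequencer` class. It is not the patch mentioned above and does not reflect Acero's C++ internals or the BackpressureConcurrentQueue API.

```python
import heapq
from typing import Any, List, Tuple


class Sequencer:
    """Hypothetical reorder buffer (not an Arrow API).

    Batches may arrive in any order, each tagged with a unique position in
    the input stream; they are released strictly in order 0, 1, 2, ...
    """

    def __init__(self) -> None:
        self._next = 0                              # next index we may release
        self._pending: List[Tuple[int, Any]] = []   # min-heap of early arrivals

    def push(self, index: int, batch: Any) -> List[Any]:
        """Accept one batch; return every batch that is now releasable, in order."""
        heapq.heappush(self._pending, (index, batch))
        ready = []
        while self._pending and self._pending[0][0] == self._next:
            ready.append(heapq.heappop(self._pending)[1])
            self._next += 1
        return ready


# Example: batch 1 arrives before batch 0 (as can happen with threads).
seq = Sequencer()
print(seq.push(1, "batch-1"))  # [] : held back until batch 0 arrives
print(seq.push(0, "batch-0"))  # ['batch-0', 'batch-1']
```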