apache/arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[C++][Dataset] Preserve order when writing dataset #26818

Open asfimport opened 3 years ago

asfimport commented 3 years ago

Currently, when writing a dataset, e.g. from a table consisting of a set of record batches, there is no guarantee that the row order is preserved when reading the dataset.

Small code example:


In [1]: import pyarrow as pa, pyarrow.dataset as ds

In [2]: table = pa.table({"a": range(10)})

In [3]: table.to_pandas()
Out[3]: 
   a
0  0
1  1
2  2
3  3
4  4
5  5
6  6
7  7
8  8
9  9

In [4]: batches = table.to_batches(max_chunksize=2)

In [5]: ds.write_dataset(batches, "test_dataset_order", format="parquet")

In [6]: ds.dataset("test_dataset_order").to_table().to_pandas()
Out[6]: 
   a
0  4
1  5
2  8
3  9
4  6
5  7
6  2
7  3
8  0
9  1

Although this might seem normal in the SQL world, typical dataframe users (R, pandas/dask, etc.) will expect a preserved row order. Some applications might also rely on this; e.g. with dask you can have a sorted index column ("divisions" between the partitions) that would get lost this way. (Note: the dask parquet writer itself doesn't use pyarrow.dataset.write_dataset, so it isn't impacted by this.)

Some discussion about this started in https://github.com/apache/arrow/pull/8305 (ARROW-9782), which changed the writer to write all fragments to a single file instead of one file per fragment.

I am not fully sure what the best way to solve this is, but IMO at least having the option to preserve the order would be good.
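In the meantime, one possible workaround is a sketch like the following (the "row_index" column is an arbitrary helper name introduced here for illustration, not an official option): carry an explicit index column through the write and sort on it after reading.

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"a": range(10)})

# Attach an explicit row index before writing so the original order can be restored.
indexed = table.append_column("row_index", pa.array(range(len(table))))
ds.write_dataset(indexed.to_batches(max_chunksize=2), "test_dataset_order", format="parquet")

# The rows may still come back scrambled, but sorting on the helper
# column restores the original order; drop it afterwards.
restored = (
    ds.dataset("test_dataset_order")
    .to_table()
    .sort_by("row_index")
    .drop_columns(["row_index"])
)
assert restored["a"].to_pylist() == list(range(10))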

cc @bkietz

Reporter: Joris Van den Bossche / @jorisvandenbossche
Watchers: Rok Mihevc / @rok

Note: This issue was originally created as ARROW-10883. Please see the migration documentation for further details.

asfimport commented 2 years ago

Weston Pace / @westonpace: I deleted the link to ARROW-12873 because I don't know that "batch index" needs to rely on that arbitrary metadata mechanism (and, given that many nodes will need to manipulate it, I don't think it is arbitrary metadata)

hu6360567 commented 3 months ago

Hi @westonpace, are there any updates on this? Currently, "FileSystemDataset::Write" is implemented as a sequenced plan of scan, filter, project, and write nodes. As in the plan referenced in #32991, a batch_index has been added to the scanner and is used by "ordered_sink" to reorder exec batches. Should we consider implementing an "ordered" node that functions like "ordered_sink" but without sinking? This node could be injected anywhere between scan and project. I believe an "ordered" node would be a more effective way to directly order the output of the "scan" node, providing a more flexible planning approach.
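As a Python-level illustration of ordering inside a plan (a sketch using the pyarrow.acero bindings available in recent pyarrow, not the proposed C++ node; the "row_index" column is again a made-up helper), an order_by declaration can restore a known order between a source and the rest of the plan:

import pyarrow as pa
import pyarrow.acero as acero

# Batches whose rows carry an explicit index recording the intended order.
table = pa.table({"row_index": [2, 0, 1], "a": [30, 10, 20]})

# table_source -> order_by, analogous to injecting an ordering node
# between scan and project in the write plan.
plan = acero.Declaration.from_sequence([
    acero.Declaration("table_source", acero.TableSourceNodeOptions(table)),
    acero.Declaration("order_by", acero.OrderByNodeOptions([("row_index", "ascending")])),
])

print(plan.to_table())  # rows come back sorted by row_index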

jerryqhyu commented 1 month ago

Could someone please solve this issue? This is clearly a bug in Arrow, and it should at least have an option to preserve order.

douglas-raillard-arm commented 3 weeks ago

Just got burnt by the same issue while trying to re-encode a Parquet file with a different row group size. Even if there is no plan to fix this issue, it might be a good idea to add a warning to the documentation, which currently says nothing about order not being preserved: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html#
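For the single-file re-encoding case specifically, one order-preserving alternative (a sketch only; the file names are placeholders and the whole table is assumed to fit in memory) is to bypass the dataset writer and use pyarrow.parquet directly:

import pyarrow.parquet as pq

# Read the whole file, then rewrite it with a different row group size.
# pq.write_table writes rows in table order, avoiding the dataset writer.
table = pq.read_table("input.parquet")
pq.write_table(table, "output.parquet", row_group_size=64 * 1024)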

u3Izx9ql7vW4 commented 3 weeks ago

Related: https://github.com/apache/arrow/issues/39030