apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

[Python][Parquet] Faster parquet partitioning scheme #39079

Open CaselIT opened 11 months ago

CaselIT commented 11 months ago

Describe the enhancement requested

Hi,

I'm testing how to optimally save a table partitioned over one or more columns, with the objective of reading single partitions later. While trying this I noticed that the current logic for saving, and especially for loading, a partitioned table is very slow.

While experimenting with ways of speeding up this use case, I noticed that parquet row groups can have an arbitrary size and that a single parquet file can contain row groups of many different sizes. I think there could be a partitioning scheme that makes use of this feature of parquet files.

From some initial tests I've done using only the public Python API, the serialization time is generally comparable, while the read time is up to 10x faster as the number of partitions increases. It's likely that a native implementation of this scheme would do even better.

The example code I tried is the following. I'm using pandas to partition the table, but that should not matter much for the timings (if anything it should penalize the row-group partitioning).

from contextlib import contextmanager
import pandas as pd
import numpy as np
import pyarrow
import pyarrow.parquet
import shutil
import time

rng = np.random.default_rng(seed=42)
shape = (1_000_000, 13)  # this results in about 100 MB of data
num_partitions = (100, 250, 500)

@contextmanager
def timectx(name):
    s = time.perf_counter()
    yield
    e = time.perf_counter()
    print("  ", name, (e - s) * 1000, "ms")

def make_data(num_partition):
    data = pd.DataFrame(rng.uniform(size=shape), columns=[f"c{i}" for i in range(shape[1])])
    # make a key column used for partitioning with partitions of random size.
    # The keys are randomly sorted in the dataframe
    splits = set()
    while len(splits) < num_partition - 1:
        splits.add(rng.integers(1, len(data)))
    splits.add(len(data))
    start = 0
    arr = []
    for i, end in enumerate(sorted(splits)):
        arr.append(np.full(end - start, i))
        start = end
    values = np.concatenate(arr)
    rng.shuffle(values)
    data["key"] = values.astype(np.int64)
    return data.copy()

def run(num_partition):
    print(f"Saving data with {num_partition} partitions")
    df = make_data(num_partition)
    keys = df["key"].unique()
    assert len(keys) == num_partition

    shutil.rmtree("write_to_dataset", ignore_errors=True)
    table = pyarrow.Table.from_pandas(df)
    with timectx("save using write_to_dataset"):
        pyarrow.parquet.write_to_dataset(
            table,
            "write_to_dataset",
            partition_cols=["key"],
            write_statistics=False,
            use_dictionary=False,
        )

    rng.shuffle(keys)  # randomize access
    with timectx("load partitions using read_table"):
        for key in keys:
            pyarrow.parquet.read_table("write_to_dataset", filters=[("key", "==", key)])

    shutil.rmtree("row_group", ignore_errors=True)
    with timectx("save using row groups"):
        # include the pandas processing in the timed section, even though
        # using the table directly would likely be faster
        parts = dict(list(df.groupby("key")))
        pa_table = pyarrow.Table.from_pandas(next(iter(parts.values())))
        key_to_index = {}
        with pyarrow.parquet.ParquetWriter(
            "row_group",
            pa_table.schema,
            write_statistics=False,
            use_dictionary=False,
        ) as pw:
            for index, (key, group) in enumerate(parts.items()):
                size = len(group)
                pa_table = pyarrow.Table.from_pandas(group)
                pw.write_table(pa_table, row_group_size=size)
                key_to_index[key] = index

    with timectx("load partitions from row group"):
        for key in keys:
            index = key_to_index[key]
            # simulate opening the file each time; moving the for loop inside
            # the with block is faster but does not reflect typical usage
            with pyarrow.parquet.ParquetFile("row_group") as pf:
                pf.read_row_group(index)

for ns in num_partitions:
    run(ns)

The results on Windows on my PC are the following:

Saving data with 100 partitions
   save using write_to_dataset 387.8823999548331 ms
   load partitions using read_table 1924.649199936539 ms
   save using row groups 540.5166000127792 ms
   load partitions from row group 393.6318999622017 ms
Saving data with 250 partitions
   save using write_to_dataset 922.8724999120459 ms
   load partitions using read_table 11072.766500059515 ms
   save using row groups 996.5518999379128 ms
   load partitions from row group 1503.8880000356585 ms
Saving data with 500 partitions
   save using write_to_dataset 1778.2125999219716 ms
   load partitions using read_table 38530.92749998905 ms
   save using row groups 1454.6020000707358 ms
   load partitions from row group 4754.040600033477 ms

Using Docker the results are similar:

Saving data with 100 partitions
   save using write_to_dataset 2058.0104130003747 ms
   load partitions using read_table 1066.399915999682 ms
   save using row groups 933.8465500004531 ms
   load partitions from row group 230.3446129999429 ms
Saving data with 250 partitions
   save using write_to_dataset 4654.56907800035 ms
   load partitions using read_table 4715.190167000401 ms
   save using row groups 1677.210732000276 ms
   load partitions from row group 797.0339869998497 ms
Saving data with 500 partitions
   save using write_to_dataset 9070.415569000033 ms
   load partitions using read_table 17305.43225999918 ms
   save using row groups 2508.86906300002 ms
   load partitions from row group 2386.4264080002613 ms

In both cases I'm using Python 3.10 with the following libraries:

pyarrow==14.0.1
pandas==2.1.3
numpy==1.26.2

A couple of considerations:

Thanks for reading.

Component(s)

Parquet

ghuls commented 8 months ago

You are rescanning the whole subdir on every invocation of pyarrow.parquet.read_table. Scanning the filesystem only once for parquet files with pa.dataset.dataset and then filtering the data on the key column with a dataset scanner is much faster:

In [14]: with timectx("scan for parquet files using dataset"):
     ...:      write_to_dataset_dataset = pa.dataset.dataset("write_to_dataset/", partitioning=pa.dataset.partitioning(flavor="hive"))
     ...: 
   scan for parquet files using dataset 40.98821000661701 ms

In [143]: with timectx("load partitions using dataset"):
     ...:         for key in keys:
     ...:             write_to_dataset_dataset.scanner(filter=(pc.field("key") == key)).to_table()
     ...: 
   load partitions using dataset 558.3812920376658 ms

In [144]:     with timectx("load partitions using read_table"):
     ...:         for key in keys:
     ...:             pyarrow.parquet.read_table("write_to_dataset", filters=[("key", "==", key)])
     ...: 
   load partitions using read_table 5385.372799937613 ms

CaselIT commented 8 months ago

Thanks for the reply!

Sure, but my use case is more:

I think something similar is a very common use case, where it's unfeasible to read the dataset only once and reuse it many times. This is also how parquet files are usually read with pandas, polars and most other dataframe libraries: df_lib.read_parquet('file', filters=[...])
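
For example, with pandas and the pyarrow engine, the one-shot pattern I have in mind looks roughly like this (just a sketch; the key value 3 is an arbitrary example):

    import pandas as pd

    # One-shot read of a single partition: each call re-discovers the
    # dataset directory and filters on the partition column.
    df_part = pd.read_parquet(
        "write_to_dataset",
        engine="pyarrow",
        filters=[("key", "==", 3)],
    )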

If that's an option, then opening the parquet file only once with this row-group partitioning also gives much better results. For comparison, re-opening the file on every iteration:


    with timectx("load partitions from row group"):
        for key in keys:
            index = key_to_index[key]
            with pyarrow.parquet.ParquetFile("row_group") as pf:
                pf.read_row_group(index)
load partitions from row group 4809.91979999817 ms

Moving the file open outside the for loop:

    with timectx("load partitions from row group - single file open"):
        with pyarrow.parquet.ParquetFile("row_group") as pf:
            for key in keys:
                index = key_to_index[key]
                pf.read_row_group(index)
load partitions from row group - single file open 196.3233999995282 ms
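
As a possible refinement (only a sketch, not part of the benchmark above): if statistics were written for the key column (the benchmark disables them with write_statistics=False), the key_to_index mapping could be rebuilt from the file's own metadata instead of being kept externally, assuming each row group holds exactly one key:

    import pyarrow.parquet

    def build_key_to_index(path, key_column="key"):
        # Rebuild the key -> row group mapping from the parquet metadata.
        with pyarrow.parquet.ParquetFile(path) as pf:
            col = pf.schema_arrow.get_field_index(key_column)
            return {
                pf.metadata.row_group(i).column(col).statistics.min: i
                for i in range(pf.metadata.num_row_groups)
            }

That keeps the file self-contained, at the cost of writing statistics.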

edited to remove polars use

CaselIT commented 8 months ago

For reference, on my PC your suggestion is a little better than re-opening the row-group file on every iteration of the loop, but it's still ~20 times slower than doing the same with the proposed partitioning scheme.

This gives me the following times:

    import pyarrow.compute as pc
    import pyarrow.dataset

    with timectx("load partitions using read_table - read dataset once"):
        write_to_dataset_dataset = pyarrow.dataset.dataset(
            "write_to_dataset/", partitioning=pyarrow.dataset.partitioning(flavor="hive")
        )
        for key in keys:
            write_to_dataset_dataset.scanner(filter=(pc.field("key") == key)).to_table()
load partitions using read_table - read dataset once 3847.4653999874135 ms

So I still think this scheme has significant advantages compared to hive partitioning.
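
Another way to make the row-group file self-describing (again only a sketch, reusing df and keys from the example above; the "key_to_index" metadata key is my own naming, not an existing convention) would be to embed the mapping in the schema metadata, which pyarrow persists in the parquet file and restores on read:

    import json
    import pyarrow
    import pyarrow.parquet

    # Write side: the row group order is known up front, so the mapping can
    # be stored in the schema metadata before writing.
    parts = dict(list(df.groupby("key")))
    key_to_index = {int(k): i for i, k in enumerate(parts)}
    schema = pyarrow.Table.from_pandas(next(iter(parts.values()))).schema
    schema = schema.with_metadata({"key_to_index": json.dumps(key_to_index)})
    with pyarrow.parquet.ParquetWriter("row_group", schema) as pw:
        for group in parts.values():
            pw.write_table(pyarrow.Table.from_pandas(group), row_group_size=len(group))

    # Read side: recover the mapping from the file itself (JSON keys are strings).
    with pyarrow.parquet.ParquetFile("row_group") as pf:
        mapping = json.loads(pf.schema_arrow.metadata[b"key_to_index"])
        part = pf.read_row_group(mapping[str(keys[0])])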

edited to remove polars use