pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Faster parquet partitioning scheme #14389

Open CaselIT opened 8 months ago

CaselIT commented 8 months ago

Hi,

This may be more of a discussion, but discussions don't seem to be enabled, so an issue is what I think fits best. Sorry if that's not the case.

I originally suggested this in the arrow repository, but so far no one has replied to it. Since I believe it would be a useful feature, I thought about asking here to see if polars is interested in it. The original request is this one: https://github.com/apache/arrow/issues/39079.

Below is the same description, but using polars instead of pyarrow where possible.

Description

I'm testing how to optimally save a dataframe partitioned over one or more columns, with the objective of reading the single partitions later. While trying this I noticed that the current hive logic to save, and especially to load, a partitioned table is very slow.

While experimenting with ways of speeding up this use case I noticed that parquet row groups can have an arbitrary size and a single parquet file can contain row groups of many different sizes. I think there could be a partitioning scheme that makes use of this feature of parquet files.

From some initial tests I've done using only the public Python API of pyarrow, the serialization time is generally comparable or better, while the read time is up to 5x faster as the number of partitions increases (the advantage seems to grow with the number of partitions). It's likely that if this scheme were natively implemented the results would be even better.

The example code I tried is the following.

import shutil
import time
from contextlib import contextmanager

import polars as pl
import numpy as np
import pyarrow
import pyarrow.parquet

rng = np.random.default_rng(seed=42)
shape = (1_000_000, 13)  # this results in about 100 MB of data
num_partitions = (100, 250, 500)

@contextmanager
def timectx(name):
    s = time.perf_counter()
    yield
    e = time.perf_counter()
    print("  ", name, (e - s) * 1000, "ms")

def make_data(num_partition):
    data = pl.DataFrame(rng.uniform(size=shape), schema={f"c{i}": pl.Float64 for i in range(shape[1])})
    # make a key column used for partitioning with partitions of random size.
    # The keys are randomly sorted in the dataframe
    splits = set()
    while len(splits) < num_partition - 1:
        splits.add(rng.integers(1, len(data)))
    splits.add(len(data))
    start = 0
    arr = []
    for i, end in enumerate(sorted(splits)):
        arr.append(np.full(end - start, i))
        start = end
    values = np.concatenate(arr)
    rng.shuffle(values)
    return data.with_columns(key=pl.Series(values.astype(np.int64)))

def run(num_partition):
    print(f"Saving data with {num_partition} partitions")
    df = make_data(num_partition)

    keys = df["key"].unique()
    assert len(keys) == num_partition

    shutil.rmtree("write_to_dataset", ignore_errors=True)
    with timectx("save using write_to_dataset"):
        df.write_parquet(
            "write_to_dataset",
            use_pyarrow=True,
            pyarrow_options={"partition_cols": ["key"]},
        )

    keys = keys.sample(fraction=1)  # randomize access
    assert len(keys) == num_partition
    with timectx("load partitions using read_table"):
        for key in keys:
            pl.scan_parquet("write_to_dataset/**/*.parquet").filter(key=key).collect()

    shutil.rmtree("row_group", ignore_errors=True)
    with timectx("save using row groups"):
        # include the partitioning work done in polars in the timed section
        parts = df.partition_by(["key"], as_dict=True)
        pa_table = next(iter(parts.values())).to_arrow()
        key_to_index = {}
        with pyarrow.parquet.ParquetWriter("row_group", pa_table.schema) as pw:
            for index, (key, group) in enumerate(parts.items()):
                size = len(group)
                pa_table = group.to_arrow()
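                # write each partition as its own row group so it can later be read back individually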
                pw.write_table(pa_table, row_group_size=size)
                key_to_index[key] = index

    with timectx("load partitions from row group"):
        for key in keys:
            index = key_to_index[key,]
            # simulate opening the file each time; moving the for inside the with is faster but it
            # does not simulate the classic use case
            with pyarrow.parquet.ParquetFile("row_group") as pf:
                pl.from_arrow(pf.read_row_group(index))

for ns in num_partitions:
    run(ns)

The results on Windows on my PC are the following (using polars 0.20.7 and pyarrow 15.0.0):

Saving data with 100 partitions
   save using write_to_dataset 952.3031999997329 ms
   load partitions using read_table 1801.9544999988284 ms
   save using row groups 1084.1881000087596 ms
   load partitions from row group 524.6812000114005 ms
Saving data with 250 partitions
   save using write_to_dataset 1478.5382999980357 ms
   load partitions using read_table 9076.92710000265 ms
   save using row groups 1145.2023000019835 ms
   load partitions from row group 1988.7720999977319 ms
Saving data with 500 partitions
   save using write_to_dataset 2888.365699996939 ms
   load partitions using read_table 31440.25019998662 ms
   save using row groups 1304.839999997057 ms
   load partitions from row group 6533.625700001721 ms

Using Docker the results are similar:

Saving data with 100 partitions
   save using write_to_dataset 3171.588899999989 ms
   load partitions using read_table 723.6860350000143 ms
   save using row groups 1122.3316360000126 ms
   load partitions from row group 250.70953099998405 ms
Saving data with 250 partitions
   save using write_to_dataset 7660.383457999984 ms
   load partitions using read_table 3418.3086179999973 ms
   save using row groups 1477.735848000009 ms
   load partitions from row group 936.2179569999967 ms
Saving data with 500 partitions
   save using write_to_dataset 15105.868474000004 ms
   load partitions using read_table 12409.564421 ms
   save using row groups 2420.6517399999825 ms
   load partitions from row group 2887.588471999976 ms

In both cases I'm using Python 3.10.

A couple of considerations:

Thanks for reading.

mkleinbort commented 8 months ago

I've been working on this as well, but I'm not sure this is the answer. I think your use case is better solved by the implementation of page skipping via column indexes (https://github.com/pola-rs/polars/issues/14314) or by adopting the lance file format (https://github.com/pola-rs/polars/issues/12389).

I'm personally going with Lance with scalar indexes instead of partitions. It works remarkably well.

CaselIT commented 8 months ago

Thanks for your input.

I'm not familiar with the lance file format, I'll give it a look, thanks (even if it still seems a lot less popular than parquet).

Regarding column indexes, they do not yet seem to be very well supported: as far as I could find, pyarrow supports writing them but does not use them when reading (from the documentation of write_page_index). In any case I'll investigate if it's already possible to use them.
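
For reference, a minimal sketch of what writing them looks like through the public API (made-up data and file name; write_page_index is the flag mentioned above):

import pyarrow as pa
import pyarrow.parquet as pq

# toy table standing in for a partitioned dataframe
table = pa.table({"key": [0, 0, 1, 1, 2], "c0": [0.1, 0.2, 0.3, 0.4, 0.5]})

# write_page_index=True asks the writer to emit the column index / offset index
# structures that a page-skipping reader could use to prune pages on "key";
# as noted above, pyarrow itself does not use them when reading
pq.write_table(table, "with_page_index.parquet", write_page_index=True)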

In any case I think something like what's proposed here would still be viable, even if there are alternatives to it.

mkleinbort commented 8 months ago

If you find a way to speed up parquet I'm all for it - it's what led me to lance. I hit the issue that compression got very bad when the row groups got too small (or the file got split across too many partitions).

Maybe I don't understand your proposal - what is the core idea? Matching the partitions to row group sizes?

CaselIT commented 8 months ago

If you find a way to speed up parquet I'm all for it - it's what led me to lance. I hit the issue that compression got very bad when the row groups got too small (or the file got split across too many partitions).

The too-many-row-groups issue is really evident in how pyarrow saves the data in hive partitions. If the partitions are not sorted, the way hive partitioning is implemented can only be described as "bad".

Maybe I don't understand your proposal - what is the core idea? Matching the partitions to row group sizes?

It's an alternative to hive partitioning, which in my use case is not really great: I would expect it to be optimized for loading a single partition given the partitioning key, but it does not seem to be that good at it. Again, in my use case.

CaselIT commented 8 months ago

I'm personally going with Lance with scalar indexes instead of partitions. It works remarkably well.

I'm probably not doing something correctly, but it seems that translating the same example to use lance performs a lot worse than either parquet example (saving the data is fast though):

def run_lance(num_partition):
    import lance

    print(f"Saving data with {num_partition} partitions")
    df = make_data(num_partition)

    keys = df["key"].unique()
    assert len(keys) == num_partition

    shutil.rmtree("write_to_lance", ignore_errors=True)
    with timectx("save using write_to_lance"):
        ds = lance.write_dataset(df, "write_to_lance")

    keys = keys.sample(fraction=1)  # randomize access
    assert len(keys) == num_partition
    with timectx("load single key reopening dataset"):
        for key in keys:
            pl.from_arrow(lance.dataset("write_to_lance").scanner(filter=f"key == {key}").to_table())

    with timectx("load single key reusing dataset - read dataset once"):
        write_to_lance = lance.dataset("write_to_lance")
        for key in keys:
            pl.from_arrow(write_to_lance.scanner(filter=f"key == {key}").to_table())

    shutil.rmtree("write_to_lance_index", ignore_errors=True)
    with timectx("save using write_to_lance with index"):
        ds = lance.write_dataset(df, "write_to_lance_index")
        ds.create_scalar_index("key", "BTREE")

    with timectx("load single key reopening dataset with index"):
        for key in keys:
            pl.from_arrow(lance.dataset("write_to_lance_index").scanner(filter=f"key == {key}").to_table())

    with timectx("load single key reusing dataset with index - read dataset once"):
        write_to_lance_index = lance.dataset("write_to_lance_index")
        for key in keys:
            pl.from_arrow(write_to_lance_index.scanner(filter=f"key == {key}").to_table())

Tried only on Windows:

# Saving data with 100 partitions
#    save using write_to_lance 587.8781999927014 ms
#    load single key reopening dataset 29757.00619997224 ms
#    load single key reusing dataset - read dataset once 30374.47699997574 ms
#    save using write_to_lance with index 585.647500003688 ms
#    load single key reopening dataset with index 21465.08240001276 ms
#    load single key reusing dataset with index - read dataset once 21361.00340000121 ms
# Saving data with 250 partitions
#    save using write_to_lance 521.4053000090644 ms
#    load single key reopening dataset 64323.835899995174 ms
#    load single key reusing dataset - read dataset once 62762.13719998486 ms
#    save using write_to_lance with index 688.1173999863677 ms
#    load single key reopening dataset with index 43631.29079999635 ms
#    load single key reusing dataset with index - read dataset once 42433.75660001766 ms
# Saving data with 500 partitions
#    save using write_to_lance 480.91260000364855 ms
#    load single key reopening dataset 100031.00830002222 ms
#    load single key reusing dataset - read dataset once 99191.57840003027 ms
#    save using write_to_lance with index 651.4785999897867 ms
#    load single key reopening dataset with index 64974.15620001266 ms
#    load single key reusing dataset with index - read dataset once 64076.031400007196 ms

mkleinbort commented 8 months ago

I'll check. But before I do I'd like to understand your proposed partitioning scheme. How would you describe it? I read the code but it's not clear to me what is the core idea... To have one row group per value in the partition column?

CaselIT commented 8 months ago

To have one row group per value in the partition column?

Yes, that's all there is to it regarding how the data is saved.

A more complex part is probably how to save the mapping from key value to row group index, which the example does not take into consideration, simplifying it with a dict. As mentioned in the considerations, such data could be saved in the parquet metadata in some standard format.
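
For illustration, a minimal sketch (in no way a blessed format) of how the mapping could be stored as JSON in the parquet key/value metadata with the public pyarrow API; key_to_row_group is just a placeholder name:

import json

import polars as pl
import pyarrow.parquet

# `df` and `parts` as in the earlier example
parts = df.partition_by(["key"], as_dict=True)
# str() because JSON keys must be strings; the dict keys may be scalars or tuples
# depending on the polars version
key_to_index = {str(key): index for index, key in enumerate(parts)}

# attach the mapping to the schema so it ends up in the parquet footer metadata
schema = next(iter(parts.values())).to_arrow().schema
schema = schema.with_metadata({"key_to_row_group": json.dumps(key_to_index)})

with pyarrow.parquet.ParquetWriter("row_group", schema) as pw:
    for group in parts.values():
        pa_table = group.to_arrow().replace_schema_metadata(schema.metadata)
        pw.write_table(pa_table, row_group_size=len(pa_table))

# reading: recover the mapping from the footer, then read a single row group
pf = pyarrow.parquet.ParquetFile("row_group")
mapping = json.loads(pf.schema_arrow.metadata[b"key_to_row_group"])
first_key = next(iter(mapping))
partition = pl.from_arrow(pf.read_row_group(mapping[first_key]))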

CaselIT commented 8 months ago

I'm personally going with Lance with scalar indexes instead of partitions. It works remarkably well.

I'm probably not doing something correctly

It seems like lance in this use case is very sensitive to the order of the saved data. Adding df = df.sort('key') before saving results in almost two orders of magnitude improvement compared to the unordered case.

mkleinbort-ic commented 8 months ago

Yes, and you can add an index to make it even faster.

If you know the rows you need, lance's take is very fast as well.
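
For example, a rough sketch assuming the write_to_lance dataset from the snippet above and made-up row indices:

import lance
import polars as pl

ds = lance.dataset("write_to_lance")
# take fetches specific rows by position without scanning the whole dataset
rows = pl.from_arrow(ds.take([0, 42, 10_000]))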

jonashaag commented 8 months ago

I'm getting very different benchmark results on a real world dataset. 200 M rows, 10 columns, 100 partitions (index), zstd level 3 compressed:

# Manual read of partitions
In [59]: %%timeit -n1 -r1
    ...: for k in index:
    ...:     pl.read_parquet(f"/tmp/xxx/xxx={k}/data.parquet")
    ...:
12.8 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

# Manual read of partitions with PyArrow
In [60]: %%timeit -n1 -r1
    ...: for k in index:
    ...:     pq.read_table(f"/tmp/xxx/xxx={k}/data.parquet")
    ...:
2.24 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

# Read row groups with PyArrow
In [61]: %%timeit -n1 -r1
    ...: for i, k in enumerate(index):
    ...:     pl.from_arrow(r.read_row_group(i))
    ...:
8.17 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

# Polars hive partitions
In [62]: scan = pl.scan_parquet("/tmp/xxx/*/*")
In [63]: %%timeit -n1 -r1
    ...: for k in index:
    ...:     scan.filter(pl.col.xxx == k).collect()
    ...:
13.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

# PyArrow hive partitions
In [94]: %%timeit -n1 -r1
    ...: for k in index:
    ...:     pq.read_table("/tmp/xxx", filters=[("xxx", "==", k)])
    ...:
3.12 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

CaselIT commented 8 months ago

Interesting, thanks for the test! Is the dataset public by any chance?

jonashaag commented 8 months ago

No :(

CaselIT commented 8 months ago

I'll try to check a synthetic one with the same characteristics. Were the keys ordered when it was originally partitioned?

jonashaag commented 8 months ago

One problem in your benchmark is that the number of rows (or the file size) is way too small and the column types are not representative of real-world data. Also, a uniform random distribution is very rare in real-world data.

As a consequence of the small number of rows, the overhead of repeatedly calling .scan_parquet() is very large. If I change the hive test to

        scan = pl.scan_parquet("write_to_dataset/**/*.parquet")
        for key in keys:
            scan.filter(key=key).collect()

it improves ~3x in performance, making it faster than the row group one, although when I increase the number of rows the row group one is faster again.

CaselIT commented 8 months ago

Regarding what to keep inside the for loop over the keys in the example, it depends on what one wants to benchmark. The ParquetFile initialization also has a very significant overhead; the reuse variant is sketched below.
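
For completeness, the variant that amortizes it (moving the for inside the with, as the comment in the original snippet mentions) would be:

# reuse a single ParquetFile handle for all the lookups
with pyarrow.parquet.ParquetFile("row_group") as pf:
    for key in keys:
        index = key_to_index[key,]
        pl.from_arrow(pf.read_row_group(index))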

My example was more representative of the use case where something creates a partitioned file, then work on the single partitions is done independently, and where sharing a scan_parquet or pyarrow.dataset is not feasible (because, for example, the work is being done in a batch processing or similar system).

In any case thanks for the input. I'll try different types of data to see if I also see different times.