pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

performance issue with tpch q7 after dropping columns and using sink_parquet #16694

Open lostmygithubaccount opened 3 months ago

lostmygithubaccount commented 3 months ago


Reproducible example

this is slightly involved but you should be able to copy/paste below after pip install 'ibis-framework[duckdb]' in addition to having Polars installed. I am on the latest release of Polars (0.20.31). this breaks down at sf=20, works fine on sf=10

function to generate the data:

import os
import ibis

def get_data_dir(sf, n_partitions):
    return os.path.join("tpch_data", f"sf={sf}", f"n={n_partitions}")

def generate_data(sf, n_partitions):
    con = ibis.connect("duckdb://")
    con.raw_sql("set enable_progress_bar = false")

    data_directory = get_data_dir(sf, n_partitions)

    if not os.path.exists(data_directory):
        for i in range(n_partitions):
            con.raw_sql(f"call dbgen(sf={sf}, children={n_partitions}, step={i})")
            for table in con.list_tables():
                if i == 0:
                    os.makedirs(os.path.join(data_directory, table), exist_ok=True)

                con.table(table).to_parquet(
                    os.path.join(data_directory, table, f"{i:04d}.parquet")
                )

                con.drop_table(table)

run for sf=10 and sf=20:

n = 1

sfs = [10, 20]

for sf in sfs:
    generate_data(sf, n)

now we can read the data:

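import polars as pl
import polars.selectors as ps  # assumed: `ps` (used below in ps.decimal()) is the selectors module

customer = pl.scan_parquet("tpch_data/sf=10/n=1/customer/*.parquet")
customer.head().collect()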
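
For context, the `sf=10/n=1` directory layout written above follows the hive partitioning convention, where each `key=value` directory segment in a file's path can be read back as a column. Here is a rough stdlib-only sketch of that mapping. The helper `hive_columns` is hypothetical and is not how Polars implements it (Polars also infers value types, which this sketch does not):

```python
from pathlib import PurePosixPath


def hive_columns(path: str) -> dict[str, str]:
    """Collect key=value directory segments from a path (hypothetical helper)."""
    columns = {}
    # Only directory segments are inspected; the file name itself is skipped.
    for segment in PurePosixPath(path).parent.parts:
        key, sep, value = segment.partition("=")
        if sep and key:
            columns[key] = value
    return columns


print(hive_columns("tpch_data/sf=10/n=1/customer/0000.parquet"))
# {'sf': '10', 'n': '1'}
```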

you'll notice Polars by default (and Ibis on the DuckDB/Polars backends) creates the hive-partitioned sf and n as columns in the data. this was throwing some things off, so in my longer get_polars_tables function I dropped those columns:

def get_polars_tables(sf, n_partitions, lazy=True):
    import os

    os.environ["POLARS_ACTIVATE_DECIMAL"] = (
        "1"  # https://github.com/pola-rs/polars/issues/16603#issuecomment-2141701041
    )
    data_directory = get_data_dir(sf, n_partitions)

    if lazy:
        customer = pl.scan_parquet(f"{data_directory}/customer/*.parquet")
        lineitem = pl.scan_parquet(f"{data_directory}/lineitem/*.parquet")
        nation = pl.scan_parquet(f"{data_directory}/nation/*.parquet")
        orders = pl.scan_parquet(f"{data_directory}/orders/*.parquet")
        part = pl.scan_parquet(f"{data_directory}/part/*.parquet")
        partsupp = pl.scan_parquet(f"{data_directory}/partsupp/*.parquet")
        region = pl.scan_parquet(f"{data_directory}/region/*.parquet")
        supplier = pl.scan_parquet(f"{data_directory}/supplier/*.parquet")
    else:
        customer = pl.read_parquet(f"{data_directory}/customer/*.parquet")
        lineitem = pl.read_parquet(f"{data_directory}/lineitem/*.parquet")
        nation = pl.read_parquet(f"{data_directory}/nation/*.parquet")
        orders = pl.read_parquet(f"{data_directory}/orders/*.parquet")
        part = pl.read_parquet(f"{data_directory}/part/*.parquet")
        partsupp = pl.read_parquet(f"{data_directory}/partsupp/*.parquet")
        region = pl.read_parquet(f"{data_directory}/region/*.parquet")
        supplier = pl.read_parquet(f"{data_directory}/supplier/*.parquet")

    # TODO: report issue(s) (issue(s) at higher SFs)
    def _decimal_to_float(df):
        return df.with_columns((ps.decimal().cast(pl.Float64)))

    customer = _decimal_to_float(customer)
    lineitem = _decimal_to_float(lineitem)
    nation = _decimal_to_float(nation)
    orders = _decimal_to_float(orders)
    part = _decimal_to_float(part)
    partsupp = _decimal_to_float(partsupp)
    region = _decimal_to_float(region)
    supplier = _decimal_to_float(supplier)

    # TODO: keep this or figure something out and remove
    def _drop_hive_cols(df):
        return df.drop(["sf", "n"])

    customer = _drop_hive_cols(customer)
    lineitem = _drop_hive_cols(lineitem)
    nation = _drop_hive_cols(nation)
    orders = _drop_hive_cols(orders)
    part = _drop_hive_cols(part)
    partsupp = _drop_hive_cols(partsupp)
    region = _drop_hive_cols(region)
    supplier = _drop_hive_cols(supplier)

    return customer, lineitem, nation, orders, part, partsupp, region, supplier

you can then read in the tables:

sf = 20
n_partitions = 1
lazy = True

customer, lineitem, nation, orders, part, partsupp, region, supplier = (
    get_polars_tables(sf=sf, n_partitions=n_partitions, lazy=lazy)
)

perhaps a separate bug but I'll move forward -- at this point the dataframes still have the n and sf columns, even though they should have been dropped. this does not seem to be an issue in the eager API

now we define q7:

from datetime import date


def q7(customer, lineitem, nation, orders, supplier, **kwargs):
    var1 = "FRANCE"
    var2 = "GERMANY"
    var3 = date(1995, 1, 1)
    var4 = date(1996, 12, 31)

    n1 = nation.filter(pl.col("n_name") == var1)
    n2 = nation.filter(pl.col("n_name") == var2)

    q1 = (
        customer.join(n1, left_on="c_nationkey", right_on="n_nationkey")
        .join(orders, left_on="c_custkey", right_on="o_custkey")
        .rename({"n_name": "cust_nation"})
        .join(lineitem, left_on="o_orderkey", right_on="l_orderkey")
        .join(supplier, left_on="l_suppkey", right_on="s_suppkey")
        .join(n2, left_on="s_nationkey", right_on="n_nationkey")
        .rename({"n_name": "supp_nation"})
    )

    q2 = (
        customer.join(n2, left_on="c_nationkey", right_on="n_nationkey")
        .join(orders, left_on="c_custkey", right_on="o_custkey")
        .rename({"n_name": "cust_nation"})
        .join(lineitem, left_on="o_orderkey", right_on="l_orderkey")
        .join(supplier, left_on="l_suppkey", right_on="s_suppkey")
        .join(n1, left_on="s_nationkey", right_on="n_nationkey")
        .rename({"n_name": "supp_nation"})
    )

    q_final = (
        pl.concat([q1, q2])
        .filter(pl.col("l_shipdate").is_between(var3, var4))
        .with_columns(
            (pl.col("l_extendedprice") * (1 - pl.col("l_discount"))).alias("volume"),
            pl.col("l_shipdate").dt.year().alias("l_year"),
        )
        .group_by("supp_nation", "cust_nation", "l_year")
        .agg(pl.sum("volume").alias("revenue"))
        .sort(by=["supp_nation", "cust_nation", "l_year"])
    )

    return q_final

and run it, calling sink_parquet on the result:

res = q7(
    customer=customer,
    lineitem=lineitem,
    nation=nation,
    orders=orders,
    part=part,
    partsupp=partsupp,
    region=region,
    supplier=supplier,
)
res.sink_parquet("temp.parquet")

at sf=10 it works fine, but at sf=20 it hangs for a very long time. it also uses 100% CPU while doing this

as I'm writing this it actually did finish -- while .collect().write_parquet() takes 1.5s at sf=20, the sink_parquet call takes ~9s at sf=10 and ~60s at sf=20

I was originally testing this at sf=50 and sf=100 so assumed it was hanging forever, particularly compared to the previous numbers I was seeing before I added those drop column calls. I'll still submit this
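
The timings above could be reproduced with a small wall-clock helper along these lines. This is only a sketch: `time_call` is a hypothetical name, and the commented usage assumes the `res` LazyFrame built earlier.

```python
import time


def time_call(fn, repeats=3):
    """Return the best wall-clock time in seconds over `repeats` calls of fn()."""
    best = float("inf")
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        best = min(best, time.perf_counter() - start)
    return best


# Hypothetical usage with the `res` LazyFrame from above:
# t_sink = time_call(lambda: res.sink_parquet("temp_sink.parquet"), repeats=1)
# t_mem = time_call(lambda: res.collect().write_parquet("temp_mem.parquet"), repeats=1)
# print(f"sink_parquet: {t_sink:.1f}s, collect().write_parquet(): {t_mem:.1f}s")
```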

NOTE: the log output below was too long (parquet file must be read...) for GitHub so I deleted a bunch of it, that seems like it'd be the issue though (reading the parquet file(s) a ton of times?)

Log output

parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
RUN STREAMING PIPELINE
[parquet -> hstack -> callback -> fast_projection -> callback -> fast_projection -> callback -> fast_projection -> function -> fast_projection -> union -> hstack -> generic-group_by, parquet -> fast_projection -> callback -> fast_projection -> function -> generic_join_build, parquet -> fast_projection -> callback -> fast_projection -> generic_join_build, parquet -> generic_join_build, parquet -> generic_join_build, parquet -> generic_join_build, parquet -> hstack -> callback -> fast_projection -> callback -> fast_projection -> callback -> fast_projection -> function -> fast_projection -> union -> hstack -> generic-group_by -> sort_multiple -> parquet_sink, parquet -> fast_projection -> callback -> fast_projection -> function -> generic_join_build, parquet -> fast_projection -> callback -> fast_projection -> generic_join_build, parquet -> generic_join_build, parquet -> generic_join_build, parquet -> generic_join_build]
STREAMING CHUNK SIZE: 25000 rows
parquet file must be read, statistics not sufficient for predicate.
STREAMING CHUNK SIZE: 25000 rows
STREAMING CHUNK SIZE: 25000 rows
parquet file must be read, statistics not sufficient for predicate.
STREAMING CHUNK SIZE: 25000 rows
STREAMING CHUNK SIZE: 25000 rows
STREAMING CHUNK SIZE: 10000 rows
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
...
parquet file must be read, statistics not sufficient for predicate.
finish streaming aggregation with local in-memory table
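
Since the full log was too long to paste, a per-line count may be a more compact way to share it. Here is a minimal sketch, assuming the verbose output has been captured as text; `summarize_log` and the `sample` string are illustrative, not part of the original run:

```python
from collections import Counter


def summarize_log(text: str) -> list[tuple[str, int]]:
    """Count identical non-empty log lines, most frequent first."""
    lines = (line.strip() for line in text.splitlines())
    return Counter(line for line in lines if line).most_common()


# Illustrative excerpt only; the real log is far longer.
sample = """\
POLARS PREFETCH_SIZE: 20
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
STREAMING CHUNK SIZE: 25000 rows
parquet file must be read, statistics not sufficient for predicate.
"""

for line, count in summarize_log(sample):
    print(f"{count:>4}  {line}")
```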

Issue description

two potential issues:

  1. noticed columns aren't dropped for LazyFrames when they should be (and are for regular DataFrames)

  2. potential performance issue involving dropping columns + sink_parquet


Expected behavior

  1. columns are dropped
  2. no performance issue w/ the above (I can work around this w/ .collect().write_parquet it seems)

Installed versions

```
--------Version info---------
Polars:               0.20.31
Index type:           UInt32
Platform:             macOS-14.5-arm64-arm-64bit
Python:               3.11.8 (main, Feb 20 2024, 20:00:15) [Clang 14.0.3 (clang-1403.0.22.14.1)]

----Optional dependencies----
adbc_driver_manager:
cloudpickle:
connectorx:
deltalake:
fastexcel:
fsspec:               2024.5.0
gevent:
hvplot:
matplotlib:
nest_asyncio:         1.6.0
numpy:                1.26.4
openpyxl:
pandas:               2.2.2
pyarrow:              16.1.0
pydantic:
pyiceberg:
pyxlsb:
sqlalchemy:
torch:
xlsx2csv:
xlsxwriter:
```

lostmygithubaccount commented 3 months ago

this was very nasty...it took me a while to figure out what changed (adding that drop columns thing) -- I'm going to move forward by just removing sink_parquet in my code in favor of collect().write_parquet(), as it was causing other issues too

the columns not being dropped was particularly surprising. but if overall this is something y'all are addressing in the new streaming engine or there's something obvious here of course feel free to close

lostmygithubaccount commented 3 months ago

and one more note, I also observed this in 0.20.30 -- I upgraded to see if it fixed itself

ritchie46 commented 3 months ago

sink_parquet uses the streaming engine, whereas collect().write_parquet() uses the in-memory engine. We are completely redesigning the streaming engine and will likely not improve the performance of the current one (it will be discontinued).

We don't recommend using the streaming engine at the moment (if it works for you great), but we are not happy with it. If you use it for benchmarking, I think you should make clear that it is polars-streaming you are benchmarking. ;)

On the mentioned bug, can you create an MWE that only shows the bug and can be repeated on a small dataset without any dependencies?

lostmygithubaccount commented 3 months ago

that's helpful, thanks! I'll probably avoid streaming for this but keep it flexible so we can redo it once the new engine is out (and clearly note what we're using) -- btw I'll share the benchmark in the communities before publishing for any other feedback or corrections (hopefully this week)

I'll also try to reproduce this on smaller data with a better MWE (but won't be a high priority for me), feel free to close this out