pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Pathological slowdown in `LazyFrame.group_by_dynamic` with simple input #15080

Open kszlim opened 8 months ago

kszlim commented 8 months ago


Reproducible example

```python
import polars as pl
import time

ldf = pl.scan_parquet("repro.parquet").sort(pl.col("time_ns").over("id"))
ldf = ldf.group_by_dynamic("time_ns", every="10i", by="id", check_sorted=False).agg(pl.exclude("time_ns").first())
print(ldf.collect())  # Doesn't get printed unless I do ldf.head(30) or something
```

repro.parquet.zip

Unzip the attached parquet and then try to run it.

Log output

n/a

Issue description

Pathologically slow; there must be some sort of exponential behavior.

Expected behavior

Should run fast.

Installed versions

```
--------Version info---------
Polars:               0.20.15
Index type:           UInt32
Platform:             Linux-5.10.210-178.852.x86_64-x86_64-with-glibc2.26
Python:               3.11.7 (main, Dec 5 2023, 22:00:36) [GCC 7.3.1 20180712 (Red Hat 7.3.1-17)]
----Optional dependencies----
adbc_driver_manager:
cloudpickle:          3.0.0
connectorx:
deltalake:
fastexcel:
fsspec:               2024.2.0
gevent:
hvplot:
matplotlib:           3.8.3
numpy:                1.26.4
openpyxl:
pandas:               2.2.1
pyarrow:              15.0.1
pydantic:
pyiceberg:
pyxlsb:
sqlalchemy:
xlsx2csv:
xlsxwriter:
```

This reproduces in versions of polars since at least 0.19.14. Nothing changes if I write it to parquet before the group_by_dynamic and read it again, i.e. there's nothing broken about the parquet file encoding.

ritchie46 commented 8 months ago

Minimal repro:


```python
import polars as pl

df = pl.DataFrame(
    {
        "id": [67] * 20,
        "time_ns": [
            15016000000, 15126000000, 15236000000, 15346000000, 15456000000,
            15566000000, 15676000000, 15786000000, 15896000000, 16006000000,
            16116000000, 16226000001, 16336000001, 16446000001, 16556000000,
            16666000000, 16776000000, 16886000001, 16996000001, 17106000001,
        ],
    }
).set_sorted("time_ns")

df.group_by_dynamic("time_ns", every="10i", check_sorted=False).agg(
    pl.col("id").alias("group")
)
```

MarcoGorelli commented 8 months ago

Determining the groups takes a long time:

https://github.com/pola-rs/polars/blob/42a4b01ec7c1a8b6196080ec4ed7587f0212cdd7/crates/polars-time/src/group_by/dynamic.rs#L315-L324

If you're making groups every 10 units and your measurements span 2 billion units, then that's a lot of groups... there's probably some fast path which could be introduced to avoid creating a lot of them, though.
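
For scale, here is a rough back-of-the-envelope count of the windows implied by the minimal repro above; the bounds are taken from ritchie46's example and `every = 10` is just the integer reading of `"10i"`, so the figures are illustrative rather than exact:

```python
# Back-of-the-envelope window count for the minimal repro above.
# The bounds come from the first and last time_ns values in ritchie46's example.
span = 17_106_000_001 - 15_016_000_000  # ~2.09e9 units covered by only 20 rows
every = 10                              # "10i" -> a window boundary every 10 units
print(span // every)                    # ~209 million candidate windows
```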

ritchie46 commented 8 months ago

Yes, we seem to iterate A LOT! Care to look at that one? Then I will do the pivots. :D

MarcoGorelli commented 8 months ago

I think this isn't so simple to speed up; there's already an early continue:

https://github.com/pola-rs/polars/blob/25536cffa7cc7d8332ea07f3eb3291e466ccc08b/crates/polars-time/src/windows/group_by.rs#L79-L85

This may require a larger refactor...

ritchie46 commented 8 months ago

Oh, I didn't realize we went in steps of 10 through 2 billion units. Ok.. :/

MarcoGorelli commented 8 months ago

Yeah, it's the equivalent of having 1 observation every 2 minutes and then resampling so they're every 10 microseconds.

So I think a slowdown is expected - not saying it's not addressable, but I don't think it's at all common to do this, so it's low-prio compared with other open issues.

kszlim commented 8 months ago

Is there a way to make it work with a time and/or duration datatype? I guess I could convert the column to seconds and then it should work fine with indices?

MarcoGorelli commented 8 months ago

Regardless of what dtype you convert it to, if your `every` is 8 orders of magnitude smaller than the distance between points, then there's going to be a perf impact.

May I ask what your use case is here? I think you may be better off using a different operation (`truncate` perhaps?)
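
One possible reading of the truncate suggestion for an integer clock, sketched below: bucket `time_ns` with floor division and aggregate per bucket, instead of asking `group_by_dynamic` to enumerate tiny windows. The 20,000,000 ns bucket (roughly 50 Hz) and the file/column names are assumptions carried over from the thread, not a confirmed fix:

```python
import polars as pl

# Sketch only: emulate "truncate to a bucket" on an integer clock by flooring
# time_ns to a bucket id, then aggregating per (id, bucket).
# bucket_ns = 20 ms (~50 Hz) is an assumed target; "repro.parquet", "id" and
# "time_ns" are the names used in this thread.
bucket_ns = 20_000_000

ldf = (
    pl.scan_parquet("repro.parquet")
    .with_columns((pl.col("time_ns") // bucket_ns).alias("bucket"))
    .group_by("id", "bucket", maintain_order=True)
    .agg(pl.exclude("time_ns", "bucket").first())
)
print(ldf.collect())
```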

kszlim commented 8 months ago

Just trying to do a lazy downsample within groups.

mcrumiller commented 8 months ago

If you're doing an operation on every 10 elements, you could try something like `unstack`, although you're going to generate a lot of columns. For this I would almost suggest `to_numpy().reshape(-1, 10).mean(axis=1)` or something of the sort.
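
A small, self-contained illustration of that reshape idea, assuming the column length is an exact multiple of 10 and the rows are already in time order; it is eager and per-column only, and the toy data is made up:

```python
import numpy as np
import polars as pl

# Illustrative only: average every 10 consecutive values of one column.
# Assumes len(df) is an exact multiple of 10 and rows are already sorted by time.
df = pl.DataFrame({"value": np.arange(100.0)})
block_means = df["value"].to_numpy().reshape(-1, 10).mean(axis=1)
print(block_means)  # 10 block means instead of 100 raw samples
```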

kszlim commented 8 months ago

> Regardless of what dtype you convert it to, if your `every` is 8 orders of magnitude smaller than the distance between points, then there's going to be a perf impact.
>
> May I ask what your use case is here? I think you may be better off using a different operation (`truncate` perhaps?)

I'm trying to downsample my data to about 50 Hz, but my data isn't labeled with timestamps; instead it's just some sort of monotonic clock from a given epoch.
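
A hedged sketch of one way to handle that use case, assuming the clock is in nanoseconds and 50 Hz corresponds to 20 ms windows: cast the integer clock to a Datetime so `every` can be written as a duration instead of a tiny integer stride. The file and column names are the ones used earlier in the thread, and the parameters match the polars 0.20 API discussed here (`by=`/`check_sorted=` were renamed/removed in later releases); this is a sketch, not a verified fix:

```python
import polars as pl

# Sketch: interpret the monotonic nanosecond clock as a Datetime so the window
# size can be written as "20ms" (~50 Hz) rather than as a 10-unit integer stride.
# "repro.parquet", "id" and "time_ns" are the names from this thread.
ldf = (
    pl.scan_parquet("repro.parquet")
    .with_columns(pl.from_epoch("time_ns", time_unit="ns").alias("ts"))
    .sort("id", "ts")  # group_by_dynamic needs the index sorted within each group
    .group_by_dynamic("ts", every="20ms", by="id", check_sorted=False)
    .agg(pl.exclude("ts", "time_ns").first())
)
print(ldf.collect())
```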