pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

rolling_* functions don't check sortedness #10991

Closed MarcoGorelli closed 1 year ago

MarcoGorelli commented 1 year ago

Reproducible example

In [18]: df = pl.DataFrame({
    ...:     'dt': [datetime(2020, 1, 3), datetime(2020, 1, 1), datetime(2020, 1, 2)],
    ...:     'val': [1,2,3]
    ...: })

In [19]: df.with_columns(roll=pl.col('val').rolling_sum('2d', by='dt', closed='right'))
Out[19]:
shape: (3, 3)
┌─────────────────────┬─────┬──────┐
│ dt                  ┆ val ┆ roll │
│ ---                 ┆ --- ┆ ---  │
│ datetime[μs]        ┆ i64 ┆ i64  │
╞═════════════════════╪═════╪══════╡
│ 2020-01-03 00:00:00 ┆ 1   ┆ 1    │
│ 2020-01-01 00:00:00 ┆ 2   ┆ 2    │
│ 2020-01-02 00:00:00 ┆ 3   ┆ 5    │
└─────────────────────┴─────┴──────┘

Log output

n/a

Issue description

The column 'dt' wasn't sorted, so the results don't make sense

Expected behavior

Trying to use group_by_rolling would have raised

In [20]: df.group_by_rolling('dt', period='2d').agg(pl.col('val').mean())
---------------------------------------------------------------------------
InvalidOperationError: argument in operation 'group_by_rolling' is not explicitly sorted

- If your data is ALREADY sorted, set the sorted flag with: '.set_sorted()'.
- If your data is NOT sorted, sort the 'expr/series/column' first.

Installed versions

```
INSTALLED VERSIONS
------------------
commit : 6e194cf9eadedbf5506da5ba0939d700f46ba3ba
python : 3.10.6.final.0
python-bits : 64
OS : Linux
OS-release : 5.10.102.1-microsoft-standard-WSL2
Version : #1 SMP Wed Mar 2 00:30:59 UTC 2022
machine : x86_64
processor : x86_64
byteorder : little
LC_ALL : None
LANG : en_GB.UTF-8
LOCALE : en_GB.UTF-8
pandas : 2.1.0rc0
numpy : 1.25.1
pytz : 2023.3
dateutil : 2.8.2
setuptools : 67.6.1
pip : 23.1.2
Cython : None
pytest : 7.3.1
hypothesis : 6.82.4
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : None
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 3.1.2
IPython : 8.13.2
pandas_datareader : None
bs4 : 4.12.2
bottleneck : None
brotli : None
dataframe-api-compat: None
fastparquet : 2023.7.0
fsspec : 2023.6.0
gcsfs : None
matplotlib : 3.7.1
numba : None
numexpr : None
odfpy : None
openpyxl : None
pandas_gbq : None
pyarrow : 12.0.1
pyreadstat : None
pyxlsb : None
s3fs : None
scipy : 1.11.0
snappy : None
sqlalchemy : 2.0.20
tables : None
tabulate : None
xarray : None
xlrd : None
zstandard : None
tzdata : 2023.3
qtpy : None
pyqt5 : None
None
```

orlp commented 1 year ago

Polars is different from the relational model in that DataFrames/Series are always ordered. The order itself is semantic (that our current hash joins don't always respect this is something I intend to resolve).

It is not up to us to determine whether or not a rolling sum makes sense or not, because the user might have intended a specific order. The result is always well-defined, unlike in SQL. So I don't think this is a bug, nor that it should be fixed.

orlp commented 1 year ago

If our implementation assumes the data is sorted based on the time, and the rolling_sum specifies a time window (which gives an unambiguous result), our implementation is bugged. But that should be fixed in the implementation, and not be shoved onto the user.

orlp commented 1 year ago

And forcing the user to first sort based on time is not the same! Window functions like rolling_sum should preserve the order of the input. If the data is not sorted by time already, we need to sort by time in a copy, and store back the results after windowing in the original order.
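
A minimal pure-Python sketch of that "sort a copy, window, store back" idea, for illustration only. It is not how Polars implements anything; `rolling_sum_by` is a hypothetical helper, and it assumes a positive period and the `(t - period, t]` window that `closed='right'` implies:

```python
from datetime import datetime, timedelta


def rolling_sum_by(times, values, period):
    """Sum values over (t - period, t] for each row, keeping the input row order.

    Hypothetical helper for illustration; assumes period > 0.
    """
    n = len(times)
    # Sort row indices by time, so we remember where every row came from.
    order = sorted(range(n), key=times.__getitem__)
    out = [None] * n
    start = end = 0  # the window covers sorted positions [start, end)
    window_sum = 0
    for row in order:
        t = times[row]
        # Closed right edge: take in every row with time <= t (ties included).
        while end < n and times[order[end]] <= t:
            window_sum += values[order[end]]
            end += 1
        # Open left edge: drop every row with time <= t - period.
        while times[order[start]] <= t - period:
            window_sum -= values[order[start]]
            start += 1
        # Store the result back at the row's original position.
        out[row] = window_sum
    return out


dt = [datetime(2020, 1, 3), datetime(2020, 1, 1), datetime(2020, 1, 2)]
val = [1, 2, 3]
print(rolling_sum_by(dt, val, timedelta(days=2)))  # [4, 2, 5]
```

On the data from the reproducible example this gives 4 for the 2020-01-03 row (3 from 2020-01-02 plus 1 from 2020-01-03), while the output rows stay in the input order.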

MarcoGorelli commented 1 year ago

thanks for explaining

does the same apply for group_by_rolling, which currently raises in such a case? or should that be reverted too?

MarcoGorelli commented 1 year ago

for reference, that came from #8994

orlp commented 1 year ago

@MarcoGorelli I'm trying to wrap my head around group_by_rolling, but it makes no sense to me. How is it grouping anything? Isn't the output always the same length as the input? Shouldn't that be called over_rolling as it is a window function?

I'm not attached to over_rolling as a name at all, it could be over_rolling_period or whatever, I just mean on a principled level.

MarcoGorelli commented 1 year ago

I think the idea was that in group_by_rolling, you create lots of groups (one for each row), and then aggregate each group - going to cc @ritchie46 here though, as he introduced it https://github.com/pola-rs/polars/pull/2435

orlp commented 1 year ago

> you create lots of groups (one for each row), and then aggregate each group

That is exactly what a window function is. For each row a window is created (which does not necessarily have to be contiguous!), and within that window you create some result (which typically is an aggregate, but that also doesn't have to be the case). As regular window functions on Series are created by over, we should be consistent and use the term over for all window functions IMO to distinguish them from grouping functions (which reduce the total number of rows by grouping them).
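
To make the distinction concrete, here is a small pure-Python sketch (no Polars involved; `group_sum` and `sum_over` are hypothetical names chosen for this example). The grouping function returns one result per distinct key, while the window function returns one result per input row:

```python
from collections import defaultdict


def group_sum(keys, values):
    """Grouping: one output row per distinct key, so the row count shrinks."""
    totals = defaultdict(int)
    for k, v in zip(keys, values):
        totals[k] += v
    return dict(totals)


def sum_over(keys, values):
    """Window: every row gets the sum of its own group, so the row count is preserved."""
    totals = group_sum(keys, values)
    return [totals[k] for k in keys]


keys = ["a", "b", "a"]
values = [1, 2, 3]
print(group_sum(keys, values))  # {'a': 4, 'b': 2}
print(sum_over(keys, values))   # [4, 2, 4]
```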

MarcoGorelli commented 1 year ago

makes sense - over_rolling sounds good to me tbh, I think I was also confused about groupby_rolling when I first came across it

though back to sortedness - reckon the raising of the error should be reverted for group_by_rolling as well, or is that case different?

orlp commented 1 year ago

@MarcoGorelli I think the result is well-specified regardless of sorting for 'group_by_rolling' as well:

If you have a time series <t_0, t_1, ..., t_n>, then by default the windows created will be

  • (t_0 - period, t_0]
  • (t_1 - period, t_1]
  • ...
  • (t_n - period, t_n]

This is still perfectly well-defined even if t_0, t_1, ..., t_n aren't sorted. In practice you will first sort them while keeping track of the original order, compute the window function efficiently since it's now sorted, then put the results back in the original order.
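
As a sanity check of that definition, a quadratic reference implementation (a sketch for illustration; `rolling_sum_reference` is a hypothetical name, not a Polars function) never looks at row order at all. Each row simply collects the values whose timestamps fall in its own window:

```python
from datetime import datetime, timedelta


def rolling_sum_reference(times, values, period):
    """Row i sums every value whose time lies in (t_i - period, t_i]."""
    return [
        sum(v for t_j, v in zip(times, values) if t_i - period < t_j <= t_i)
        for t_i in times
    ]


dt = [datetime(2020, 1, 3), datetime(2020, 1, 1), datetime(2020, 1, 2)]
val = [1, 2, 3]
period = timedelta(days=2)

unsorted_result = rolling_sum_reference(dt, val, period)

# Sorting the rows first only permutes the output; each row's value is unchanged.
rows = sorted(zip(dt, val))
sorted_result = rolling_sum_reference(
    [t for t, _ in rows], [v for _, v in rows], period
)

print(unsorted_result)  # [4, 2, 5]
print(sorted_result)    # [2, 5, 4]
```

Under this definition the 2020-01-03 row of the reproducible example would be 4 rather than the 1 shown there.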

MarcoGorelli commented 1 year ago

Something that comes to mind is: suppose someone has a by column 'ts'. They repeatedly call rolling_mean

If group_by_rolling were to sort for you under the hood, then a user might end up in the first scenario above, and not necessarily know why their code isn't as performant as it could be. But if group_by_rolling were to error on unsorted data, then the user would be forced to sort their data once, and thereafter they would benefit from higher performance.

I totally may be missing something here (or a lot) - I'm not too attached to the error message, my main concern is the inconsistency between rolling_* and group_by_rolling

orlp commented 1 year ago

@MarcoGorelli What do you mean by "repeatedly call" rolling_mean? Could you give an example (doesn't have to run with real data, just a hypothetical query)?

MarcoGorelli commented 1 year ago

sure, something like

df.with_columns(
    rolling_mean_30d=pl.col('val').rolling_mean('30d', by='ts'),
    rolling_mean_60d=pl.col('val').rolling_mean('60d', by='ts'),
    rolling_mean_90d=pl.col('val').rolling_mean('90d', by='ts'),
    rolling_mean_120d=pl.col('val').rolling_mean('120d', by='ts'),
)

(the kind of things data scientists like to do, like - first example that comes to mind: https://www.kaggle.com/competitions/m5-forecasting-accuracy/discussion/163684)

orlp commented 1 year ago

@MarcoGorelli If we were to (eventually) implement my vision for rolling window functions (https://github.com/pola-rs/polars/issues/10989#issuecomment-1711742252), that would be

df.with_columns(
    rolling_mean_30d=pl.col('val').mean().rolling_by('ts', '30d'),
    rolling_mean_60d=pl.col('val').mean().rolling_by('ts', '60d'),
    rolling_mean_90d=pl.col('val').mean().rolling_by('ts', '90d'),
    rolling_mean_120d=pl.col('val').mean().rolling_by('ts', '120d'),
)

If internally in the engine we'd translate that to

df.with_columns(
    rolling_mean_30d=pl.col('val').mean().rolling_by_assume_sorted('ts', '30d').temporary_sort_by('ts'),
    rolling_mean_60d=pl.col('val').mean().rolling_by_assume_sorted('ts', '60d').temporary_sort_by('ts'),
    rolling_mean_90d=pl.col('val').mean().rolling_by_assume_sorted('ts', '90d').temporary_sort_by('ts'),
    rolling_mean_120d=pl.col('val').mean().rolling_by_assume_sorted('ts', '120d').temporary_sort_by('ts'),
)

where expr.temporary_sort_by(col) sorts a copy of the input while remembering the original order, evaluates on that copy, and then undoes the sort permutation on the results (a no-op if the column is already sorted, which can be detected at runtime).... theeeeen we could add an optimization that pushes down the rolling means:

df.with_columns(
    (
        pl.col('val').mean().rolling_by_assume_sorted('ts', '30d').alias('rolling_mean_30d'),
        pl.col('val').mean().rolling_by_assume_sorted('ts', '60d').alias('rolling_mean_60d'),
        pl.col('val').mean().rolling_by_assume_sorted('ts', '90d').alias('rolling_mean_90d'),
        pl.col('val').mean().rolling_by_assume_sorted('ts', '120d').alias('rolling_mean_120d'),
    )
        .temporary_sort_by('ts')
        .struct.unnest()
)

Now all of that is very hypothetical and relies on a lot of features we don't have yet, as well as my speculative 'vision' for both window functions and the internal optimizer layer, but in theory it should be possible to automatically optimize your scenario.
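
A toy pure-Python model of the pushed-down form, purely to illustrate the idea. Here `temporary_sort_by` and `rolling_mean_assume_sorted` are plain functions invented for this sketch (neither exists in Polars), timestamps are integer days, and the window is assumed to be `(t - period, t]`. The point is that the sort and the un-sort each happen once, however many windows are evaluated:

```python
def rolling_mean_assume_sorted(keys, values, period):
    """Mean over (t - period, t]; only correct when keys are already sorted."""
    out = []
    start = end = 0
    total = 0.0
    for t in keys:
        while end < len(keys) and keys[end] <= t:
            total += values[end]
            end += 1
        while keys[start] <= t - period:
            total -= values[start]
            start += 1
        out.append(total / (end - start))
    return out


def temporary_sort_by(keys, columns, evaluate):
    """Sort once, run `evaluate` on the sorted copy, then undo the permutation."""
    n = len(keys)
    order = sorted(range(n), key=keys.__getitem__)
    if order == list(range(n)):
        # Already sorted (detected at runtime): nothing to permute or restore.
        return evaluate(keys, columns)
    sorted_keys = [keys[i] for i in order]
    sorted_columns = {name: [col[i] for i in order] for name, col in columns.items()}
    results = evaluate(sorted_keys, sorted_columns)
    restored = {}
    for name, col in results.items():
        out = [None] * n
        for pos, row in enumerate(order):
            out[row] = col[pos]  # put each result back in its original row
        restored[name] = out
    return restored


def evaluate(keys, columns):
    # Several rolling means share the single sorted copy.
    return {
        f"rolling_mean_{p}d": rolling_mean_assume_sorted(keys, columns["val"], p)
        for p in (1, 3)
    }


ts = [3, 1, 2]
result = temporary_sort_by(ts, {"val": [1.0, 2.0, 3.0]}, evaluate)
print(result)
# {'rolling_mean_1d': [1.0, 2.0, 3.0], 'rolling_mean_3d': [2.0, 2.0, 2.5]}
```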

MarcoGorelli commented 1 year ago

Sounds amazing, thanks for explaining - and I'm very excited for the future of Polars with you hired to work on it!

This does sound like it's quite some time away though, so a question I have is what to do in the meantime. Do you agree that

should return the same result?

Currently:

To me, it looks like making the latter also raise would be a good short-term fix (to avoid users' bugs and surprises), and a longer-term solution would be to work towards what you've described

orlp commented 1 year ago

@MarcoGorelli I think for the short-term we should change both to always give correct results (even if extra sorting is required), and repurpose the check_sorted option on group_by_rolling and add a similar option to rolling_mean & friends that by default is True. If True it gives a warning when given unsorted input that it might be inefficient, which can be silenced by passing False.
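
A rough sketch of how that behaviour could look, in pure Python rather than Polars. `rolling_sum_checked` is a hypothetical function, the warning text is invented, and the `(t - period, t]` window is an assumption. The result is correct either way; `check_sorted` only controls whether unsorted input triggers a warning:

```python
import warnings
from bisect import bisect_right
from itertools import accumulate


def rolling_sum_checked(times, values, period, check_sorted=True):
    """Always-correct rolling sum over (t - period, t]; warns on unsorted input."""
    already_sorted = all(a <= b for a, b in zip(times, times[1:]))
    if check_sorted and not already_sorted:
        warnings.warn(
            "'by' column is not sorted; sorting internally, which may be inefficient. "
            "Pass check_sorted=False to silence this warning.",
            UserWarning,
            stacklevel=2,
        )
    # Only pay for the sort when it is actually needed.
    order = range(len(times)) if already_sorted else sorted(
        range(len(times)), key=times.__getitem__
    )
    sorted_times = [times[i] for i in order]
    # prefix[k] is the sum of the k earliest values.
    prefix = [0, *accumulate(values[i] for i in order)]
    # Iterating over `times` (not `sorted_times`) keeps results in input order.
    return [
        prefix[bisect_right(sorted_times, t)] - prefix[bisect_right(sorted_times, t - period)]
        for t in times
    ]


# Unsorted input: correct result, no warning because the check is silenced.
print(rolling_sum_checked([3, 1, 2], [1, 2, 3], 2, check_sorted=False))  # [4, 2, 5]
```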

MarcoGorelli commented 1 year ago

Sounds good, thanks for the discussion

I'm quite short on time at the moment so I've removed my assignment, and am suggesting to just add a docs warning before the next release comes out https://github.com/pola-rs/polars/pull/11013 (though if you have time to implement your solution before 0.19.3, then I think that would be better, so please do take over and close my PR if that's the case!)

ritchie46 commented 1 year ago

Regarding the group_by_rolling, I don't like the name and I think we should rename it. It originated because we had group_by_dynamic which creates windows of different sizes dynamically.

I would like a rolling or maybe even over context.

The reason I am in favor of a context instead of an over_rolling is that we then can reuse the groups and perfectly parallelize over the aggregations.

This would of course also be possible if we allowed an over_rolling, but requires a bit more work (e.g. group partitioning) on the physical side. I do agree that it is much nicer in a with_columns call, as we guarantee the same number of rows as output.
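
The "reuse the groups" point can be illustrated with a small pure-Python sketch (not Polars internals; `rolling_groups` is a hypothetical helper, timestamps are integers assumed to be sorted, and the window is assumed to be `(t - period, t]`). The window boundaries are computed once, and each aggregation then only reads them, so the aggregations are independent of one another and could in principle run in parallel:

```python
from bisect import bisect_right
from statistics import mean


def rolling_groups(sorted_times, period):
    """Slice bounds (start, end) of the window (t - period, t] for every row."""
    return [
        (bisect_right(sorted_times, t - period), bisect_right(sorted_times, t))
        for t in sorted_times
    ]


ts = [1, 2, 3, 5]
val = [2, 3, 1, 4]

# Computed once ...
groups = rolling_groups(ts, 2)

# ... and shared by every aggregation.
aggregations = {"sum": sum, "mean": mean, "max": max}
result = {
    name: [agg(val[lo:hi]) for lo, hi in groups]
    for name, agg in aggregations.items()
}

print(groups)         # [(0, 1), (0, 2), (1, 3), (3, 4)]
print(result["sum"])  # [2, 5, 4, 4]
print(result["max"])  # [2, 3, 3, 4]
```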