pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Rolling ewm/prod/rank #16564

Open Mirac-Le opened 4 months ago

Mirac-Le commented 4 months ago

Description

Thank you to the hard-working Polars team! I recently switched to Polars and I'm amazed by its speed. I hope future versions will add more time-series analysis operators, such as rolling ewm, rolling prod, and rolling rank.

Below are similar functions from other Python libraries that might be helpful to you:
https://docs.xarray.dev/en/latest/generated/xarray.core.rolling.DataArrayRolling.prod.html
https://docs.xarray.dev/en/latest/generated/xarray.Dataset.rolling_exp.html

Thanks again!

cmdlineluser commented 4 months ago

I was just curious as rolling rank has popped up a few times, e.g. https://github.com/pola-rs/polars/issues/4808

Are these operations currently possible in Polars via Expr.rolling()?

import polars as pl

df = pl.DataFrame({"A": [1, 4, 2, 3, 5, 3]})

(df.with_row_index()
   .with_columns(
      rank = pl.col("A").rank().rolling(index_column="index", period="3i").list[-1],
      prod = pl.col("A").product().rolling(index_column="index", period="3i"),
      ewm  = pl.col("A").ewm_mean(span=1.5).rolling(index_column="index", period="3i").list[-1]
   )
)
shape: (6, 5)
┌───────┬─────┬──────┬──────┬──────────┐
│ index ┆ A   ┆ rank ┆ prod ┆ ewm      │
│ ---   ┆ --- ┆ ---  ┆ ---  ┆ ---      │
│ u32   ┆ i64 ┆ f64  ┆ i64  ┆ f64      │
╞═══════╪═════╪══════╪══════╪══════════╡
│ 0     ┆ 1   ┆ 1.0  ┆ 1    ┆ 1.0      │
│ 1     ┆ 4   ┆ 2.0  ┆ 4    ┆ 3.5      │
│ 2     ┆ 2   ┆ 2.0  ┆ 8    ┆ 2.290323 │
│ 3     ┆ 3   ┆ 2.0  ┆ 24   ┆ 2.870968 │
│ 4     ┆ 5   ┆ 3.0  ┆ 30   ┆ 4.580645 │
│ 5     ┆ 3   ┆ 1.5  ┆ 45   ┆ 3.322581 │
└───────┴─────┴──────┴──────┴──────────┘

But since each window is accumulated as a list before the expression runs, I assume this is less efficient than dedicated functions would be?

MarcoGorelli commented 4 months ago

yes, that would be less efficient than a dedicated function
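
For reference, the per-window work that the Expr.rolling() snippet above performs can be written out in plain Python. This is only a sketch for checking the table: the helper names (window, average_rank_of_last, ewm_mean_last) are made up for illustration, and the EWM helper assumes the adjusted weighting with alpha = 2 / (span + 1), which is what reproduces the ewm column shown above.

```python
from math import prod

def window(values, i, size):
    """Trailing window ending at position i, holding at most `size` items."""
    return values[max(0, i - size + 1): i + 1]

def average_rank_of_last(win):
    """Average-method rank of the window's last element within the window."""
    last = win[-1]
    smaller = sum(v < last for v in win)
    equal = sum(v == last for v in win)
    return smaller + (equal + 1) / 2

def ewm_mean_last(win, span):
    """Adjusted exponentially weighted mean of the window (assumed weighting)."""
    alpha = 2 / (span + 1)
    n = len(win)
    weights = [(1 - alpha) ** (n - 1 - k) for k in range(n)]
    return sum(w * v for w, v in zip(weights, win)) / sum(weights)

A = [1, 4, 2, 3, 5, 3]
idx = range(len(A))

ranks = [average_rank_of_last(window(A, i, 3)) for i in idx]
prods = [prod(window(A, i, 3)) for i in idx]
ewms = [round(ewm_mean_last(window(A, i, 3), 1.5), 6) for i in idx]

print(ranks)  # [1.0, 2.0, 2.0, 2.0, 3.0, 1.5]
print(prods)  # [1, 4, 8, 24, 30, 45]
print(ewms)   # [1.0, 3.5, 2.290323, 2.870968, 4.580645, 3.322581]
```

Every row recomputes its whole window from scratch, which is the cost a dedicated rolling kernel would avoid.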

wukan1986 commented 4 months ago

An efficient rolling rank would need a skip list to be introduced.

FYI: https://github.com/pandas-dev/pandas/blob/main/pandas/_libs/window/aggregations.pyx#L1281
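
To illustrate why an ordered structure helps, here is a minimal standard-library sketch that keeps each window sorted and updates it incrementally instead of re-ranking every window. It is not the pandas implementation: a plain sorted list with bisect stands in for the skip list, so lookups are O(log window) but insertion and deletion still shift elements. The function name rolling_rank and its pct flag are hypothetical, and NaN handling and minimum-period logic are left out.

```python
from bisect import bisect_left, bisect_right, insort
from collections import deque

def rolling_rank(values, window_size, pct=False):
    """Average-method rank of each value within its trailing window."""
    arrivals = deque()  # window contents in arrival order
    ordered = []        # the same contents, kept sorted
    out = []
    for v in values:
        arrivals.append(v)
        insort(ordered, v)
        if len(arrivals) > window_size:
            # Evict the oldest value from both views of the window.
            old = arrivals.popleft()
            del ordered[bisect_left(ordered, old)]
        lo = bisect_left(ordered, v)   # count of values strictly below v
        hi = bisect_right(ordered, v)  # count of values at or below v
        rank = (lo + 1 + hi) / 2       # mean of the 1-based ranks lo+1..hi
        out.append(rank / len(ordered) if pct else rank)
    return out

print(rolling_rank([1, 4, 2, 3, 5, 3], 3))
# [1.0, 2.0, 2.0, 2.0, 3.0, 1.5]
```

A skip list supports the same insert, delete, and rank queries in O(log window) each, which is where the O(N log(window)) bound quoted in the code below comes from.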

from functools import lru_cache
from typing import Tuple

import numpy as np
from numba import jit
from pandas._libs.window.aggregations import roll_rank as _roll_rank
from polars import Expr, Series

@lru_cache
@jit(nopython=True, nogil=True, fastmath=True, cache=True)
def get_window_bounds(
        num_values: int = 0,
        window_size: int = 10,
) -> Tuple[np.ndarray, np.ndarray]:
    end = np.arange(1, num_values + 1, dtype=np.int64)
    start = end - window_size
    start = np.clip(start, 0, num_values)
    return start, end

def roll_rank(x: Series, d: int, pct: bool = True, method: str = 'average', ascending: bool = True):
    """
    https://github.com/pandas-dev/pandas/blob/main/pandas/_libs/window/aggregations.pyx#L1281

    def roll_rank(const float64_t[:] values, ndarray[int64_t] start,
              ndarray[int64_t] end, int64_t minp, bint percentile,
              str method, bint ascending) -> np.ndarray:

    O(N log(window)) implementation using skip list
    """
    start, end = get_window_bounds(len(x), d)
    # The window size d is also passed as minp.
    ret = _roll_rank(x.to_numpy().astype(float), start, end, d, pct, method, ascending)
    # NaN values in the result become nulls in the returned Series.
    return Series(ret, nan_to_null=True)

def ts_rank(x: Expr, d: int = 5) -> Expr:
    return x.map_batches(lambda a: roll_rank(a, d, True))

Mirac-Le commented 4 months ago

@cmdlineluser @MarcoGorelli @wukan1986

Thank you all for your responses; I've learned a lot from you. In my actual projects, I need to calculate rolling window rank/ewm/prod and other operators, grouped by some_col. I use the following code to calculate the EMA for the sliding window, and the results seem to be the same as those obtained from xarray's rolling_exp.mean().

Polars code:

df.with_columns(
    pl.col(col_name)
    .ewm_mean(span=window_size, ignore_nulls=True)
    .over("some_col")
    .alias(f"{col_name}_ema_{window_size}")
)

Could you please let me know if there are similar methods to calculate rank/prod, etc.?

For the rank/prod calculation, my current solution is:

df.with_row_index().rolling(
    index_column="dt", group_by="some_col", period="3m"
).agg(pl.col("val1").rank().last().alias("rank"))

To calculate prod, I just replace .rank() with .cum_prod(). (Sorry, I might have been a bit misleading: I meant cumprod, not prod.)

My dt is at the minute level, but there is a period of time where it is discontinuous. Therefore, I would prefer to determine the sliding window based on the number of rows, i.e., period='3i'. Do I need to manually create an incrementing integer column based on the group?

I hope to learn the most elegant and standard way to do this from your best practices. Thanks again for all your help!