rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0
8.44k stars 903 forks source link

[BUG] Rolling window aggregations are very slow with large windows #15119

Open shwina opened 8 months ago

shwina commented 8 months ago

With large windows, the .rolling() function in cuDF can be pathologically slow:

In [6]: dt = cudf.date_range("2001-01-01", "2002-01-01", freq="1s")
In [7]: df = cudf.DataFrame({"x": np.random.rand(len(dt))}, index=dt)
In [8]: %time df.rolling("1D").sum()
CPU times: user 10.3 s, sys: 57.1 ms, total: 10.3 s
Wall time: 10.4 s
Out[8]:
                                x
2001-01-01 00:00:00      0.815418
2001-01-01 00:00:01      1.238151
2001-01-01 00:00:02      1.811390
2001-01-01 00:00:03      2.065794
2001-01-01 00:00:04      2.195230
...                           ...
2001-12-31 23:59:55  43308.909704
2001-12-31 23:59:56  43309.098228
2001-12-31 23:59:57  43308.658888
2001-12-31 23:59:58  43308.790256
2001-12-31 23:59:59  43308.915838

[31536000 rows x 1 columns]

Why is it slow?

Of the 10s of execution time above, about 8s is spent in computing the window sizes, which is done in a hand-rolled numba CUDA kernel: https://github.com/rapidsai/cudf/blob/6f6e521257dce5732eea7b6b9d56243f8b0a69cc/python/cudf/cudf/utils/cudautils.py#L17. Note that running the code through a profiler will show execution time being spent in the next CUDA kernel (column.full) - but that's a red herring I think, because there's no synchronization after the numba kernel call.

What can we do about it?

I see a couple of options here:

  1. I wonder if there's a better way to write that kernel. Currently, it naively launches one thread per element, and does a linear search for the next element that would exceed the window bounds.
  2. We could make it libcudf's responsibility to compute the window sizes. I believe they already do window sizes computation in the context of grouped rolling window aggreagations: see grouped_range_rolling_window().
mroeschke commented 8 months ago

For 1, here's pandas implementation for finding the window bounds (defined in terms of start/end indices instead of the end - start difference (?)) https://github.com/pandas-dev/pandas/blob/4ed67ac9ef3d9fde6fb8441bc9ea33c0d786649e/pandas/_libs/window/indexers.pyx#L107. pandas uses a sliding window algorithm for this case

For 2, I suppose you could defined the non-grouped case as the 1 large group so that group_range_rolling_window could be used

shwina commented 8 months ago

For 2, I suppose you could defined the non-grouped case as the 1 large group so that group_range_rolling_window could be used

I did experiment with this and it is still somewhat slow. Perhaps libcudf is serializing within groups.

wence- commented 8 months ago

This is not due to the computation of the rolling window sizes, but rather that the libcudf rolling window computation is slow, I think. Here's an example to show that:

import cudf
import cudf._lib as libcudf
import numpy as np
dt = cudf.date_range("2001-01-01", "2002-01-01", freq="1s")
df = cudf.DataFrame({"x": np.random.rand(len(dt))}, index=dt)
max_window_size = 86400
pre_window = cudf.core.column.as_column(
    np.concatenate([np.arange(1, max_window_size, dtype="int32"), np.full(len(df) - (max_window_size - 1), max_window_size, dtype="int32")])
)
source_column = df["x"]._column
follow_window = cudf.core.column.full(len(df), 0, dtype="int32")
window = None
min_periods = 1
center = False
op = "sum"
agg_params = {}

result = libcudf.rolling.rolling(source_column, pre_window, follow_window, window, min_periods, center, op, agg_params)

The call to rolling takes 10 seconds for me. It's, in this example, linear in the size of the windows (change max_window_size).

I think that scaling kind of makes sense in that irrespective of the window size, one produces the same output windows, but each window is O(window_size) large, and the window-by-window approach implemented then scales in the same way.

I was wondering if there's some kind of fourier-space approach that one might use, but the potential for non-equispaced samples complicates things (there are non-uniform FFT methods but they are non-exact). And my brain is not sufficiently in gear.

In any case, it feels like this should be able to run faster than it does, and I wonder if it can do so by a combination of change in parallelisation strategy and/or clever algorithmic changes.

davidwendt commented 8 months ago

cc @mythrocks

wence- commented 8 months ago

I was wondering if there's some kind of fourier-space approach that one might use, but the potential for non-equispaced samples complicates things (there are non-uniform FFT methods but they are non-exact). And my brain is not sufficiently in gear.

No FFTs needed I think, this should be solvable in $\mathcal{O}(n)$ time for an $n$-row column via a summed-area table approach (AKA, in 1D, a prefix scan) for rolling operations whose aggregation op has an inverse.

This would be a two-pass algorithm I think, let's take sum as the example op.

Pass 1: compute scan(+, column) -> scan_column Pass 2: For each row i, the result is scan_column[i + forward_size[i]] - scan_column[i - backward_size[i]]

For things like variance and covariance, one needs to use some suitable adaptation of Welford's online approach. Some relevant recent papers:

Edit: one would have to worry (more) about overflow than with the naive approach.

shwina commented 8 months ago

There's almost definitely two inefficiencies at play here then, computing the window sizes given an offset is slower than we'd like, and the rolling window aggregation implementation is slower than we'd like.

wence- commented 8 months ago

computing the window sizes given an offset is slower than we'd like

I didn't really manage to measure that part as a noticeable problem, but maybe I was doing something different.

shwina commented 8 months ago

In your benchmark you're constructing the inputs to the libcudf rolling function by hand. But going through the public API takes you down a code path that uses a numba kernel to do that.

wence- commented 8 months ago

In your benchmark you're constructing the inputs to the libcudf rolling function by hand. But going through the public API takes you down a code path that uses a numba kernel to do that.

Ah sorry, yes, now I see it. I was tricked by the lack of synchronisation in the numba kernel launch.

Yes, that kernel has exactly the same problem the rolling window kernel does. Each row linearly searches backwards in the column until the difference between the preceding entry and the current one is larger than the requested offset.

I think you can do this by doing a reverse prefix scan of the differences between the entries in the to-be-windowed column, and then ... (brain out of gear again)

wence- commented 8 months ago

cc @harrism as local scan expert.

wence- commented 8 months ago

Just to note one further thing while it is in my thoughts, the (potential) downside to doing a full-column scan to implement this is, in addition to overflow, numerical roundoff if using floating point types[^1].

[^1]: The Chiemlowiec paper linked above provides bounds on the number of bits required to compute mean and variance if data are represented in fixed point. If the data are normally distributed with small(ish) variance, these bounds are not too bad, but if they are heavy-tailed they overflow 64bit.

wence- commented 8 months ago

The general term of art here is range queries. If the binary operator induces a group structure on the data then these can be done as suggested above (via prefix scans). If it only induces a semigroup structure (no inverse), for example rolling-min, then one needs to build more sophisticated data structures (A.C. Yao, Space-time tradeoff for answering range queries (1982), https://doi.org/10.1145/800070.802185), but can be done in at worse $\Theta{(c n)}$ space and $\mathcal{O}(\alpha_c(n))$ time, where $\alpha_c$ is the inverse Ackermann function (so effectively constant time for any feasible $n$)).

Since range minimum queries pop up a lot in geospatial analysis, I wonder if the cuspatial team implemented them.

harrism commented 8 months ago

Note that running the code through a profiler will show execution time being spent in the next CUDA kernel (column.full) - but that's a red herring I think, because there's no synchronization after the numba kernel call.

I don't think this should be true. I think Nsight systems can distinguish the execution time of kernels even without synchronisation. If you time manually in host code then you need to synchronize to time accurately.

shwina commented 8 months ago

Oh sorry, I meant a host profiler (in this case the Python profiler cprofile)

harrism commented 8 months ago

Regarding the numba kernel to find the windows, a first step is usually to move this to C++. When I search the codebase for this it only finds the definition, no calls. However the non-numba version is called, here: https://github.com/rapidsai/cudf/blob/d158ccdbe651952bd649cb0f17c41467c5209824/python/cudf/cudf/core/window/rolling.py#L483

So is this numba kernel actually being used?

Next question, is the arr data passed to it random, or does it happen to be ordered? If the latter, then this could be replaced by a call to thrust::lower_bound in C++ (with fancy iterators).

wence- commented 8 months ago

Regarding the numba kernel to find the windows, a first step is usually to move this to C++. When I search the codebase for this it only finds the definition, no calls. However the non-numba version is called, here:

https://github.com/rapidsai/cudf/blob/d158ccdbe651952bd649cb0f17c41467c5209824/python/cudf/cudf/core/window/rolling.py#L483

So is this numba kernel actually being used?

Next question, is the arr data passed to it random, or does it happen to be ordered? If the latter, then this could be replaced by a call to thrust::lower_bound in C++ (with fancy iterators).

The non-numba version calls the numba version: https://github.com/rapidsai/cudf/blob/6f6e521257dce5732eea7b6b9d56243f8b0a69cc/python/cudf/cudf/utils/cudautils.py#L32

I am not sure if the rolling window API allows non-sorted arrays but I would have thought not, so arr is probably (someone else to confirm) required to be sorted in ascending order.

mythrocks commented 3 months ago

Apologies for being late to this party.

The general term of art here is range queries... ... I am not sure if the rolling window API allows non-sorted arrays but I would have thought not, so arr is probably (someone else to confirm) required to be sorted in ascending order.

Please pardon my inexperience with the Pandas side of rolling window. (I've done a little bit of work on the Apache Spark end of this problem.)

If arr is with reference to the order-by column in range queries, then you're partly right: the column needs to be sorted. But the ordering may be ascending or descending.

Looking at grouped_range_rolling_window might illuminate the matter: The function takes the cudf::order corresponding to the column. The window ranges are calculated depending on the direction of ordering.