pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Obtain multiple ranges of data (for aggregation) from a huge dataset - similar to R's foverlaps() in the data.table package #9481

Closed: lux5 closed this issue 2 weeks ago

lux5 commented 1 year ago

Problem description

I would like to use Polars to perform some analysis on multiple small ranges of data from a huge dataset. I will provide example data for a quick demo of my goal.

import polars as pl

ts = pl.read_parquet('flo_ts.parquet')
print(ts)

# shape: (35_849, 2)
# ┌──────────────────────────┬────────┐
# │ Time                     ┆ Flow   │
# │ ---                      ┆ ---    │
# │ datetime[μs, Etc/GMT-12] ┆ f64    │
# ╞══════════════════════════╪════════╡
# │ 2013-07-01 00:00:00 +12  ┆ 1.293  │
# │ 2013-07-01 00:15:00 +12  ┆ 1.293  │
# │ 2013-07-01 00:30:00 +12  ┆ 1.293  │
# │ 2013-07-01 00:45:00 +12  ┆ 1.27   │
# │ …                        ┆ …      │
# │ 2014-06-30 23:00:00 +12  ┆ 23.693 │
# │ 2014-06-30 23:15:00 +12  ┆ 21.968 │
# │ 2014-06-30 23:30:00 +12  ┆ 21.456 │
# │ 2014-06-30 23:45:00 +12  ┆ 20.169 │
# └──────────────────────────┴────────┘

lookup = pl.read_parquet('events_of_interests.parquet')
print(lookup)

# shape: (4, 3)
# ┌─────┬──────────────────────────┬──────────────────────────┐
# │ ID  ┆ Start                    ┆ End                      │
# │ --- ┆ ---                      ┆ ---                      │
# │ i32 ┆ datetime[μs, Etc/GMT-12] ┆ datetime[μs, Etc/GMT-12] │
# ╞═════╪══════════════════════════╪══════════════════════════╡
# │ 1   ┆ 2013-07-02 10:24:00 +12  ┆ 2013-10-13 15:05:00 +12  │
# │ 2   ┆ 2013-10-13 22:44:00 +12  ┆ 2013-11-18 18:16:00 +12  │
# │ 3   ┆ 2014-05-23 09:28:00 +12  ┆ 2014-06-03 07:01:00 +12  │
# │ 4   ┆ 2014-06-25 00:43:00 +12  ┆ 2014-06-30 23:47:00 +12  │
# └─────┴──────────────────────────┴──────────────────────────┘

Now, I would like to get the results shown below: the maximum Flow (Max) and the time at which that maximum occurs (Time_max) for each event ID in the lookup DataFrame:

"""
shape: (4, 5)
┌─────┬──────────────────────────┬──────────────────────────┬────────┬──────────────────────────┐
│ ID  ┆ Start                    ┆ End                      ┆ Max    ┆ Time_max                 │
│ --- ┆ ---                      ┆ ---                      ┆ ---    ┆ ---                      │
│ i32 ┆ datetime[μs, Etc/GMT-12] ┆ datetime[μs, Etc/GMT-12] ┆ f64    ┆ datetime[μs, Etc/GMT-12] │
╞═════╪══════════════════════════╪══════════════════════════╪════════╪══════════════════════════╡
│ 1   ┆ 2013-07-02 10:24:00 +12  ┆ 2013-10-13 15:05:00 +12  ┆ 93.504 ┆ 2013-07-06 04:20:00 +12  │
│ 2   ┆ 2013-10-13 22:44:00 +12  ┆ 2013-11-18 18:16:00 +12  ┆ 23.561 ┆ 2013-10-24 08:00:00 +12  │
│ 3   ┆ 2014-05-23 09:28:00 +12  ┆ 2014-06-03 07:01:00 +12  ┆ 24.522 ┆ 2014-05-29 05:00:00 +12  │
│ 4   ┆ 2014-06-25 00:43:00 +12  ┆ 2014-06-30 23:47:00 +12  ┆ 117.61 ┆ 2014-06-25 12:15:00 +12  │
└─────┴──────────────────────────┴──────────────────────────┴────────┴──────────────────────────┘
"""

Before I can get the above results, I need to assign the ID to the input flow time series based on the Start and End of each event, so that I can aggregate the Flow values by each corresponding ID. For the moment, based on my understanding of the Polars module, it would be very easy for me to write a loop like the following:

# Filter the full series once per event, tag the rows with the event's ID,
# and stack the per-event frames with a single concat at the end.
frames = []
for row in lookup.iter_rows(named=True):
    ts_event = (
        ts.filter(
            (pl.col('Time') >= row['Start'])
            & (pl.col('Time') <= row['End'])
        )
        .with_columns(pl.lit(row['ID']).alias('ID'))
    )
    frames.append(ts_event)

ts_lookup = pl.concat(frames)

print(ts_lookup)

# shape: (16_703, 3)
# ┌──────────────────────────┬────────┬─────┐
# │ Time                     ┆ Flow   ┆ ID  │
# │ ---                      ┆ ---    ┆ --- │
# │ datetime[μs, Etc/GMT-12] ┆ f64    ┆ i32 │
# ╞══════════════════════════╪════════╪═════╡
# │ 2013-07-02 10:25:00 +12  ┆ 3.042  ┆ 1   │
# │ 2013-07-02 10:30:00 +12  ┆ 3.07   ┆ 1   │
# │ 2013-07-02 10:35:00 +12  ┆ 3.113  ┆ 1   │
# │ 2013-07-02 10:40:00 +12  ┆ 3.142  ┆ 1   │
# │ …                        ┆ …      ┆ …   │
# │ 2014-06-30 23:00:00 +12  ┆ 23.693 ┆ 4   │
# │ 2014-06-30 23:15:00 +12  ┆ 21.968 ┆ 4   │
# │ 2014-06-30 23:30:00 +12  ┆ 21.456 ┆ 4   │
# │ 2014-06-30 23:45:00 +12  ┆ 20.169 ┆ 4   │
# └──────────────────────────┴────────┴─────┘

Now I can aggregate on the Flow values to get what I need:

ts_lookup.groupby('ID').agg(
    pl.col('Flow').max().alias('Max'),
    pl.col('Time').sort_by('Flow', descending=True).first().alias('Time_max'),
)
# shape: (4, 3)
# ┌─────┬────────┬──────────────────────────┐
# │ ID  ┆ Max    ┆ Time_max                 │
# │ --- ┆ ---    ┆ ---                      │
# │ i32 ┆ f64    ┆ datetime[μs, Etc/GMT-12] │
# ╞═════╪════════╪══════════════════════════╡
# │ 1   ┆ 93.504 ┆ 2013-07-06 04:20:00 +12  │
# │ 2   ┆ 23.561 ┆ 2013-10-24 08:00:00 +12  │
# │ 3   ┆ 24.522 ┆ 2014-05-29 05:00:00 +12  │
# │ 4   ┆ 117.61 ┆ 2014-06-25 12:15:00 +12  │
# └─────┴────────┴──────────────────────────┘

I only used a small portion of my dataset here, and the lookup table has only 4 rows, so the loop is fine. But I sometimes need around 10K events extracted from close to 2 million values of the input time series, and that is a problem! I have tried running around 5K loop iterations to do the aggregation (see here for an example), but it ended up freezing the terminal window...

I would like to request a Polars function/method to achieve this, similar to R's foverlaps() in the data.table package. That tool can clip out all the needed time series (in my case) in a single operation; it is fast and convenient (you can find example data and R code here).
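
For clarity, the behaviour I am after is an interval-overlap join. A brute-force sketch of the semantics (just my own illustration of the desired result, not something that scales) would be a cross join followed by a filter:

ts_lookup = (
    ts.join(lookup, how='cross')
      .filter(
          (pl.col('Time') >= pl.col('Start'))
          & (pl.col('Time') <= pl.col('End'))
      )
)
# this materializes len(ts) * len(lookup) rows before filtering,
# so it only states the desired result; it is not practical for large inputs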

Many thanks to the Polars developers for making such a useful tool.

erinov1 commented 1 year ago

This is essentially the same as https://github.com/pola-rs/polars/issues/9467 and https://github.com/pola-rs/polars/issues/5891.

In your case it is probably much faster to find which row indices in ts each interval in lookup corresponds to (by sorting the data and using either search_sorted or a rolling join). Then you can replace the filter operation in the construction of ts_lookup with a much faster zero-copy slice operation (e.g., https://github.com/pola-rs/polars/issues/5891#issuecomment-1364124215).
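
A rough sketch of that idea (my own illustration, reusing the frames and column names from the example above, and assuming ts is sorted by Time):

rows = []
for row in lookup.iter_rows(named=True):
    lo = ts['Time'].search_sorted(row['Start'], side='left')   # first position with Time >= Start
    hi = ts['Time'].search_sorted(row['End'], side='right')    # first position with Time > End
    window = ts.slice(lo, hi - lo)                              # zero-copy view of the event's rows
    i = window['Flow'].arg_max()                                # assumes the window is non-empty
    rows.append((row['ID'], window['Flow'][i], window['Time'][i]))

result = pl.DataFrame(rows, schema=['ID', 'Max', 'Time_max'], orient='row')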

As I commented in https://github.com/pola-rs/polars/issues/9467, in my own applications (~500M rows of data, ~50M intervals each spanning ~10K rows), it was much faster to first find the slice indices using polars, but then iterate + aggregate over the data slices with numba.
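
Very roughly, that looks something like the following (a sketch with names of my own choosing, not my actual code):

import numba
import numpy as np

@numba.njit
def slice_max(values, starts, ends):
    # maximum of values[starts[i]:ends[i]] for every interval i
    out = np.empty(len(starts))
    for i in range(len(starts)):
        out[i] = values[starts[i]:ends[i]].max()
    return out

flows = ts['Flow'].to_numpy()
starts = np.array([ts['Time'].search_sorted(s, side='left') for s in lookup['Start']])
ends = np.array([ts['Time'].search_sorted(e, side='right') for e in lookup['End']])
maxima = slice_max(flows, starts, ends)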

In principle, one might also be able to do this on the Rust side by constructing a custom GroupsProxy consisting of all the slices, but I wasn't able to see an entry point to the aggregation context from such a construction. I previously opened https://github.com/pola-rs/polars/issues/7211; perhaps @ritchie46 can comment.

avimallu commented 1 year ago

Oh wow, @erinov1, I did not catch the link between foverlaps-like functionality and the groupby_slice requests! Once non-equi joins are in place in Polars, I think implementing these will become easier.

lux5 commented 1 year ago

This is essentially the same as #9467 and #5891.

In your case it is probably much faster to find which row indices in ts each interval in lookup corresponds to (by sorting the data and using either search_sorted or a rolling join). Then you can replace the filter operation in the construction of ts_lookup with a much faster zero-copy slice operation (e.g., #5891 (comment)).

Thank you for giving me the idea of using search_sorted! I will give it a try myself. As for the rolling join example linked there, I couldn't get the example code working.

cmdlineluser commented 1 year ago

This is the approach mentioned in #9467 in case you want to try it.

You explode the range of slice indices and use .take(idx) to gather the corresponding values:

ts = pl.scan_parquet('flo_ts.parquet')
lookup = pl.scan_parquet('events_of_interests.parquet')

slices = ts.with_context(lookup).select( 
   'ID',
   'Start',
   'End',
   idx = pl.arange(
      pl.col('Time').search_sorted('Start'),
      pl.col('Time').search_sorted('End')
   )
)

(ts.with_context(slices.explode('idx'))
   .select('ID', 'Start', 'End', pl.col('Time', 'Flow').take('idx'))
   .groupby('ID').agg(
      pl.col('Flow').max().alias('Max'),
      pl.col('Time').sort_by('Flow', descending=True).first().alias('Time_max'),
   )
).collect()
shape: (4, 3)
┌─────┬────────┬──────────────────────────┐
│ ID  ┆ Max    ┆ Time_max                 │
│ --- ┆ ---    ┆ ---                      │
│ i32 ┆ f64    ┆ datetime[μs, Etc/GMT-12] │
╞═════╪════════╪══════════════════════════╡
│ 1   ┆ 93.504 ┆ 2013-07-06 04:20:00 +12  │
│ 2   ┆ 23.561 ┆ 2013-10-24 08:00:00 +12  │
│ 3   ┆ 24.522 ┆ 2014-05-29 05:00:00 +12  │
│ 4   ┆ 117.61 ┆ 2014-06-25 12:15:00 +12  │
└─────┴────────┴──────────────────────────┘
lux5 commented 1 year ago

This is the approach mentioned in #9467 in case you want to try it.

Wow, this is awesome! I like this one. The hard part for me is

...
pl.arange(
      pl.col('Time').search_sorted('Start'),
      pl.col('Time').search_sorted('End')
   )
...

I eventually saw what it does after breaking it apart and checking the docs. Thank you for this solution; it generally solves my issue.
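
For anyone else puzzling over it, here is a toy illustration of my own (not from the thread) of what that part computes:

time = pl.Series('Time', [1, 3, 5, 7])   # sorted, standing in for the real Time column
time.search_sorted(2)                     # -> 1, the position where Start would be inserted
time.search_sorted(6)                     # -> 3, the position where End would be inserted
# pl.arange(1, 3) then yields the row indices [1, 2], i.e. the rows with Time 3 and 5,
# exactly the rows inside [Start, End]; exploding that list gives one index per matching
# row, which .take(idx) gathers back into values.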

cmdlineluser commented 1 year ago

Yeah, it explodes out the range - it seems to perform well enough in some of these cases.

If I try it with a larger version of @erinov1's #5891 example, though, it gets really slow and tries to use huge amounts of memory.

I guess it's not an efficient/scalable approach when there are large numbers of overlapping ranges.
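
To put rough numbers on it: at the scale mentioned earlier (~50M intervals, each spanning ~10K rows), the exploded idx column alone would hold on the order of 50,000,000 × 10,000 ≈ 5 × 10^11 indices, far more than the ~500M underlying rows, which is where the memory goes.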

lux5 commented 1 year ago

Thank you for the explanation of its use case, cmdlineluser. Hopefully, Polars will have a function similar to foverlaps() in the near future.

areeh commented 1 year ago

I hit the same situation a while back and wrote some code inside Polars to get it to work (see #9691; bear in mind it probably has errors and was copied from the groupby_dynamic/rolling functions as they were months ago). I'd love to see similar functionality added to Polars properly.

cmdlineluser commented 3 weeks ago

I think the new .join_where() may also close this issue?

(lookup
  .join_where(ts,
     pl.col.Time >= pl.col.Start,
     pl.col.Time <= pl.col.End
  )
  .group_by(pl.col.ID)
  .agg(pl.all().get(pl.col.Flow.arg_max()))
)

# shape: (4, 5)
# ┌─────┬──────────────────────────┬──────────────────────────┬──────────────────────────┬────────┐
# │ ID  ┆ Start                    ┆ End                      ┆ Time                     ┆ Flow   │
# │ --- ┆ ---                      ┆ ---                      ┆ ---                      ┆ ---    │
# │ i32 ┆ datetime[μs, Etc/GMT-12] ┆ datetime[μs, Etc/GMT-12] ┆ datetime[μs, Etc/GMT-12] ┆ f64    │
# ╞═════╪══════════════════════════╪══════════════════════════╪══════════════════════════╪════════╡
# │ 3   ┆ 2014-05-23 09:28:00 +12  ┆ 2014-06-03 07:01:00 +12  ┆ 2014-05-29 05:00:00 +12  ┆ 24.522 │
# │ 4   ┆ 2014-06-25 00:43:00 +12  ┆ 2014-06-30 23:47:00 +12  ┆ 2014-06-25 12:15:00 +12  ┆ 117.61 │
# │ 1   ┆ 2013-07-02 10:24:00 +12  ┆ 2013-10-13 15:05:00 +12  ┆ 2013-07-06 04:20:00 +12  ┆ 93.504 │
# │ 2   ┆ 2013-10-13 22:44:00 +12  ┆ 2013-11-18 18:16:00 +12  ┆ 2013-10-24 08:00:00 +12  ┆ 23.561 │
# └─────┴──────────────────────────┴──────────────────────────┴──────────────────────────┴────────┘
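
If the exact layout from the original post is wanted, a small follow-up on the grouped result works (my own addition; out stands for the DataFrame returned above):

(out
  .rename({'Flow': 'Max', 'Time': 'Time_max'})
  .select('ID', 'Start', 'End', 'Max', 'Time_max')
  .sort('ID')
)
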
lux5 commented 2 weeks ago

I think the new .join_where() may also close this issue?

Thank you for the nice solution. I didn't know the .join_where() method had been added. This answers my question. Yes, the issue can be closed.