vaexio / vaex

Out-of-Core hybrid Apache Arrow/NumPy DataFrame for Python, ML, visualization and exploration of big tabular data at a billion rows per second 🚀
https://vaex.io
MIT License

[FEATURE-REQUEST] `origin` parameter in `BinnerTime`? #1959

Open yohplala opened 2 years ago

yohplala commented 2 years ago

Description Similar to the origin parameter of pandas' resample method, could vaex's BinnerTime offer an equivalent origin parameter? As per the pandas documentation:

origin : Timestamp or str, default ‘start_day’

The timestamp on which to adjust the grouping. The timezone of origin must match the timezone of the index. If string, must be one of the following:

‘epoch’: origin is 1970-01-01
‘start’: origin is the first value of the timeseries
‘start_day’: origin is the first day at midnight of the timeseries
‘end’: origin is the last value of the timeseries
‘end_day’: origin is the ceiling midnight of the last day
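
To make the effect of origin concrete, here is a minimal pandas-only sketch. It assumes four timestamps three hours apart starting at 02:00 and a 4-hour bin width; the variable names are only illustrative.

```python
import pandas as pd

# Four timestamps: 02:00, 05:00, 08:00, 11:00 on 2022-03-01.
ts = pd.date_range(start="2022-03-01 02:00", periods=4, freq=pd.Timedelta(hours=3))
series = pd.Series(range(len(ts)), index=ts)

width = pd.Timedelta(hours=4)

# origin='start_day' (the pandas default): bins are aligned to midnight,
# so they start at 00:00, 04:00 and 08:00.
anchored_midnight = series.resample(width, origin="start_day").sum()

# origin='start': bins are aligned to the first timestamp,
# so they start at 02:00, 06:00 and 10:00.
anchored_first = series.resample(width, origin="start").sum()

print(anchored_midnight)
print(anchored_first)
```

The same data gives different sums per bin depending on the anchor, which is the behaviour this request is about.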


Is your feature request related to a problem? Please describe. When conducting a groupby using BinnerTime, the anchor currently used for the bins is the timestamp of the first row.

import vaex
import pandas as pd

# Example setup
start = "2022/03/01 2:00"
ts = pd.date_range(start=start, periods=4, freq='3H')
len_ts = len(ts)
pdf = pd.DataFrame({'val':range(len_ts), 'ts':ts})
vdf = vaex.from_pandas(pdf)

vdf
Out[56]: 
  #    val  ts
  0      0  2022-03-01 02:00:00.000000000
  1      1  2022-03-01 05:00:00.000000000
  2      2  2022-03-01 08:00:00.000000000
  3      3  2022-03-01 11:00:00.000000000

# Testing resampling with '4h' binning.
vdf.groupby(vaex.BinnerTime(vdf.ts, resolution='h', every=4), agg={'sum': vaex.agg.sum("val")})
Out[57]: 
  #  ts               sum
  0  2022-03-01 02      1      # 1st bin starts at the 1st timestamp of the initial df
  1  2022-03-01 06      2
  2  2022-03-01 10      3

But what if I want my bins anchored to midnight, i.e. the first bin starting at midnight? In that case, the values in the sum column would be different.
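
As a rough illustration of what midnight-anchored bins would produce for this data, here is a NumPy-only sketch. It does not use vaex; `origin` and `every` are hypothetical names standing in for the requested parameter and the bin width.

```python
import numpy as np

# The four timestamps from the example, at hour resolution.
ts = np.array(
    ["2022-03-01T02", "2022-03-01T05", "2022-03-01T08", "2022-03-01T11"],
    dtype="datetime64[h]",
)
val = np.arange(len(ts))

origin = np.datetime64("2022-03-01T00", "h")  # assumed anchor: midnight
every = 4                                     # bin width in hours

# Whole hours elapsed since the origin, floor-divided by the bin width,
# give the index of the bin each row falls into.
hours_since_origin = (ts - origin).astype(np.int64)
bin_index = hours_since_origin // every

# Left edge of each bin, and the sum of `val` per bin.
bin_starts = origin + np.arange(bin_index.max() + 1) * every * np.timedelta64(1, "h")
sums = np.bincount(bin_index, weights=val).astype(int)

for start, total in zip(bin_starts, sums):
    print(start, total)
```

With midnight as the anchor, the rows at 08:00 and 11:00 land in the same bin, whereas with the first timestamp as the anchor they are split across two bins.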

Additional context A workaround I see is to re-use some pandas functions to modify the timestamp of the first row, and set it at the timestamp the user wants the 1st bin to start.

# re-using previous example setup

# pandas utils for offsetting first timestamp
from pandas.core.resample import _get_timestamp_range_edges as gtre
from pandas.tseries.frequencies import to_offset

# Anchoring '4H' bin to midnight.
ts_start = pd.Timestamp(vdf['ts'][:1].to_numpy()[0])  # [:1] selects the first row ([:0] would be empty)
ts_start, _ = gtre(ts_start, ts_start, freq=to_offset('4H'), origin="start_day")

# Modifying 1st timestamp in initial vaex dataframe
first_row = vdf[:1].to_pandas_df()
first_row.loc[0,"ts"] = ts_start
vdf = vaex.from_pandas(first_row).concat(vdf[1:])

vdf
Out[58]: 
  #    val  ts
  0      0  2022-03-01 00:00:00.000000000
  1      1  2022-03-01 05:00:00.000000000
  2      2  2022-03-01 08:00:00.000000000
  3      3  2022-03-01 11:00:00.000000000

... well, I thought it would work, but vaex is moving the start one hour earlier. Hmmm... what is this mystery about?

vdf.groupby(vaex.BinnerTime(vdf.ts, resolution='h', every=4), agg={'sum': vaex.agg.sum("val")})
Out[59]: 
  #  ts               sum
  0  2022-02-28 23      0
  1  2022-03-01 03      1
  2  2022-03-01 07      2
  3  2022-03-01 11      3

First timestamp is starting at 11pm on 28th of February?

maartenbreddels commented 2 years ago

@JovanVeljanoski what do you think?

JovanVeljanoski commented 2 years ago

The request makes sense to me. I don't know what the api should look like yet, but having a reference point does make sense. Especially given this: https://github.com/vaexio/vaex/issues/408#issuecomment-694674972 :)

Dunno how we are with time to take this one, but that is a separate story.

solwarsop commented 1 year ago

Here's a quick modification that seems to work in the limited testing I've done...

import numpy as np
import vaex

class AnchoredBinnerTime(vaex.BinnerTime):
    def __init__(self, expression, resolution='W', df=None, every=1, start_anchor=None):
        self.resolution = resolution
        self.expression = expression
        self.df = df or expression.ds
        self.every = every
        self.sort_indices = None
        # make sure it's an expression
        self.expression = self.df[str(self.expression)]
        self.label = self.expression._label
        self.tmin, self.tmax = self.df[str(self.expression)].minmax()

        self.resolution_type = 'M8[%s]' % self.resolution
        if start_anchor is not None:
            self.tmin = np.datetime64(start_anchor).astype(self.resolution_type)

        dt = (self.tmax.astype(self.resolution_type) - self.tmin.astype(self.resolution_type))
        self.N = (dt.astype(int).item() + 1)
        # divide by every, and round up
        self.N = (self.N + every - 1) // every
        self.bin_values = np.arange(self.tmin.astype(self.resolution_type), self.tmax.astype(self.resolution_type)+1, every)
        self._promise = vaex.promise.Promise.fulfilled(None)

# Testing resampling with '4h' binning with a chosen start anchor.
vdf.groupby(AnchoredBinnerTime(vdf.ts, resolution='h', every=4, start_anchor='2022-03-01 00:00'), agg={'sum': vaex.agg.sum("val")})
Out[101]:
  #  ts               sum
  0  2022-03-01 00      0
  1  2022-03-01 04      1
  2  2022-03-01 08      5
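
If the anchor should be derived from an origin rather than typed by hand, one possible approach is to floor the first timestamp onto the grid defined by the origin and the bin width. The helper below is a hypothetical sketch (`first_bin_start` is not part of vaex); its result could be passed as `start_anchor` to the class above. It assumes a fixed-width resolution such as hours.

```python
import numpy as np

def first_bin_start(tmin, origin, every, resolution="h"):
    """Hypothetical helper: left edge of the bin containing `tmin`, for bins
    of `every` units of `resolution` aligned to `origin`."""
    unit = f"datetime64[{resolution}]"
    tmin = np.datetime64(tmin).astype(unit)
    origin = np.datetime64(origin).astype(unit)
    # Number of whole bins between the origin and tmin (floored, so it also
    # works when tmin lies before the origin).
    steps = (tmin - origin).astype(np.int64) // every
    return origin + steps * every * np.timedelta64(1, resolution)

# Epoch-aligned 4-hour bins: the first timestamp 02:00 falls in the bin starting at 00:00.
print(first_bin_start("2022-03-01T02:00", "1970-01-01", every=4))
# Bins aligned to the first timestamp itself: the anchor stays at 02:00.
print(first_bin_start("2022-03-01T02:00", "2022-03-01T02:00", every=4))
```

Computing the anchor this way keeps it close to the first timestamp, so the bin range does not grow to cover the whole span back to a distant origin such as the epoch.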