deephaven / deephaven-core

Deephaven Community Core

Simple user creatable aggregations for `agg_by`, `update_by`, and `range_join` #4838

Open chipkent opened 12 months ago

chipkent commented 12 months ago

Current agg_by, update_by, and range_join aggregations:

  1. Are limited to DH-supplied options
  2. Are very difficult to implement in DH and require very specialized knowledge
  3. Make it impossible to create simple example cases for new implementations
  4. Require changes deep in the engine

To work around these limitations, users are pointed to operations such as AggFormula and AggGroup+update. These can produce viable output, but they have limited functionality (e.g. #4194, #4195, #4052). Additionally, they recompute large amounts of data instead of computing only on what changed, which makes them inefficient.
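To make the cost difference concrete, here is a plain-Python sketch of a grouped sum maintained two ways. It uses no Deephaven APIs, and `recompute_sums` and `apply_delta` are hypothetical names chosen for illustration. The first rescans every row, which is roughly what a recompute-everything workaround pays on each change; the second adjusts the stored per-group sums using only the added, removed, and modified rows.

```python
from collections import defaultdict


def recompute_sums(rows):
    """Full recomputation: rescan every (key, value) row on each change."""
    sums = defaultdict(float)
    for key, value in rows:
        sums[key] += value
    return dict(sums)


def apply_delta(sums, added=(), removed=(), modified=(), modified_prev=()):
    """Delta update: adjust existing sums using only the changed rows.

    `modified` holds the new (key, value) of each changed row and
    `modified_prev` the old one, in the same order.
    """
    for key, value in added:
        sums[key] = sums.get(key, 0.0) + value
    for key, value in removed:
        sums[key] = sums.get(key, 0.0) - value
    for (new_key, new_value), (old_key, old_value) in zip(modified, modified_prev):
        # Take the old value out of its group, then put the new value in.
        sums[old_key] = sums.get(old_key, 0.0) - old_value
        sums[new_key] = sums.get(new_key, 0.0) + new_value
    return sums


rows = [("A", 1.0), ("A", 2.0), ("B", 5.0)]
sums = recompute_sums(rows)

# One update: add ("B", 3.0), remove ("A", 1.0), change ("B", 5.0) to ("B", 10.0).
sums = apply_delta(
    sums,
    added=[("B", 3.0)],
    removed=[("A", 1.0)],
    modified=[("B", 10.0)],
    modified_prev=[("B", 5.0)],
)
assert sums == recompute_sums([("A", 2.0), ("B", 10.0), ("B", 3.0)])
```

The delta path does work proportional to the number of changed rows, while the recompute path does work proportional to the whole table. A real operator would also need to drop groups that become empty, which this sketch does not do.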

There should be agg_by, update_by, and range_join operations that:

  1. May be more complex to implement than formulas.
  2. Make it simple for users to construct custom cases.
  3. Compute on deltas, rather than complete recomputations.
  4. Support multiple input columns.
  5. Have better performance than using a formula but possibly worse performance than the existing ultra-optimized operators
  6. Can be created in Java or Python
  7. Can be implemented once and used by agg_by, update_by, and range_join
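Requirements 3 and 7 together suggest a small contract: the operator keeps per-key state and only ever sees deltas, so whatever drives it (grouping for agg_by, a window for update_by) lives outside the operator. The plain-Python sketch below shows one hypothetical delta-based sum reused by an agg_by-style driver and a tick-window driver. `DeltaSum`, `grouped_driver`, and `tick_window_driver` are illustrative names, not Deephaven APIs.

```python
from collections import deque


class DeltaSum:
    """Hypothetical delta-based operator: a running sum kept per key."""

    def __init__(self):
        self._state = {}

    def update(self, key, added, removed):
        # Only the changed values are touched; the stored total absorbs them.
        total = self._state.get(key, 0.0) + sum(added) - sum(removed)
        self._state[key] = total
        return total


def grouped_driver(op, rows):
    """agg_by-style driver: each (key, value) row is an add to its group."""
    result = {}
    for key, value in rows:
        result[key] = op.update(key, added=[value], removed=[])
    return result


def tick_window_driver(op, values, window):
    """update_by-style driver: a trailing window of `window` ticks.

    Each new tick is an add; the tick that falls out of the window is a
    remove. For simplicity all ticks share one implicit group.
    """
    buf = deque()
    out = []
    for value in values:
        buf.append(value)
        evicted = [buf.popleft()] if len(buf) > window else []
        out.append(op.update("window", added=[value], removed=evicted))
    return out


grouped = grouped_driver(DeltaSum(), [("A", 1.0), ("B", 2.0), ("A", 3.0)])
windowed = tick_window_driver(DeltaSum(), [1.0, 2.0, 3.0, 4.0], window=2)
print(grouped)   # prints {'A': 4.0, 'B': 2.0}
print(windowed)  # prints [1.0, 3.0, 5.0, 7.0]
```

The operator never knows whether its deltas come from a group, a tick window, or a range of right-table rows; that separation is what would let one implementation serve all three operations.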

As an example, let's consider a weighted absolute sum. Pseudocode may look something like:

```python
# Proposed API (pseudocode): AggOperator, AggUpdateDelta, and updateby.TickRange
# do not exist in Deephaven yet. t and t_right are assumed to be existing tables.
from typing import Any

import numpy as np
from deephaven import AggOperator, AggUpdateDelta, agg, updateby


class WAbsSum(AggOperator):
    def __init__(self, result: str, weight: str, value: str):
        # result is the result column name
        # weight is the weight column name
        # value is the value column name
        self.cols = [weight, value]
        self.weight = weight
        self.value = value
        super().__init__(result, self.cols)

    def update(self, key: Any, delta: AggUpdateDelta) -> float:
        # AggUpdateDelta is similar to deephaven.table_listener.TableUpdate
        adds = delta.added(self.cols)
        removes = delta.removed(self.cols)
        mods = delta.modified(self.cols)
        mods_prev = delta.modified_prev(self.cols)

        # state is a generic object particular to this implementation
        state = self.get_state(key)  # from AggOperator
        rst = 0.0 if state is None else state

        def f(col_vals) -> float:
            # weighted absolute sum over the given rows
            return np.sum(col_vals[self.weight] * np.abs(col_vals[self.value]))

        # add new rows, subtract removed rows, and swap old values of modified rows for new ones
        rst += f(adds) - f(removes) + f(mods) - f(mods_prev)
        self.set_state(key, rst)  # from AggOperator
        return rst


t_aby = t.agg_by([agg.abs_sum("AbsX=X"), WAbsSum("MyWAbsSum", "W", "V")], by=["A", "B"])

# Here updateby.TickRange takes the generic user-defined aggregation operation and wraps it in a 20-tick window operation.
t_uby = t.update_by([updateby.cum_min(["X", "Y"]), updateby.TickRange(20, WAbsSum("MyWAbsSum", "W", "V"))], by=["A", "B"])

# a real range_join `on` also needs exactly one range match expression
t_rj = t.range_join(table=t_right, on=["A", "B"], aggs=[agg.group("X"), WAbsSum("MyWAbsSum", "W", "V")])
```
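The proposed `AggOperator` API doesn't exist yet, so the pseudocode can't run as written, but its delta rule can be checked on its own. The NumPy sketch below uses hypothetical helpers `wabs_sum` and `wabs_sum_delta` and confirms that applying `f(adds) - f(removes) + f(mods) - f(mods_prev)` to the previous result matches recomputing the weighted absolute sum from scratch on the updated rows.

```python
import numpy as np


def wabs_sum(weights, values):
    """Weighted absolute sum: sum(w * |v|)."""
    return float(np.sum(np.asarray(weights) * np.abs(np.asarray(values))))


def wabs_sum_delta(prev, adds, removes, mods, mods_prev):
    """Update a previous result using only the changed rows.

    Each argument after `prev` is a (weights, values) pair of arrays.
    """
    return (prev + wabs_sum(*adds) - wabs_sum(*removes)
            + wabs_sum(*mods) - wabs_sum(*mods_prev))


# Initial rows: weights w and values v.
w = np.array([1.0, 2.0, 0.5])
v = np.array([-3.0, 4.0, -2.0])
prev = wabs_sum(w, v)

# One update: remove row 0, modify row 1 (v: 4.0 -> -1.0), add a new row.
adds = (np.array([3.0]), np.array([-1.5]))
removes = (w[[0]], v[[0]])
mods = (np.array([2.0]), np.array([-1.0]))
mods_prev = (w[[1]], v[[1]])

updated = wabs_sum_delta(prev, adds, removes, mods, mods_prev)
recomputed = wabs_sum(np.array([2.0, 0.5, 3.0]), np.array([-1.0, -2.0, -1.5]))
print(updated, recomputed)  # prints 7.5 7.5
```

Because the weighted absolute sum is a plain sum of per-row terms, each row's contribution can be added or subtracted independently; operators without that property (a median, for example) would need richer state.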
jjbrosnan commented 11 months ago

To follow up on Chip's initial comment, here's more motivation to make this happen.

I wrote up code that calculates Aroon indicators for some finance data. Here's the code:

```python
from deephaven.updateby import rolling_min_time, rolling_max_time, rolling_group_time
from deephaven.plot.figure import Figure
from deephaven.time import to_np_datetime64
from deephaven import read_csv
import numpy.typing as npt
import numpy as np

crypto_table = read_csv("https://media.githubusercontent.com/media/deephaven/examples/main/CryptoCurrencyHistory/CSV/CryptoTrades_20210922.csv")

min_25min = rolling_min_time(ts_col="Timestamp", cols=["Min_25m = Price"], rev_time="PT25m")
max_25min = rolling_max_time(ts_col="Timestamp", cols=["Max_25m = Price"], rev_time="PT25m")
group_25min = rolling_group_time(ts_col="Timestamp", cols=["Prices_25m = Price", "Timestamps_25m = Timestamp"], rev_time="PT25m")

def aroon(prices, times) -> npt.NDArray[np.double]:
    # Minutes elapsed since the window's min and max prices, relative to the latest row.
    prices = np.array(prices)
    maxprice_idx = prices.argmax()
    minprice_idx = prices.argmin()
    tmax = to_np_datetime64(times[maxprice_idx])
    tmin = to_np_datetime64(times[minprice_idx])
    tlast = to_np_datetime64(times[-1])
    tmin_diff = float((tlast - tmin) / np.timedelta64(1, "m"))
    tmax_diff = float((tlast - tmax) / np.timedelta64(1, "m"))
    # Return a NumPy array to match the declared return type.
    return np.array([tmin_diff, tmax_diff], dtype=np.double)

crypto_updated = crypto_table.update_by(
    ops=[min_25min, max_25min, group_25min],
    by=["Instrument", "Exchange"]
).update_view(
    formulas=["Aroon = (double[])aroon(Prices_25m, Timestamps_25m)", "AroonDown = ((25 - (double)Aroon[0]) / 25) * 100", "AroonUp = ((25 - (double)Aroon[1]) / 25) * 100"]
).drop_columns(
    cols=["Aroon", "Timestamps_25m", "Prices_25m", "Min_25m", "Max_25m"]
)

eth_coinbase_aroon = crypto_updated.where(["Instrument == `ETH/USD`", "Exchange == `coinbase-pro`"])

price_plot = Figure().\
    plot_xy(series_name="Price", t=eth_coinbase_aroon, x="Timestamp", y="Price").\
    chart_title(title="Price").\
    show()

aroon_plot = Figure().\
    plot_xy(series_name="Aroon Up", t=eth_coinbase_aroon, x="Timestamp", y="AroonUp").\
    plot_xy(series_name="Aroon Down", t=eth_coinbase_aroon, x="Timestamp", y="AroonDown").\
    chart_title(title="Aroon Indicators").\
    show()
```

The table I'm running this on has 1M rows. As written, the code uses update_view to do on-demand calculations. That produces the result table without trouble, but the output then takes a long time to render. If I change it to update instead, the operation hangs for more than 10 minutes.

The code is slow mostly because it has to use rolling_group_time with a custom Python function: for every row, the whole 25-minute window of prices and timestamps is handed to aroon, which rescans it. It would be amazing if I could implement a custom aggregation/UpdateByOperation that could do this more efficiently.
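For comparison, here is a plain-Python sketch of the kind of incremental logic a custom UpdateBy operator could run for this case. `AroonWindow` is a hypothetical name, not a Deephaven API. It tracks the window's max and min with monotonic deques, a standard sliding-window technique: each row enters and leaves each deque at most once, so the work per row is amortized constant instead of a rescan of the whole 25-minute window. It assumes times are numbers of minutes arriving in non-decreasing order and that the window holds rows with time in [t - 25, t]; the latter should be checked against rolling_group_time's exact boundary rules.

```python
from collections import deque


class AroonWindow:
    """Hypothetical incremental Aroon calculator over a trailing time window.

    Times are numbers in minutes and must arrive in non-decreasing order.
    The window is assumed to hold rows with time in [t - window, t].
    """

    def __init__(self, window=25.0):
        self.window = window
        self.max_q = deque()  # (time, price) pairs; prices non-increasing
        self.min_q = deque()  # (time, price) pairs; prices non-decreasing

    def push(self, t, price):
        """Add one row and return (aroon_up, aroon_down) for the window ending at t."""
        # Drop entries that can never again be the window max/min. Strict
        # comparisons keep the earliest of tied extremes, like np.argmax/argmin.
        while self.max_q and self.max_q[-1][1] < price:
            self.max_q.pop()
        self.max_q.append((t, price))
        while self.min_q and self.min_q[-1][1] > price:
            self.min_q.pop()
        self.min_q.append((t, price))

        # Evict entries that have fallen out of the trailing window. The row
        # just appended always survives, so the deques never become empty.
        for q in (self.max_q, self.min_q):
            while q[0][0] < t - self.window:
                q.popleft()

        tmax_diff = t - self.max_q[0][0]  # minutes since the window high
        tmin_diff = t - self.min_q[0][0]  # minutes since the window low
        aroon_up = 100.0 * (self.window - tmax_diff) / self.window
        aroon_down = 100.0 * (self.window - tmin_diff) / self.window
        return aroon_up, aroon_down


aw = AroonWindow(window=25.0)
ticks = [(0.0, 10.0), (5.0, 12.0), (10.0, 11.0), (40.0, 9.0)]
results = [aw.push(t, price) for t, price in ticks]
print(results)
```

The deques are exactly the per-key state a delta-based operator would keep between updates; this sketch only handles appended rows, so supporting removals or modifications of arbitrary rows would need more bookkeeping.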

If there's a better way to implement this currently, I'm all ears. There are probably ways to improve the code as-is, but I can't think of a way to make this operation efficient in the current version of Deephaven.