deephaven / deephaven-core

Deephaven Community Core

FEATURE REQUEST: Ring tables that use a time column instead of a number of rows #3305

Open jjbrosnan opened 1 year ago

jjbrosnan commented 1 year ago

One of our active community users, miek in Slack, asked about the possibility of creating a ring table where the upper bound on table size is set by a time column rather than a number of rows. So, for example, instead of creating a ring table that can only hold a million rows, you could create a ring table that removes rows with timestamps more than 24 hours old.

devinrsmith commented 1 year ago

A useful generalization would be for an arbitrary predicate/filter to define when rows get pruned. This would easily satisfy the time-based use case, as well as allow for more advanced functionality.
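As a plain-Python sketch of that generalization (not Deephaven code; all names here are invented for illustration): a buffer that only ever prunes from its oldest end, driven by an arbitrary predicate, which covers the time-based case as one choice of predicate.

```python
from collections import deque

class PredicateRing:
    """Toy buffer: rows expire off the back while an arbitrary predicate says so."""

    def __init__(self, expired):
        # `expired(oldest, newest)` is the arbitrary predicate deciding pruning
        self.expired = expired
        self.rows = deque()

    def add(self, row):
        self.rows.append(row)
        # prune only from the oldest end, preserving ring-like semantics
        while self.rows and self.expired(self.rows[0], self.rows[-1]):
            self.rows.popleft()

# time-based use case: expire rows more than 24 hours (86_400 s) older than the newest
ring = PredicateRing(lambda oldest, newest: oldest["ts"] < newest["ts"] - 86_400)
for ts in [0, 10, 90_000]:  # 90_000 s is more than 24 h after t=0
    ring.add({"ts": ts})
# ring.rows now holds only the row with ts=90_000
```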

kzk2000 commented 1 year ago

Thanks. In particular, the use case is that you want to store tick data for 2+ tickers, but one ticks a lot more often than the other, so having a giant ring buffer with enough rows to hold the most active ticker for 24 hours feels suboptimal and memory-expensive.

kzk2000 commented 1 year ago

put differently, something like this would be cool (or equivalent)

ring_table_by_product = table_stream.tail_by("last X hours", by=["product_id"])

devinrsmith commented 1 year ago

#4309 is related to the tail_by example above

devinrsmith commented 1 year ago

I'm not sure "ring" is the right name for this feature. Maybe something like time_tail?

It would be interesting to consider a very general mechanism for expiring rows off the "back" of a table; for example, the oldest row is kept as long as Column X is true... as soon as the oldest row's Column X is false, it expires. In this sense, time_tail would be an easy extension of this mechanism via io.deephaven.engine.util.WindowCheck.

You could imagine something more general than a "ring" too, where any rows are expired when Column X turns false (think of a where Filter that prunes); once false, it's gone forever. Of course, this mechanism is probably harder to implement, and would likely be implemented with some where filtering + compaction scheme. (And even though the table data might be compacted, the resulting index would likely be more sparse / complex than with the simpler time_tail mechanism.)

rcaudy commented 1 year ago

Is this request different from io.deephaven.engine.util.WindowCheck + a where?

devinrsmith commented 1 year ago

ring + window_check + where is not a bad proxy for this feature, but it's not equivalent IMO, namely b/c you still need to specify a ring size; but also...

one of the nice features of a ring table is that its index changes in a very well-defined manner: always removing from the end and adding to the front (which makes the implementation clean for batch copy/fill). I think these would be good semantics to preserve for time_tail. Downstream users of a "ring" time_tail could always re-apply window_check + where if they really needed window_check semantics (likely, it would be a built-in option for time_tail to make it easy for users to choose).

devinrsmith commented 1 year ago
ring_semantics = blink_table.time_tail("1d", "Timestamp", strict=False)
window_semantics = blink_table.time_tail("1d", "Timestamp", strict=True)
jjbrosnan commented 1 year ago

If a time-based ring table works like the row-based ring table operation from the perspective of memory usage, I'd prefer the name time_ring or ring_time for consistency.

devinrsmith commented 1 year ago

Here's a manual version that combines tail + time_window; it's not really a proper replacement for this feature request, but at least demonstrates potential semantics.

from datetime import timedelta

from deephaven.experimental import time_window
from deephaven.table import Table

def time_tail(table: Table, ts_col: str, window: timedelta, num_rows: int) -> Table:
    # timedelta doesn't have native nanosecond support
    total_nanoseconds = int(window / timedelta(microseconds=1)) * 1000
    return (
        time_window(table.tail(num_rows), ts_col, total_nanoseconds, "_IsRecent")
        .where("_IsRecent")
        .drop_columns(["_IsRecent"])
    )
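As an aside on the conversion in that snippet: Python's timedelta only has microsecond resolution, so the total nanosecond count is derived by dividing by a one-microsecond delta and scaling. A quick plain-Python sanity check (the helper name here is invented):

```python
from datetime import timedelta

def to_nanos(window: timedelta) -> int:
    # timedelta has only microsecond resolution, so count microseconds and scale
    return int(window / timedelta(microseconds=1)) * 1000

print(to_nanos(timedelta(hours=24)))  # 86_400_000_000_000 ns in a day
```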
alexpeters1208 commented 9 months ago

Adding to this discussion because I wanted more out of a potential time-based ring table feature, so I implemented a prototype. I wanted three new things:

  1. The ability to have a time-based ring table from historical data getting replayed. The time_tail implementation above does not support this because of its reliance on time_window.
  2. The ability to use "offsets", i.e., shifting the relevant time window to 5 minutes in the past.
  3. The ability to "snap". Basically, I wanted to be able to specify a new time duration s (not the window) such that a table would only contain data up to the most recent s interval. If s is 5 seconds, then my table will appear to only update every 5 seconds, and the most recent rows will have timestamps of 13:00:04.997, then 13:00:09.982, then 13:00:14.999, etc. The example below clarifies this.
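That "snap" behavior leans on the same idea as Deephaven's lowerBin: bucket each timestamp down to the nearest interval boundary. In plain-Python terms, with timestamps as nanosecond integers (function name invented for illustration):

```python
def lower_bin(ts_nanos: int, interval_nanos: int) -> int:
    # round a timestamp down to the start of its interval, like lowerBin()
    return ts_nanos - (ts_nanos % interval_nanos)

FIVE_SECONDS = 5_000_000_000  # 5 seconds in nanoseconds
# 13.000000042 s snaps down to the 10 s boundary
print(lower_bin(13_000_000_042, FIVE_SECONDS))
```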

Here is the prototype:

from typing import Optional
from datetime import timedelta
from deephaven.table import Table
from deephaven.agg import sorted_last
from deephaven.time import to_j_duration

def relative_time_window(
        table: Table,
        ts_col: str,
        window: timedelta,
        offset: timedelta = timedelta(seconds=0),
        snap: Optional[timedelta] = None) -> Table:

    j_window = to_j_duration(window)
    j_offset = to_j_duration(offset)
    current_time = table.agg_by(sorted_last(ts_col, ts_col))
    table = table.natural_join(current_time, on=None, joins=f"CurrentTime={ts_col}")

    if snap is not None:
        j_snap = to_j_duration(snap)
        return table.\
            update(f"SnappedTime = lowerBin({ts_col}, j_snap.toNanos())").\
            where(f"CurrentTime - SnappedTime < j_snap.toNanos() + j_offset.toNanos() + j_window.toNanos() && \
                    CurrentTime - SnappedTime > j_snap.toNanos() + j_offset.toNanos()").\
            drop_columns(["CurrentTime", "SnappedTime"])

    return table.\
        where(f"CurrentTime - {ts_col} < j_offset.toNanos() + j_window.toNanos() && \
                CurrentTime - {ts_col} > j_offset.toNanos()").\
        drop_columns("CurrentTime")

and some examples:

from deephaven import time_table

t = time_table("PT0.1s").update(["X = ii", "Y = X + Math.sin(X) + randomGaussian(0.0, 1.0)"])

most_recent_30_seconds = relative_time_window(
    table=t,
    ts_col="Timestamp",
    window=timedelta(seconds=30))

next_most_recent_30_seconds = relative_time_window(
    table=t,
    ts_col="Timestamp",
    window=timedelta(seconds=30),
    offset=timedelta(seconds=30))

most_recent_30_seconds_snapped = relative_time_window(
    table=t,
    ts_col="Timestamp",
    window=timedelta(seconds=30),
    snap=timedelta(seconds=5))

next_most_recent_30_seconds_snapped = relative_time_window(
    table=t,
    ts_col="Timestamp",
    window=timedelta(seconds=30),
    offset=timedelta(seconds=30),
    snap=timedelta(seconds=5))
kzk2000 commented 9 months ago

@alexpeters1208 thanks for this, had forgotten about this one!

I'm wondering though, wouldn't the newish https://deephaven.io/core/docs/reference/table-operations/update-by-operations/rolling-group-time/ operator do most of what you want here?

That is, maybe there's a clever way to leverage rolling_group_time in your relative_time_window() func above, in particular since you might want to do this with by=['symbol'] or similar

kzk2000 commented 9 months ago

@alexpeters1208, I believe your 3rd item is what ClickHouse calls the hop() operator (Arroyo calls it hop() too), and there's also tumble(): https://clickhouse.com/docs/en/sql-reference/functions/time-window-functions#tumble https://clickhouse.com/docs/en/sql-reference/functions/time-window-functions#hop

It would definitely be cool to have wrappers like yours to accomplish similar behavior.
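For reference, the tumble/hop distinction in plain Python (window assignment only, not a Deephaven implementation; function names invented): tumbling windows are fixed and non-overlapping, so each timestamp falls in exactly one; hopping windows advance by a slide smaller than the window size, so one timestamp can fall in several.

```python
def tumble(ts: int, size: int) -> tuple:
    # tumbling window: fixed, non-overlapping; each ts falls in exactly one
    start = ts - ts % size
    return (start, start + size)

def hop(ts: int, size: int, slide: int) -> list:
    # hopping windows: starts advance by `slide`, so one ts can fall in several
    first = (ts - ts % slide) - size + slide
    return [(s, s + size) for s in range(first, ts + 1, slide) if s <= ts < s + size]

print(tumble(13, 5))   # (10, 15)
print(hop(13, 10, 5))  # [(5, 15), (10, 20)]
```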

alexpeters1208 commented 9 months ago

@kzk2000 That's a great insight, I didn't consider that. Here's a simple example:

import deephaven.updateby as uby
from deephaven import time_table

t = time_table("PT1s").update("X = ii").\
    update_by(
        uby.rolling_group_time(
            ts_col="Timestamp",
            cols=None,
            rev_time="PT10s")).\
    tail(1).\
    ungroup()

That said, a "final" implementation would not make use of any of this. It would be done in Java and would likely lean on WindowCheck. However, as a prototype, your idea may be more efficient than mine. I know the natural_join in mine results in every row getting updated every cycle, which is far from ideal.