apache / pinot

Apache Pinot - A realtime distributed OLAP datastore
https://pinot.apache.org/
Apache License 2.0

Create DATETIMECONVERTWINDOWHOP function #11775

Open alexch2000 opened 9 months ago

alexch2000 commented 9 months ago

Create a Window HOP function similar to DATETIMECONVERT.

DATETIMECONVERTWINDOWHOP converts the value of a column that contains an epoch timestamp into another time unit and buckets it into hopping windows, based on the given time granularity (hop interval) and window size.

A hopping time window has a fixed duration (the hopWindow parameter) and hops by a specified hop interval (the outputGranularity parameter). If the hop interval is smaller than the window size, hopping windows overlap, so a row can be assigned to multiple windows. For example, a hopping window with a 15-minute size and a 5-minute hop interval assigns each row to 3 different 15-minute windows, which start 5 minutes apart. This is why the function returns an array of timestamps; in this example the array would contain 3 elements.
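A minimal Python sketch of the window assignment described above. It assumes window starts are aligned to multiples of the hop interval counted from the epoch, and that each window covers the half-open range [start, start + size). The helper name `hop_window_starts` is hypothetical and is not Pinot's implementation.

```python
from typing import List


def hop_window_starts(ts: int, hop: int, size: int) -> List[int]:
    """Return the start of every hopping window that contains ts.

    Assumes window starts are aligned to multiples of `hop` counted from the
    epoch, and each window covers the half-open range [start, start + size).
    All arguments use the same time unit (seconds here).
    """
    if hop <= 0 or size <= 0:
        raise ValueError("hop and size must be positive")
    # The latest window containing ts starts at ts floored to the hop grid.
    start = ts - ts % hop
    starts = []
    # Step back one hop at a time while ts still falls inside [start, start + size).
    while start > ts - size:
        starts.append(start)
        start -= hop
    return starts


MINUTE = 60
# 15-minute windows that hop every 5 minutes: each timestamp lands in 3 windows.
print(hop_window_starts(60_007, hop=5 * MINUTE, size=15 * MINUTE))
# [60000, 59700, 59400]
```

When the hop equals the window size, the list has exactly one element, which corresponds to a single tumbling-window bucket.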

Signature

DATETIMECONVERTWINDOWHOP(columnName, inputFormat, outputFormat, outputGranularity, hopWindow)
inputFormat and outputFormat are defined using the following structure:
<time size>:<time unit>:<time format>:<pattern>
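To make the format strings concrete, here is a sketch that handles only the `<time size>:<time unit>:EPOCH` case (the `<pattern>` field used by date-string formats is not handled). It assumes, as with DATETIMECONVERT, that a value in `N:UNIT:EPOCH` counts N-unit ticks since the epoch, and that the granularity and window arguments use `<time size>:<time unit>` as in the examples. `EpochFormat`, `convert`, and `granularity_millis` are hypothetical names.

```python
from dataclasses import dataclass

# Milliseconds per unit for the subset of unit names this sketch handles.
UNIT_MILLIS = {
    "MILLISECONDS": 1,
    "SECONDS": 1_000,
    "MINUTES": 60_000,
    "HOURS": 3_600_000,
    "DAYS": 86_400_000,
}


@dataclass(frozen=True)
class EpochFormat:
    """A parsed '<time size>:<time unit>:EPOCH' spec (pattern field unsupported)."""

    size: int
    unit: str

    @classmethod
    def parse(cls, spec: str) -> "EpochFormat":
        parts = spec.split(":")
        if len(parts) != 3 or parts[2] != "EPOCH":
            raise ValueError(f"only '<size>:<unit>:EPOCH' is handled here: {spec!r}")
        size, unit = int(parts[0]), parts[1]
        if size <= 0 or unit not in UNIT_MILLIS:
            raise ValueError(f"unsupported size or unit in {spec!r}")
        return cls(size, unit)

    def tick_millis(self) -> int:
        # Length of one tick, e.g. '5:MINUTES:EPOCH' ticks every 300000 ms.
        return self.size * UNIT_MILLIS[self.unit]


def granularity_millis(spec: str) -> int:
    """Parse a '<time size>:<time unit>' spec such as '15:MINUTES'."""
    size, unit = spec.split(":")
    return int(size) * UNIT_MILLIS[unit]


def convert(value: int, input_format: str, output_format: str) -> int:
    """Re-express an epoch value in another epoch format, flooring to whole ticks."""
    millis = value * EpochFormat.parse(input_format).tick_millis()
    return millis // EpochFormat.parse(output_format).tick_millis()


print(convert(1514804402000, "1:MILLISECONDS:EPOCH", "1:SECONDS:EPOCH"))  # 1514804402
print(granularity_millis("15:MINUTES"), granularity_millis("1:HOURS"))  # 900000 3600000
```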

Usage Examples

These examples are based on the Batch JSON Quick Start. Convert created_at_timestamp from milliseconds since epoch to seconds since epoch, bucketed into 1-hour windows that hop every 15 minutes:

select id,
  created_at_timestamp,
  cast(created_at_timestamp AS long) AS timeInMs,
  DATETIMECONVERTWINDOWHOP(
    created_at_timestamp,
    '1:MILLISECONDS:EPOCH',
    '1:SECONDS:EPOCH',
    '15:MINUTES',
    '1:HOURS'
  ) AS windowHops
from githubEvents
WHERE id = 7044874134
id          created_at_timestamp   timeInMs       windowHops
7044874134  2018-01-01 11:00:00.0  1514804402000  [1514804402, 1514803502, 1514802602, 1514801702]

Moving window of unique user counts per hour with 15 min granularity:

select
  DATETIMECONVERTWINDOWHOP(
    created_at_timestamp,
    '1:MILLISECONDS:EPOCH',
    '1:SECONDS:EPOCH',
    '15:MINUTES',
    '1:HOURS'
  ) AS windowHops,
  DISTINCTCOUNT(id) AS unique_prs
from githubEvents
group by 1
ORDER BY 1

windowHops unique_prs
1514801700 6680
1514802600 10000
1514803500 10000
1514804400 10000
1514805300 3320

Jackie-Jiang commented 9 months ago

Is this supported in other DBs, or is it more of a workaround for window functions? I'm trying to see if there is a standard SQL equivalent to this function to better understand the semantics.

dario-liberman commented 9 months ago

In streaming SQL there are also similar workarounds; for example, see the Flink construct here: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/

I think this approach is very convenient for working with time-series data, especially when the windowed aggregation can't be computed afterwards as a simple aggregation of individual data points, as is the case with distinct counts.

In traditional SQL it's not an easy query to write; I will try and write back.

dario-liberman commented 9 months ago

There are also moving time window SQL extensions in timeseries DBs, for example: https://docs.tdengine.com/taos-sql/distinguished/#time-window

Jackie-Jiang commented 8 months ago

@dario-liberman Thanks for providing the context! The docs you referenced are actually window functions, and here are some discussions around it: #7213. It is not really possible to achieve this with the single-stage engine (we could potentially bring everything back to the broker, but that is not a scalable solution when the table is large). We already have it implemented in the multi-stage engine, and we should explore whether this feature can fit into that. cc @walterddr

@alexch2000 I don't think a new transform function can solve the moving time window problem, because this calculation cannot fit into the single-stage engine model.

walterddr commented 8 months ago

Hmm, I am a bit confused. Isn't the SQL you described above similar to https://calcite.apache.org/docs/reference.html#table-functions?

select id,
  created_at_timestamp,
  cast(created_at_timestamp AS long) AS timeInMs
from TABLE(
  HOP(
    TABLE githubEvents,
    DESCRIPTOR(created_at_timestamp), 
    INTERVAL '15' MINUTE,
    INTERVAL '60' MINUTE))
WHERE id = 7044874134

Was this proposed as a workaround because Pinot currently doesn't support table functions?

BTW, I don't understand what the two other arguments, '1:MILLISECONDS:EPOCH' and '1:SECONDS:EPOCH', mean.

dario-liberman commented 8 months ago

I work together with @alexch2000; I actually suggested this approach so we can have moving windows in the single-stage engine. The trick is to take advantage of multi-valued group-by in Pinot: by assigning each row to multiple windows, one achieves the hopping window behaviour. This function is most useful when one groups by it, similar to how one groups by the regular DATETIMECONVERT to achieve a tumbling window.
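The multi-valued group-by trick can be emulated in a few lines of Python: expand each row into one entry per window that contains it (the array the proposed function returns), then count distinct keys per window. This is a sketch under the same alignment assumption as before (starts on the hop grid, half-open windows); the function names and sample rows are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple


def hop_window_starts(ts: int, hop: int, size: int) -> List[int]:
    """Starts of all hop-aligned windows [start, start + size) containing ts."""
    start = ts - ts % hop
    starts = []
    while start > ts - size:
        starts.append(start)
        start -= hop
    return starts


def moving_distinct_count(
    rows: Iterable[Tuple[int, str]], hop: int, size: int
) -> Dict[int, int]:
    """Emulate GROUP BY <multi-valued window key> with DISTINCTCOUNT(key).

    Each row is added to every window listed in its array of window starts,
    which is how a multi-value group-by key places one row in several groups.
    """
    groups: Dict[int, Set[str]] = defaultdict(set)
    for ts, key in rows:
        for start in hop_window_starts(ts, hop, size):
            groups[start].add(key)
    return {start: len(keys) for start, keys in sorted(groups.items())}


# Hypothetical (epoch seconds, user) rows; 1-hour windows hopping every 15 minutes.
rows = [(3600, "u1"), (3650, "u1"), (3700, "u2"), (4600, "u1"), (7300, "u3")]
print(moving_distinct_count(rows, hop=900, size=3600))
# {900: 2, 1800: 2, 2700: 2, 3600: 2, 4500: 2, 5400: 1, 6300: 1, 7200: 1}
```

In the window starting at 3600, u1 appears in two different 15-minute buckets (2 distinct users in the first, 1 in the second) yet is counted once, giving 2 rather than 3. This is why per-bucket distinct counts cannot simply be summed into a moving window.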

dario-liberman commented 8 months ago

> Was this proposed as a workaround because Pinot currently doesn't support table functions?

Yes, that is the proposal: a simple workaround until we can make a bigger change like this.

dario-liberman commented 8 months ago

> BTW, I don't understand what the two other arguments, '1:MILLISECONDS:EPOCH' and '1:SECONDS:EPOCH', mean.

See the existing support for tumbling windows here: https://docs.pinot.apache.org/configuration-reference/functions/datetimeconvert

The API here simply expands on this to support hopping windows.

dario-liberman commented 8 months ago

@jackjlli - I see in https://github.com/apache/pinot/issues/7213 that you wanted to calculate aggregations on moving time windows, such as average COVID cases in a 7-day moving window. Here is a proposal to solve it with a very simple transform function, which already works in the single-stage engine thanks to multi-valued group-by support in Pinot.

Note also that the traditional SQL window semantics discussed in answer to your use case are actually not very good at sliding time windows. Here, instead, we draw an analogy to streaming SQL constructs, which capture time-series semantics much better.

I actually believe that moving time window aggregates for distinct counts cannot be calculated with row-based window aggregation in traditional SQL. Most answers out there do a left self-join on the time column between the time window boundaries, likely with edge cases for time gaps in the data; a surprisingly difficult task.
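For contrast, a sketch of the range-join shape used by the traditional-SQL answers: generate candidate window starts, then match every start against all rows whose timestamp falls in [start, start + size). It assumes hop-aligned, half-open windows; `range_join_distinct_count` and the sample rows are hypothetical. It scans every row once per window, and windows with no matching rows produce no output (as with an inner join), so a gap in the data shows up as missing windows rather than zero counts.

```python
from typing import Dict, List, Tuple


def range_join_distinct_count(
    rows: List[Tuple[int, str]], hop: int, size: int
) -> Dict[int, int]:
    """Moving distinct count computed as windows JOIN rows ON start <= ts < start + size."""
    if not rows:
        return {}
    first = min(ts for ts, _ in rows)
    last = max(ts for ts, _ in rows)
    # Smallest hop-aligned start whose window still reaches `first`.
    start = (first - size) // hop * hop + hop
    result: Dict[int, int] = {}
    while start <= last:
        # Scans every row for every window: O(windows * rows).
        keys = {key for ts, key in rows if start <= ts < start + size}
        if keys:  # like an inner join, windows with no matching rows are dropped
            result[start] = len(keys)
        start += hop
    return result


rows = [(3600, "u1"), (3650, "u1"), (3700, "u2"), (4600, "u1"), (7300, "u3")]
print(range_join_distinct_count(rows, hop=900, size=3600))
# {900: 2, 1800: 2, 2700: 2, 3600: 2, 4500: 2, 5400: 1, 6300: 1, 7200: 1}
```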

cc: @kishoreg

Jackie-Jiang commented 8 months ago

@dario-liberman @alexch2000 You are correct, and it is a very smart way of leveraging MV group-by to achieve moving time window. Reviewing the PR