apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Support Gap Filling on Time Series Data #4809

Open wolffcm opened 1 year ago

wolffcm commented 1 year ago

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

A common use case when working with time series data is to compute an aggregate value for windows of time, e.g., every minute, hour, week or whatever. It is possible to do this with the DATE_BIN function in DataFusion. However, DATE_BIN will not produce any value for a window that did not contain any rows.

For example, for this input data:

time c0
2022-12-01 10
2022-12-03 30

We might run this query:

select
  date_bin(interval '1 day', time, timestamp '1970-01-01T00:00:00Z') as day,
  avg(c0) 
from t
group by day;

And we would get something like:

day avg
2022-12-01 10
2022-12-03 30
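
A minimal Python sketch of why the 2022-12-02 row is missing, assuming DATE_BIN floors each timestamp to the start of a stride-wide bucket counted from the origin. The helper names `date_bin` and `avg_per_bucket` are hypothetical and only mirror the query above; they are not DataFusion APIs. Grouping only ever sees buckets that some input row falls into:

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def date_bin(stride: timedelta, ts: datetime, origin: datetime) -> datetime:
    """Floor ts to the start of its stride-wide bucket, counted from origin."""
    buckets_since_origin = (ts - origin) // stride  # integer floor division
    return origin + buckets_since_origin * stride


def avg_per_bucket(rows, stride, origin):
    """Group (timestamp, value) rows by bucket and average each group."""
    groups = defaultdict(list)
    for ts, value in rows:
        groups[date_bin(stride, ts, origin)].append(value)
    return {bucket: sum(vals) / len(vals) for bucket, vals in sorted(groups.items())}


UTC = timezone.utc
EPOCH = datetime(1970, 1, 1, tzinfo=UTC)
rows = [
    (datetime(2022, 12, 1, tzinfo=UTC), 10),
    (datetime(2022, 12, 3, tzinfo=UTC), 30),
]

result = avg_per_bucket(rows, timedelta(days=1), EPOCH)
for bucket, avg in result.items():
    print(bucket.date(), avg)
```

No bucket for 2022-12-02 is ever created because no input row maps to it, so there is nothing for the aggregate to emit.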

Generating a row in the output for 2022-12-02 is difficult to do with ANSI-SQL. Here is one attempt: Fill Gaps in Time Series with this simple trick in SQL. Having to write SQL like this for what is an intuitive and common use case is frustrating.

Describe the solution you'd like

It would be good to have a concise, idiomatic way to do this. Many vendors provide a solution for this problem. They have the following in common:

One such solution would be to use a function like TimeScale's functions time_bucket_gapfill and locf (last observation carried forward): https://docs.timescale.com/api/latest/hyperfunctions/gapfilling-interpolation/time_bucket_gapfill/

The above query might be changed to this, using time_bucket_gapfill and locf:

select
  time_bucket_gapfill(interval '1 day', time, timestamp '1970-01-01T00:00:00Z') as day,
  avg(c0),
  locf(avg(c0))
from t
group by day;

day        | avg | locf
2022-12-01 | 10  | 10
2022-12-02 |     | 10
2022-12-03 | 30  | 30

TimeScale also provides interpolate to populate a gap with an interpolated value (e.g., would put 20 in the gap for the example).
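
As a rough sketch of the two fill policies (not TimeScale's implementation), assuming the gap-filled buckets are already laid out in time order with `None` marking an empty bucket and that buckets are equally spaced. The function names just mirror the SQL functions:

```python
def locf(values):
    """Last observation carried forward: reuse the most recent non-null value."""
    out = []
    last = None
    for v in values:
        if v is not None:
            last = v
        out.append(last)
    return out


def interpolate(values):
    """Linear interpolation between the nearest non-null neighbours.

    Leading and trailing gaps have only one neighbour and stay None.
    """
    out = list(values)
    known = [i for i, v in enumerate(values) if v is not None]
    for left, right in zip(known, known[1:]):
        step = (values[right] - values[left]) / (right - left)
        for i in range(left + 1, right):
            out[i] = values[left] + step * (i - left)
    return out


avg_per_day = [10, None, 30]  # 2022-12-01, 2022-12-02 (gap), 2022-12-03
print(locf(avg_per_day))         # [10, 10, 30]
print(interpolate(avg_per_day))  # [10, 20.0, 30]
```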

I've written up an approach to this work here: https://docs.google.com/document/d/1vIcs9uhlCX_AkD9bemcDx-YhBOVe_TW5sBbXtKCHIfk/edit?usp=sharing

Initially we (InfluxData) were going to implement this in IOx directly, but it seems like it could be worth upstreaming into DataFusion.

Describe alternatives you've considered

Postgres provides a general purpose way to generate data: https://www.postgresql.org/docs/9.1/functions-srf.html#FUNCTIONS-SRF-SERIES But this seems like it would be more difficult to use than something like time_bucket_gapfill.
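
To illustrate the shape of that alternative, here is a sketch that uses SQLite (through Python's standard `sqlite3` module) as a stand-in: a recursive CTE plays the role of Postgres's generate_series, and a LEFT JOIN attaches the real rows to the generated days. The table and column names follow the example at the top of this issue; the date range is hard-coded as an assumption.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (time TEXT, c0 REAL)")
conn.executemany(
    "INSERT INTO t VALUES (?, ?)",
    [("2022-12-01", 10), ("2022-12-03", 30)],
)

# The recursive CTE generates one row per day (stand-in for generate_series);
# the LEFT JOIN keeps days that have no matching input row.
query = """
WITH RECURSIVE days(day) AS (
    SELECT '2022-12-01'
    UNION ALL
    SELECT date(day, '+1 day') FROM days WHERE day < '2022-12-03'
)
SELECT days.day, avg(t.c0)
FROM days
LEFT JOIN t ON t.time = days.day
GROUP BY days.day
ORDER BY days.day
"""
filled = conn.execute(query).fetchall()
for day, avg in filled:
    print(day, avg)
```

This produces a row for 2022-12-02 with a NULL average, but the caller has to spell out the series bounds and the join by hand, and carrying the last value forward would need yet more SQL on top.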

alamb commented 1 year ago

@waynexia @jiacai2050 (CeresDB) @v0y4g3r (GrepTime) @gruuya (SeaFowl) As I believe you are building other timeseries database systems on DataFusion, I wonder if you have any thoughts about adding such a feature to DataFusion?

We plan on building this feature natively in IOx but might be willing to upstream it as well if there is community interest

cc @waitingkuo @andygrove and @liukun4515 in case you have thoughts as well

waynexia commented 1 year ago

I wonder if you have any thoughts about adding such a feature to DataFusion?

It looks good to have time_bucket_gapfill and locf natively in DataFusion to me. Selecting and postprocessing the selected data in TSDB is a bit complex compared to general DBMS. Prometheus/PromQL also have this kind of logic, which will try to align and lookback the "series": https://promlabs.com/blog/2020/07/02/selecting-data-in-promql#lookback-delta

I built something similar very recently: https://github.com/GreptimeTeam/greptimedb/blob/develop/src/promql/src/extension_plan/instant_manipulate.rs#L423

And we plan to expose this functionality to SQL interface in some way, which will become something similar to this proposal I think.

But my concern is that, to provide a good user experience and functionality, we may need a bunch of "gap-filling" functions: filling with null, filling with the last value, filling with the last value if the gap is less than 1 day and otherwise leaving it blank, etc. I'm not sure if these "time-series functions" are also useful to other users of DataFusion (but it might be fine as PostgreSQL also provides such utils).

jiacai2050 commented 1 year ago

@alamb Thanks for bringing me in.

I have one concern about implementing this feature in DataFusion.

Suppose the time range of a query is [2022-12-02, 2022-12-04], and the real dataset is

2022-12-01, 10
2022-12-03, 20
2022-12-04, 30

Then what is the result of locf(avg(c0)) for 2022-12-02, None or 10?

In Prometheus, it will be 10 when lookback-delta is 1d. This requires rewriting the time range of the query to [start-delta, end], and trimming the data to [start, end] after filling in the gaps.

I don't know how Timescale deals with this case. IMO rewriting the time range of a query may not be suitable for DataFusion since it's a generic SQL engine.

Any ideas about this first value issue?
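
A small sketch of the widen-then-trim idea described above, using the three-row dataset from this example. It assumes daily buckets, and the function name `locf_with_lookback` is hypothetical. It only models the range rewrite, not Prometheus's actual staleness rules.

```python
from datetime import date, timedelta


def locf_with_lookback(rows, start, end, lookback, step=timedelta(days=1)):
    """Fill [start, end] with LOCF, reading input from [start - lookback, end].

    rows maps bucket -> value; the result is a list of (bucket, value) pairs.
    """
    out = []
    last = None
    bucket = start - lookback  # widened scan range
    while bucket <= end:
        if bucket in rows:
            last = rows[bucket]
        if bucket >= start:  # trim back to the range the user asked for
            out.append((bucket, last))
        bucket += step
    return out


dataset = {
    date(2022, 12, 1): 10,
    date(2022, 12, 3): 20,
    date(2022, 12, 4): 30,
}
start, end = date(2022, 12, 2), date(2022, 12, 4)

with_lookback = locf_with_lookback(dataset, start, end, timedelta(days=1))
without_lookback = locf_with_lookback(dataset, start, end, timedelta(0))
print(with_lookback[0])     # (datetime.date(2022, 12, 2), 10)
print(without_lookback[0])  # (datetime.date(2022, 12, 2), None)
```

With a 1-day lookback the first bucket is 10; without it the first bucket is None, which is exactly the ambiguity raised here.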

ozankabak commented 1 year ago

We are interested in this too. Are you aware of any approaches other than the two we have so far (time_bucket_gapfill and generate_series)? Any planned features related to this in recent and/or upcoming SQL standards and whatnot? The only possible pitfall I can see is not doing our homework before choosing an approach and running with it -- other than that this would be a great feature to have.

alamb commented 1 year ago

@ozankabak

Are you aware of any approaches other than the two we have so far (time_bucket_gapfill and generate_series)?

I don't know of any upcoming SQL standard for this, but I didn't look hard at it either.

This use case is common. It is often called bucketing with "gap filling" or "interpolation" in other SQL implementations. This type of query is not easy to express in ANSI-SQL and thus databases often offer some sort of SQL extension.

Here are some example extensions I found:

All of these extensions have two main features:

alamb commented 1 year ago

@waynexia

But my concern is that, to provide a good user experience and functionality, we may need a bunch of "gap-filling" functions: filling with null, filling with the last value, filling with the last value if the gap is less than 1 day and otherwise leaving it blank, etc. I'm not sure if these "time-series functions" are also useful to other users of DataFusion (but it might be fine as PostgreSQL also provides such utils).

I agree specifying the interpolation policy / gap filling is important.

In addition to the simple https://docs.timescale.com/api/latest/hyperfunctions/gapfilling-interpolation/locf/ function, they have interpolate, and one of the examples shows how to do a complicated lookup that appears similar to what you are suggesting:

https://docs.timescale.com/api/latest/hyperfunctions/gapfilling-interpolation/interpolate/

I wonder if that would be sufficient 🤔

alamb commented 1 year ago

@jiacai2050

Suppose the time range of a query is [2022-12-02, 2022-12-04], and the real dataset is

2022-12-01, 10
2022-12-03, 20
2022-12-04, 30

Then what is the result of locf(avg(c0)) for 2022-12-02, None or 10?

I am not sure

I don't know how Timescale deals with this case. IMO rewriting the time range of a query may not be suitable for DataFusion since it's a generic SQL engine.

It may well be the case that this is something that is not easy / reasonable to express in SQL

Any ideas about this first value issue?

The time_bucket_gapfill function (docs) can take optional start and finish arguments, which perhaps offer a way to express this case (apply start/finish filters after the query?)

Another way I could imagine is to run a subquery that has the full range [2022-12-01, 2022-12-04] with time_bucket_gapfill and then apply a filter in an outer query to restrict the data to [2022-12-02, 2022-12-04]

waynexia commented 1 year ago

I wonder if that would be sufficient 🤔

Thanks for those links. interpolate looks amazing and powerful; allowing a sub-select as the argument can accomplish many requirements 👍 It would be nice if we supported this function.

wolffcm commented 1 year ago

Here is a design document to do this work in DataFusion: https://docs.google.com/document/d/1vIcs9uhlCX_AkD9bemcDx-YhBOVe_TW5sBbXtKCHIfk/edit?usp=sharing

Initially I wrote this design thinking it would go into IOx, but since there is some interest here, I have adapted it a bit for DataFusion. Feedback on the approach would be appreciated!

ozankabak commented 1 year ago

I would like to think a little bit about how one can do this with multi-row window functions (i.e. window functions that may generate multiple rows for every frame). It seems the syntax and semantics of such an approach would be more in line with standard SQL's treatment of windowing (which would be good from a POLA, i.e. principle of least astonishment, perspective). I will share my thoughts in a few days when things mature a little bit.

liukun4515 commented 1 year ago

@waynexia @jiacai2050 (CeresDB) @v0y4g3r (GrepTime) @gruuya (SeaFowl) As I believe you are building other timeseries database systems on DataFusion, I wonder if you have any thoughts about adding such a feature to DataFusion?

We plan on building this feature natively in IOx but might be willing to upstream it as well if there is community interest

cc @waitingkuo @andygrove and @liukun4515 in case you have thoughts as well

If the SQL syntax and usage are compatible with PG SQL syntax, I think it can be added to DataFusion easily. If the SQL syntax and usage are not compatible with PG or other RDBMSs, I think we should add the feature in the corresponding lib or project instead of DataFusion, because it would break with the SQL standard.

liukun4515 commented 1 year ago

Here is a design document to do this work in DataFusion: https://docs.google.com/document/d/1vIcs9uhlCX_AkD9bemcDx-YhBOVe_TW5sBbXtKCHIfk/edit?usp=sharing

Initially I wrote this design thinking it would go into IOx, but since there is some interest here, I have adapted it a bit for DataFusion. Feedback on the approach would be appreciated!

Thanks @wolffcm. I need more time to take a look at your document.

jiacai2050 commented 1 year ago

The time_bucket_gapfill function (docs) can take optional start and finish arguments, which perhaps offer a way to express this case (apply start/finish filters after the query?)

I'm afraid this doesn't work. The Timescale docs say:

start and finish arguments do not filter input rows.

start/finish are applied after the data is scanned (with the WHERE filter), so if the fetched data contains no value for 2022-12-01, the upper plan node never sees it.

I ran the following test against Timescale:

CREATE TABLE stocks_real_time (
  time TIMESTAMPTZ NOT NULL,
  price DOUBLE PRECISION NULL
);

SELECT create_hypertable('stocks_real_time','time');

insert into stocks_real_time values
       ('2022-10-01', 10),
       ('2022-10-03', 30),
       ('2022-10-04', 40),
       ('2022-10-05', 50);

SELECT
  time_bucket_gapfill('1 day', time, timestamp '2022-09-30', timestamp '2022-10-10') AS day,
  avg(price) AS value,
  locf(avg(price)),
  interpolate(avg(price))
FROM stocks_real_time
WHERE time > '2022-10-02' AND time < '2022-10-05'
GROUP BY day
ORDER BY day;

It outputs:

| day                    | value | locf | interpolate |
|------------------------+-------+------+-------------|
| 2022-09-30 00:00:00+00 |       |      |             |
| 2022-10-01 00:00:00+00 |       |      |             |
| 2022-10-02 00:00:00+00 |       |      |             |
| 2022-10-03 00:00:00+00 |    30 |   30 |          30 |
| 2022-10-04 00:00:00+00 |    40 |   40 |          40 |
| 2022-10-05 00:00:00+00 |       |   40 |             |
| 2022-10-06 00:00:00+00 |       |   40 |             |
| 2022-10-07 00:00:00+00 |       |   40 |             |
| 2022-10-08 00:00:00+00 |       |   40 |             |
| 2022-10-09 00:00:00+00 |       |   40 |             |
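
The same behaviour can be modelled in a few lines of Python. This is a sketch of the observed semantics, not Timescale's code: the WHERE filter runs first, then start/finish only decide which buckets are emitted. It assumes daily buckets, plain dates instead of TIMESTAMPTZ, and a finish bound treated as exclusive to match the output above.

```python
from datetime import date, timedelta


def daily_buckets(start, finish):
    """All daily buckets in [start, finish); finish is treated as exclusive."""
    return [start + timedelta(days=i) for i in range((finish - start).days)]


def locf(values):
    """Carry the last non-null value forward."""
    out, last = [], None
    for v in values:
        if v is not None:
            last = v
        out.append(last)
    return out


def interpolate(values):
    """Linear interpolation between non-null neighbours; edge gaps stay None."""
    out = list(values)
    known = [i for i, v in enumerate(values) if v is not None]
    for left, right in zip(known, known[1:]):
        step = (values[right] - values[left]) / (right - left)
        for i in range(left + 1, right):
            out[i] = values[left] + step * (i - left)
    return out


all_rows = {
    date(2022, 10, 1): 10,
    date(2022, 10, 3): 30,
    date(2022, 10, 4): 40,
    date(2022, 10, 5): 50,
}
# WHERE time > '2022-10-02' AND time < '2022-10-05' is applied to the scan first.
scanned = {d: v for d, v in all_rows.items() if date(2022, 10, 2) < d < date(2022, 10, 5)}

# start/finish only control which buckets appear in the output.
buckets = daily_buckets(date(2022, 9, 30), date(2022, 10, 10))
value = [scanned.get(b) for b in buckets]
table = list(zip(buckets, value, locf(value), interpolate(value)))
for row in table:
    print(row)
```

The row for 2022-10-01 never reaches the fill step, so the buckets before 2022-10-03 stay empty even though start is 2022-09-30.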

Another way I could imagine is to run a subquery that has the full range [2022-12-01, 2022-12-04] with time_bucket_gapfill and then apply a filter in an outer query to restrict the data to [2022-12-02, 2022-12-04]

A subquery seems unnecessary. If the time range in time_bucket_gapfill differs from the range in the WHERE clause, maybe we can rewrite the WHERE clause and filter the data in the GapFill plan node, something like this (adapted from the Google doc above):

Projection: datebin(...) AS day, locf
  GapFill: groupBy=[[datebin_gapfill(..)]], aggr=[[locf(avg(price)) as locf]], original_time=(2022-10-02, 2022-10-05)
    Sort: cpu ASC NULLS LAST, datebin(...) ASC NULLS LAST
      Aggregate: groupBy=[[datebin(...) AS datebin(...)]], aggr=[]
        TableScan: stocks_real_time projection=[time, price]   -- time range is rewritten as (2022-09-30, 2022-10-10)

@wolffcm @alamb Make sense?

wolffcm commented 1 year ago

@jiacai2050

A subquery seems unnecessary. If the time range in time_bucket_gapfill differs from the range in the WHERE clause, maybe we can rewrite the WHERE clause and filter the data in the GapFill plan node, something like this (adapted from the Google doc above):

I understand what you're suggesting, but I worry that rewriting a filter like that would have unforeseen effects that are difficult to understand. For example, if the input to Aggregate was not a simple scan or filter, but instead the output of a derived table or a join, it could be hard to do a rewrite. What would the behavior be for that case?

I think this problem is a really tricky one. In the TimeScale docs for locf() there is a prev parameter which solves this problem. It is basically a subquery. It's a little awkward to have to type it, but it has the advantage of not requiring other parts of the plan to be rewritten.

  locf(
    avg(temperature),
    (SELECT temperature FROM metrics m2 WHERE m2.time < now() - INTERVAL '2 week' AND m.device_id = m2.device_id ORDER BY time DESC LIMIT 1)
  )

I'm curious about what you think of that approach.
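
To make the semantics concrete, here is a sketch of how a prev fallback could behave. It is an assumption about the behaviour, not TimeScale's implementation, and `locf_with_prev` is a hypothetical name. The fallback lookup is consulted only when the output starts with an empty bucket, so nothing else in the query has to change.

```python
def locf_with_prev(values, prev=None):
    """LOCF where leading gaps are seeded from an optional prev() lookup.

    values: per-bucket aggregates in time order, None for empty buckets.
    prev:   zero-argument callable standing in for the scalar subquery;
            it is evaluated at most once, and only if a leading gap exists.
    """
    out = []
    last = None
    looked_up = False
    for v in values:
        if v is not None:
            last = v
        elif last is None and not looked_up:
            last = prev() if prev is not None else None
            looked_up = True
        out.append(last)
    return out


# Buckets for [2022-12-02, 2022-12-04]; the row for 2022-12-01 (value 10)
# lies outside the scanned range and is only reachable through prev.
avg_per_day = [None, 20, 30]
print(locf_with_prev(avg_per_day))                    # [None, 20, 30]
print(locf_with_prev(avg_per_day, prev=lambda: 10))   # [10, 20, 30]
```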

jiacai2050 commented 1 year ago

It seems a subquery is more flexible and "safer" than rewriting the WHERE clause.

One more question: if the subquery returns multiple values, which value will be chosen by locf?

liukun4515 commented 1 year ago

But my concern is that, to provide a good user experience and functionality, we may need a bunch of "gap-filling" functions: filling with null, filling with the last value, filling with the last value if the gap is less than 1 day and otherwise leaving it blank, etc.

I'm not sure if these "time-series functions" are also useful to other users of DataFusion (but it might be fine as PostgreSQL also provides such utils).

@alamb I also have this concern: some time-series-specific functions may not be compatible with PG, and they may not be useful to other users who use DataFusion just like PG.

wolffcm commented 1 year ago

@jiacai2050

One more question: if the subquery returns multiple values, which value will be chosen by locf?

I just tried this in TimeScale. If more than one value is returned, it returns the error "more than one row returned by a subquery used as an expression", similar to other situations where a subquery is expected to return a scalar.

ozankabak commented 1 year ago

I agree with @liukun4515 that unless we come up with something that deviates very little from standard SQL (and/or PG), it may be prudent to think on this and maybe leave it to other packages if we can't find a way.

I am not hopeless BTW -- I think there are ways to do this in a very much standard-like way, I just haven't had the time to look into it.

alamb commented 1 year ago

I think in terms of IOx we are happy to do it downstream in IOx via the existing DataFusion extension points as well. I think it would help @wolffcm to know which way we are leaning so we can avoid too much rework

jirislav commented 2 days ago

This issue didn't move at all for 1.5 years, what's the status here? I see that IOx has implemented this as a UDF, but it seems to me that standard SQL (and/or PG) is taken too seriously within DataFusion, which inherently limits adoption within time-series applications such as finance or IoT, both quite big and growing industries.

I wouldn't suggest this if IOx stayed open-source, but since it is not anymore, couldn't it be supported at least through some kind of feature flag, something like SETTINGS enable_non_sql_standard_time_series_features = 1, similar to how ClickHouse does it?

alamb commented 2 days ago

This issue didn't move at all for 1.5 years, what's the status here?

I don't think there is any new status to report from my perspective

I wouldn't suggest this if IOx stayed open-source, but since it is not anymore,

Our implementation's source is currently in influxdb3_core: https://github.com/influxdata/influxdb3_core/blob/0f5ecbd6b17f83f7ad4ba55699fc2cd3e151cf94/query_functions/src/lib.rs#L33 in case anyone cares

FWIW "soon" InfluxData plans an open source offering based on the 3.0 architecture (aka IOx) but I don't have any additional specific details to share there

couldn't it be supported at least through some kind of feature flag, something like SETTINGS enable_non_sql_standard_time_series_features = 1, similar to how ClickHouse does it?

I think that would be possible

To improve timeseries support in DataFusion itself, I think working on ASOF join might be a good first step, as that is perhaps more "standard": https://github.com/apache/datafusion/issues/318

Note there are a bunch of interesting timeseries optimizations such as https://github.com/apache/datafusion/issues/10316 and https://github.com/apache/datafusion/issues/10313 that could be added