databrickslabs / tempo

API for manipulating time series on top of Apache Spark: lagged time values, rolling statistics (mean, avg, sum, count, etc), AS OF joins, downsampling, and interpolation
https://pypi.org/project/dbl-tempo

Support for structured streaming #70

Open kjw03 opened 3 years ago

kjw03 commented 3 years ago

Hi team, I just watched a talk from Ricardo and Tristan on Mar 16, 2021. At the end there was a question from the audience about support for streaming. The answer seemed to be "not at this time," since Tempo was primarily intended for batch use cases. Does Tempo now support structured streaming? If not, is it on the road map?

rportilla-databricks commented 3 years ago

Hi, thanks for the question! We do not yet support structured streaming, but do you have a use case you're interested in? For example, streaming AS OF joins?

kjw03 commented 3 years ago

Hi Ricardo,

I'm looking at financial data rather than, for example, sensor data so I'm not getting data at regular intervals. However, I need to provide summaries & features at regular intervals. My first cut was to create a time_ticks DataFrame as shown below.

image

And then join the financial data in the analysis time window for each time tick as shown below.

image

As you may have guessed, this approach has terrible performance. To improve it, I thought I would (with trades data as an example):

1) From the irregularly spaced trades data, create lookback vectors for the event_timestamp and feature columns (e.g., price, volume, taker_side) using a RANGE frame over event_timestamp whose length equals the selected analysis time window size
2) Create a new DataFrame with the selectively spaced time ticks of interest and do an AS OF type join to get the columns holding the lookback vectors
3) Run my aggregations on the lookback arrays, in each case only processing those elements that are within analysis_time_window_seconds of the time tick rather than of the original event I used to construct the vector
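A minimal sketch of the three steps above, written in plain Python rather than Spark so the semantics are easy to check. It is not Tempo's API: `Trade`, `lookback_vectors`, and `features_at_ticks` are hypothetical names, timestamps are assumed to be seconds, the trades are assumed to be sorted by timestamp, and the window is taken as the half-open interval (tick - window, tick].

```python
from bisect import bisect_right
from typing import Dict, List, NamedTuple, Sequence


class Trade(NamedTuple):
    ts: float       # event_timestamp, in seconds (assumed unit)
    price: float
    volume: float


def lookback_vectors(trades: Sequence[Trade], window: float) -> List[List[Trade]]:
    """Step 1: for each trade, collect the trades in (ts - window, ts]."""
    times = [t.ts for t in trades]  # trades must already be sorted by ts
    vectors = []
    for trade in trades:
        lo = bisect_right(times, trade.ts - window)  # first trade inside the window
        hi = bisect_right(times, trade.ts)           # one past the last trade at ts
        vectors.append(list(trades[lo:hi]))
    return vectors


def features_at_ticks(trades: Sequence[Trade], ticks: Sequence[float],
                      window: float) -> List[Dict]:
    """Steps 2 and 3: as-of join each tick to a lookback vector, then aggregate."""
    times = [t.ts for t in trades]
    vectors = lookback_vectors(trades, window)
    rows = []
    for tick in ticks:
        # Step 2: as-of join, i.e. the latest trade at or before the tick.
        idx = bisect_right(times, tick) - 1
        vector = vectors[idx] if idx >= 0 else []
        # Step 3: keep only elements within `window` of the tick itself,
        # not of the trade that the vector was built from.
        in_window = [t for t in vector if t.ts > tick - window]
        volume = sum(t.volume for t in in_window)
        vwap = (sum(t.price * t.volume for t in in_window) / volume) if volume else None
        rows.append({"tick": tick, "count": len(in_window),
                     "volume": volume, "vwap": vwap})
    return rows


if __name__ == "__main__":
    trades = [Trade(1, 100, 1), Trade(4, 102, 3), Trade(9, 101, 2), Trade(12, 104, 1)]
    # Ticks every 5 seconds with a 10 second window, so the windows overlap.
    for row in features_at_ticks(trades, ticks=[5, 10, 15], window=10):
        print(row)
```

The filter in step 3 is what makes the as-of shortcut valid: the joined vector covers a window ending at the last trade, which can reach further back than the window ending at the tick.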

This led me to tempo, which I was pleasantly surprised to find. It seems there are two requests from here in order to support the use case described above.

1) Can we adjust tempo to account not just for regularly spaced/sampled time series data like you might see in an IoT ecosystem, but also to support creating statistics on a specified cadence using overlapping analysis windows of a specified size?
2) Can we extend tempo to support structured streaming?

I'll keep looking into this, but I'm certainly curious what your thoughts are on how aligned this type of work is with your vision for the library.

kjw03 commented 3 years ago

Just to clarify, in the above use case, I'm building a continuous application.

rportilla-databricks commented 3 years ago

Thanks for the detailed explanation! We can help with this. Would you mind sending over your email so I can set up a meeting?

rportilla-databricks commented 2 years ago

@Sonali-guleria , just for reference.

sim-san commented 2 years ago

@Sonali-guleria @tnixon Is there any progress here?

When using resample and interpolate inside the foreachBatch function, I get an error message. (See Issue https://github.com/databrickslabs/tempo/issues/192#issuecomment-1160196626)

Using the AS OF join with streams also leads to an error message:

AttributeError                            Traceback (most recent call last)
<command-3872531940924524> in <module>
----> 1 stream_stream_asof_join()

<command-2772754795498460> in stream_stream_asof_join()
     12                              ts_col="logging_timestamp", partition_cols = ["logger_trip_counter"])
     13 
---> 14     joined_df = can_tsdf.asofJoin(analog_tsdf,
     15                                   left_prefix="can",
     16                                   right_prefix="analog").df

/local_disk0/.ephemeral_nfs/envs/pythonEnv-bff3f78d-b0ef-4734-b3ca-e83e3c63113f/lib/python3.8/site-packages/tempo/tsdf.py in asofJoin(self, right_tsdf, left_prefix, right_prefix, tsPartitionVal, fraction, skipNulls, sql_join_opt, suppress_null_warning)
    388 
    389     spark = (SparkSession.builder.getOrCreate())
--> 390     left_bytes = self.__getBytesFromPlan(left_df, spark)
    391     right_bytes = self.__getBytesFromPlan(right_df, spark)
    392 

/local_disk0/.ephemeral_nfs/envs/pythonEnv-bff3f78d-b0ef-4734-b3ca-e83e3c63113f/lib/python3.8/site-packages/tempo/tsdf.py in __getBytesFromPlan(self, df, spark)
    348       import re
    349 
--> 350       result = re.search(r"sizeInBytes=.*(['\)])", plan, re.MULTILINE).group(0).replace(")", "")
    351       size = result.split("=")[1].split(" ")[0]
    352       units = result.split("=")[1].split(" ")[1]

AttributeError: 'NoneType' object has no attribute 'group'
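
The traceback shows `re.search` returning `None`, which means the plan text contained nothing matching the `sizeInBytes` pattern. One plausible reading is that the plan of a streaming DataFrame lacks that statistic, though the thread does not confirm this. The sketch below reproduces the failure mode and shows one defensive way to handle it. It is not Tempo's actual fix: `parse_size_in_bytes`, its stricter pattern, and both plan strings are hypothetical and for illustration only.

```python
import re
from typing import Optional, Tuple

# Stricter pattern than the one in the traceback (an assumption for this sketch):
# capture the number and the unit separately.
SIZE_PATTERN = re.compile(r"sizeInBytes=([\d.]+)\s+(\w+)")


def parse_size_in_bytes(plan: str) -> Optional[Tuple[str, str]]:
    """Return (size, units) from a plan string, or None if no size is present."""
    match = SIZE_PATTERN.search(plan)
    if match is None:
        # Caller decides what to do, e.g. skip any size-based join optimization.
        return None
    return match.group(1), match.group(2)


# Illustrative plan fragments, not captured from a real cluster.
batch_plan = "Statistics(sizeInBytes=8.0 EiB)"
streaming_plan = "StreamingRelation [logging_timestamp, logger_trip_counter]"

# The unguarded call from the traceback fails on the second string because
# re.search returns None and None has no .group attribute.
unguarded = re.search(r"sizeInBytes=.*(['\)])", streaming_plan, re.MULTILINE)

if __name__ == "__main__":
    print(parse_size_in_bytes(batch_plan))
    print(parse_size_in_bytes(streaming_plan))
    print(unguarded)
```

Returning `None` instead of raising lets the caller fall back to a join path that does not depend on size estimates, if one exists.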

Sonali-guleria commented 1 year ago

Hi @sim-san: yes, we are releasing streaming support for tempo soon.

kurtmaile commented 1 year ago

Hi, I had a call with our Databricks account team and SA today about our time series use case. We need to do time series 'temporal' joins for a big financial services use case, and since we are big DLT users, we are also looking to see how this can be done natively in DLT.

He pointed us at this repo as the best place to start. He also mentioned this issue, which would make tempo available in structured streaming (soon) and, by association, make it possible in DLT (when upgraded to 13.2) in some incremental mode.

What is the current status of this release, please? We hope to see a non-trivial example in DLT as well; that would be super. Our use case is joining 4 streams: 2 faster-moving intraday streams (every few hours) and 2 slowly changing reference data streams (once a week). Is there a tentative release date for this issue? Thanks heaps!