databrickslabs / tempo

API for manipulating time series on top of Apache Spark: lagged time values, rolling statistics (mean, avg, sum, count, etc), AS OF joins, downsampling, and interpolation
https://pypi.org/project/dbl-tempo
Other
306 stars 51 forks source link

[Brainstorm] Support Waterfall join? #93

Closed CTCC1 closed 2 years ago

CTCC1 commented 2 years ago
Consider two spark dataframes: df_a = timestamp partition_col val_col_a
t0 val_id1 val_a1
t2 val_id1 val_a2
t3 val_id1 val_a3
df_b = timestamp partition_col val_col_b
t1 val_id1 val_b1
t4 val_id1 val_b2
t5 val_id1 val_b3
The desired result is df_ab = timestamp partition_col val_col_a val_col_b
t0 val_id1 val_a1 null
t1 val_id1 val_a1 val_b1
t2 val_id1 val_a2 val_b1
t3 val_id1 val_a3 val_b1
t4 val_id1 val_a3 val_b2
t5 val_id1 val_a3 val_b3

Effectively, this is like merging two time series, where each row in the result dataframe reflects the latest unioned state of both time series, as of that timestamp. Is there an existing way in spark to do this efficiently? Or is this something tempo could help solve?

Spratiher9 commented 2 years ago

@CTCC1 is this something general and prevalent in Time Series analysis?

I am asking this because if it is a general requirement in Time Series analysis then I will add the code for doing waterfallJoins as part of the TSDF class in tempo itself otherwise I will paste the code to implement for only this particular ask here.

CTCC1 commented 2 years ago

@Spratiher9 Hey, I would say it kind of depends on the correct way / efficient way to implement this feature. If the existing TSDF time indexing would be useful for the proper solution, it would make sense for tempo to support it imo.

And yes it is a common use case at the place I work for. But I am not sure if it qualifies for "time series analysis in general", as it kind of depends on the data model. This is effectively merging several stream of data and I imagine it is common.

rportilla-databricks commented 2 years ago

This is interesting @CTCC1 . It sounds like we can accomplish this with Delta MERGE; however, are there further assumptions or restrictions we can place on the new incremental data coming in? In this case, if the data were to come out of order (with a new column) for arbitrary late arrival events, this would cause a full re-write of data.

Spratiher9 commented 2 years ago

One approach I was thinking about was to do order by the timestamp column for each group of the partition_col's values and then forward fill the value columns.

rportilla-databricks commented 2 years ago

@guanjieshen is this something that df1.union(df).interpolate(method='ffill') will address?

CTCC1 commented 2 years ago

A quick update on what I ended up doing for my specific use case: in semi-pseudo code,

 # both asOfJoin with bucketing optimization, omitted here
left_as_of_right = left_df.asOfJoin(right_df)
right_as_of_left = right_df.asOfJoin(left_df)
# filter away rows in right_as_of_left where left_ts == right_ts
# clean up schema / columns after tempo join here
...
result = left_as_of_right.union(right_as_of_left)

In my specific case the as of join keys form a unique identifier between the left and right. So I achieved it by effectively doing 2 union-based as of join via tempo, filter away duplicates and union together the result. This avoided any expensive sql skew joins. But I haven't tested / benchmarked the other fill based methods against mine.

rportilla-databricks commented 2 years ago

This makes sense. I think the interpolation we developed uses the same logic essentially but might be a 1-liner. We'll update this thread in case there's something more succinct. Thanks @CTCC1 !

guanjieshen commented 2 years ago

@guanjieshen is this something that df1.union(df).interpolate(method='ffill') will address?

@rportilla-databricks yup the new interpolation feature using forward fill should return the exact result @CTCC1 is looking for.

I can share some sample code once PR#109 is merged.