databrickslabs / tempo

API for manipulating time series on top of Apache Spark: lagged time values, rolling statistics (mean, avg, sum, count, etc), AS OF joins, downsampling, and interpolation
https://pypi.org/project/dbl-tempo

joinAsOf extra join conditions #353

Closed. machielg closed this issue 1 year ago.

machielg commented 1 year ago

In one dataframe I have rows that don't need to be joined because of non-time-related columns. Would it be an option to add extra join conditions as an optional argument to the joinAsOf function?
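Something like this, where extra_join_conditions is a hypothetical argument shown only to illustrate the idea:

# hypothetical sketch: extra_join_conditions does not exist in tempo today
joined = left_tsdf.joinAsOf(
    right_tsdf,
    extra_join_conditions=["site", "device"]
)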

tnixon commented 1 year ago

Hi @machielg - could you give me an illustration of your situation? An example of the kind of dataframe you're looking at?

machielg commented 1 year ago

I have one df with events for a particular site.

events_df

|ts_event        |site  |device|
|----------------|------|------|
|2020-01-01T12:00|site-A|d-1   |
|2020-01-01T12:30|site-A|d-1   |

and a measurements_df with measurements for that site, but also for other sites:

|ts_event        |site  |device|measurement|
|----------------|------|------|-----------|
|2020-01-01T12:00|site-A|d-1   |9.0        |
|2020-01-01T12:20|site-A|d-1   |9.0        |
|2020-01-01T12:00|site-A|d-2   |10.0       |
|2020-01-01T12:00|site-B|d-1   |11.0       |
|2020-01-01T12:00|site-C|d-1   |12.0       |

So I want to join only the first two rows of measurements_df with events_df on their event time (with a margin), because the other rows don't match on site and device.
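In plain PySpark (without tempo) the join I'm after would look roughly like this, assuming a 15-minute margin; unlike an as-of join, though, this keeps every measurement inside the window rather than only the latest one:

from pyspark.sql import functions as F

# sketch only: equi-join on site/device plus an assumed 15-minute time margin
margin = F.expr("INTERVAL 15 MINUTES")
joined = events_df.alias("e").join(
    measurements_df.alias("m"),
    on=[
        F.col("e.site") == F.col("m.site"),
        F.col("e.device") == F.col("m.device"),
        F.col("m.ts_event").between(F.col("e.ts_event") - margin, F.col("e.ts_event")),
    ],
    how="left",
)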

tnixon commented 1 year ago

Hi @machielg, I believe the As-Of join will already do what you are asking for.

Here is some sample code based on your example dataframes:

from datetime import datetime as dt

from tempo import TSDF
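# assumes an active SparkSession bound to the name `spark` (e.g. a Databricks notebook)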

# events data
events_schema = "ts_event timestamp, site string, device string"
events_data = [
  (dt(2020, 1, 1, 12), "site-A", "d-1"),
  (dt(2020, 1, 1, 12, 30), "site-A", "d-1")
]
events_df = spark.createDataFrame(events_data, events_schema)
events_tsdf = TSDF(events_df, "ts_event", ["site", "device"])
events_tsdf.show()

# measurements data
measurements_schema = "ts_event timestamp, site string, device string, measurement double"
measurements_data = [
  (dt(2020, 1, 1, 12, 0), "site-A", "d-1", 9.0),
  (dt(2020, 1, 1, 12, 20), "site-A", "d-1", 9.0),
  (dt(2020, 1, 1, 12, 0), "site-A", "d-2", 10.0),
  (dt(2020, 1, 1, 12, 0), "site-B", "d-1", 11.0),
  (dt(2020, 1, 1, 12, 0), "site-C", "d-1", 12.0)
]

measurements_df = spark.createDataFrame(measurements_data, measurements_schema)
measurements_tsdf = TSDF(measurements_df, "ts_event", ["site", "device"])
measurements_tsdf.show()

# perform as-of join
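# the TSDF partition columns (site, device) act as equality join keys,
# so measurements from other sites or devices are never matched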
joined_tsdf = events_tsdf.asofJoin(measurements_tsdf)
joined_tsdf.show()

When I run this, I get the following output:

+-------------------+------+------+
|           ts_event|  site|device|
+-------------------+------+------+
|2020-01-01 12:00:00|site-A|   d-1|
|2020-01-01 12:30:00|site-A|   d-1|
+-------------------+------+------+

+-------------------+------+------+-----------+
|           ts_event|  site|device|measurement|
+-------------------+------+------+-----------+
|2020-01-01 12:00:00|site-A|   d-1|        9.0|
|2020-01-01 12:20:00|site-A|   d-1|        9.0|
|2020-01-01 12:00:00|site-A|   d-2|       10.0|
|2020-01-01 12:00:00|site-B|   d-1|       11.0|
|2020-01-01 12:00:00|site-C|   d-1|       12.0|
+-------------------+------+------+-----------+

+-------------------+------+------+-------------------+-----------------+
|           ts_event|  site|device|     right_ts_event|right_measurement|
+-------------------+------+------+-------------------+-----------------+
|2020-01-01 12:00:00|site-A|   d-1|2020-01-01 12:00:00|              9.0|
|2020-01-01 12:30:00|site-A|   d-1|2020-01-01 12:20:00|              9.0|
+-------------------+------+------+-------------------+-----------------+

Does this perform the join the way you want?
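As for the margin you mentioned: depending on the version of tempo you have installed, asofJoin also takes a tolerance argument (in seconds) that excludes matches older than the given window. A sketch, assuming your version supports it:

# hedged sketch: tolerance (seconds) caps how far back a match may be;
# 600s would still allow the 12:30 event to match the 12:20 measurement
joined_tsdf = events_tsdf.asofJoin(measurements_tsdf, tolerance=600)
joined_tsdf.show()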

machielg commented 1 year ago

Yes, thanks!