twosigma / flint

A Time Series Library for Apache Spark
Apache License 2.0

Join not only by time but also by column #54

Closed geoHeil closed 5 years ago

geoHeil commented 5 years ago

How can I join not only by time but also by a column?

Currently, I get: Found duplicate columns, but I would like to perform the time series join per group.

import com.twosigma.flint.timeseries.TimeSeriesRDD
import scala.concurrent.duration.MILLISECONDS
import spark.implicits._ // assumes a SparkSession named `spark`

val left = Seq((1, 1L, 0.1), (1, 2L, 0.2), (3, 1L, 0.3), (3, 2L, 0.4)).toDF("group", "time", "valueA")
val right = Seq((1, 1L, 11), (1, 2L, 12), (3, 1L, 13), (3, 2L, 14)).toDF("group", "time", "valueB")
val leftTs = TimeSeriesRDD.fromDF(dataFrame = left)(isSorted = false, timeUnit = MILLISECONDS)
val rightTS = TimeSeriesRDD.fromDF(dataFrame = right)(isSorted = false, timeUnit = MILLISECONDS)

val mergedPerGroup = leftTs.leftJoin(rightTS, tolerance = "1s")

This fails due to duplicate columns.

When renaming the columns:

val left = Seq((1, 1L, 0.1), (1, 2L, 0.2), (3, 1L, 0.3), (3, 2L, 0.4)).toDF("groupA", "time", "valueA")
val right = Seq((1, 1L, 11), (1, 2L, 12), (3, 1L, 13), (3, 2L, 14)).toDF("groupB", "time", "valueB")
val leftTs = TimeSeriesRDD.fromDF(dataFrame = left)(isSorted = false, timeUnit = MILLISECONDS)
val rightTS = TimeSeriesRDD.fromDF(dataFrame = right)(isSorted = false, timeUnit = MILLISECONDS)

val mergedPerGroup = leftTs.leftJoin(rightTS, tolerance = "1s")
mergedPerGroup.toDF.printSchema
mergedPerGroup.toDF.show
+-------+------+------+------+------+
|   time|groupA|valueA|groupB|valueB|
+-------+------+------+------+------+
|1000000|     1|   0.1|     3|    13|
|1000000|     3|   0.3|     3|    13|
|2000000|     1|   0.2|     3|    14|
|2000000|     3|   0.4|     3|    14|
+-------+------+------+------+------+

A cross join is effectively performed between the groups and the time series, so the result needs to be reduced manually:

mergedPerGroup.toDF.filter(col("groupA") === col("groupB")).show
+-------+------+------+------+------+
|   time|groupA|valueA|groupB|valueB|
+-------+------+------+------+------+
|1000000|     3|   0.3|     3|    13|
|2000000|     3|   0.4|     3|    14|
+-------+------+------+------+------+

Is there built-in functionality to perform this type of join more efficiently?

icexelloss commented 5 years ago

leftJoin takes a "key" argument that lets you specify a secondary join key (equality only).

geoHeil commented 5 years ago

Thanks.

One further question: when using a rather large interval (i.e. multiple values from the right fall into the interval from the left), the join will only join one record per left row, which means the distinct I previously used is not required.

leftTs.leftJoin(rightTS, tolerance = "1s", key = Seq("group")).toDF.show
+-------+-----+------+------+
|   time|group|valueA|valueB|
+-------+-----+------+------+
|1000000|    1|   0.1|    11|
|1000000|    3|   0.3|    13|
|2000000|    1|   0.2|    12|
|2000000|    3|   0.4|    14|
+-------+-----+------+------+

This is looking better. With a much larger tolerance, the result is the same:

leftTs.leftJoin(rightTS, tolerance = "1hour", key = Seq("group")).toDF.show
+-------+-----+------+------+
|   time|group|valueA|valueB|
+-------+-----+------+------+
|1000000|    1|   0.1|    11|
|1000000|    3|   0.3|    13|
|2000000|    1|   0.2|    12|
|2000000|    3|   0.4|    14|
+-------+-----+------+------+

geoHeil commented 5 years ago

I believe this is also stated in the documentation:

leftJoin A function performs the temporal left-join to the right TimeSeriesRDD, i.e. left-join using inexact timestamp matches. For each row in the left, append the most recent row from the right at or before the same time. An example to join two TimeSeriesRDDs is as follows.
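
To make the quoted semantics concrete without Spark, here is a plain-Scala sketch of a keyed "most recent row at or before" left join. It is not the Flint example the documentation refers to, and it is not Flint's implementation: the names `AsOfJoinSketch`, `LeftRow`, `RightRow` and `asOfLeftJoin` are hypothetical, the tolerance is a plain number in the same unit as `time`, and treating the tolerance bound as inclusive is an assumption.

```scala
// Plain-Scala illustration of a keyed as-of left join; no Spark or Flint required.
// All names in this object are hypothetical and exist only for this sketch.
object AsOfJoinSketch {
  final case class LeftRow(time: Long, group: Int, valueA: Double)
  final case class RightRow(time: Long, group: Int, valueB: Int)

  // For each left row, pick the most recent right row with the same group
  // whose time lies in [left.time - tolerance, left.time].
  // Assumption: the tolerance bound is inclusive.
  def asOfLeftJoin(
      left: Seq[LeftRow],
      right: Seq[RightRow],
      tolerance: Long
  ): Seq[(LeftRow, Option[RightRow])] = {
    val rightByGroup = right.groupBy(_.group)
    left.map { l =>
      val candidates = rightByGroup
        .getOrElse(l.group, Seq.empty)
        .filter(r => r.time <= l.time && l.time - r.time <= tolerance)
      // At most one right row is attached, however many fall into the window.
      val best = if (candidates.isEmpty) None else Some(candidates.maxBy(_.time))
      (l, best)
    }
  }

  // Same sample data as in the issue.
  val left: Seq[LeftRow] =
    Seq(LeftRow(1L, 1, 0.1), LeftRow(2L, 1, 0.2), LeftRow(1L, 3, 0.3), LeftRow(2L, 3, 0.4))
  val right: Seq[RightRow] =
    Seq(RightRow(1L, 1, 11), RightRow(2L, 1, 12), RightRow(1L, 3, 13), RightRow(2L, 3, 14))

  def main(args: Array[String]): Unit = {
    // A tolerance far larger than the spacing of the data.
    val joined = asOfLeftJoin(left, right, tolerance = 3600000L)
    joined.foreach { case (l, r) =>
      println(s"${l.time} ${l.group} ${l.valueA} ${r.map(_.valueB)}")
    }
  }
}
```

Under these assumptions, a wide tolerance still attaches exactly one right row to each left row, namely the latest one in the same group, which is consistent with the "1s" and "1hour" results above being identical.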