spark-temporalquery

MIT License

Implicit functions for querying interval data with Apache Spark/Scala. Features:

Breaking changes in version 2.x:

Usage

Spark-temporalquery releases are published on Maven Central. To use the library, add the following Maven dependency for your Scala version to your project:

<dependency>
  <groupId>ch.zzeekk.spark</groupId>
  <artifactId>spark-temporalquery_2.12</artifactId>
  <version>2.0.1</version>
</dependency>

See also Builds below for compatibility between Spark, Scala and Java versions.

temporal queries

TemporalQueryUtil provides implicit functions on DataFrame to query temporal data along a timestamp interval axis.

  // this imports temporal* implicit functions on DataFrame
  import ch.zzeekk.spark.temporalquery.TemporalQueryUtil._
  import ch.zzeekk.spark.temporalquery.{ClosedInterval, DiscreteTimeAxis}
  import java.sql.Timestamp
  import java.time.temporal.ChronoUnit
  // configure options for temporal query operations
  val intervalDef = ClosedInterval(Timestamp.valueOf("0001-01-01 00:00:00"), Timestamp.valueOf("9999-12-31 00:00:00"), DiscreteTimeAxis(ChronoUnit.MILLIS))
  implicit val tqc = TemporalClosedIntervalQueryConfig( fromColName="valid_from", toColName="valid_to", intervalDef = intervalDef)
  // make SparkSession implicitly available
  implicit val sss = spark
  import sss.implicits._
  // prepare some DataFrames
  val dfLeft = Seq((0, Timestamp.valueOf("2017-12-10 00:00:00"), Timestamp.valueOf("2018-12-08 23:59:59.999"), 4.2))
    .toDF("id", "valid_from", "valid_to","value_l")
  val dfRight = Seq((0, Timestamp.valueOf("2018-01-01 00:00:00"), Timestamp.valueOf("2018-12-31 23:59:59.999"), 5))
    .toDF("id", "valid_from", "valid_to","value_r")
  // use temporal* functions
  dfLeft.temporalInnerJoin(dfRight, Seq("id"))
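To illustrate the semantics of the join above without the library: rows match on the join key, and the validity of each result row is the intersection of both input intervals. A minimal pure-Scala sketch (intersect is a hypothetical helper, not a library function):

```scala
import java.sql.Timestamp

// Hypothetical helper showing the interval logic behind temporalInnerJoin
// for closed intervals: start is the later of the two starts, end is the
// earlier of the two ends; no result if the intervals do not overlap.
def intersect(from1: Timestamp, to1: Timestamp,
              from2: Timestamp, to2: Timestamp): Option[(Timestamp, Timestamp)] = {
  val from = if (from1.after(from2)) from1 else from2 // max of the two starts
  val to   = if (to1.before(to2)) to1 else to2        // min of the two ends
  if (from.after(to)) None else Some((from, to))
}

// intervals of dfLeft and dfRight from the example above
val joined = intersect(
  Timestamp.valueOf("2017-12-10 00:00:00"), Timestamp.valueOf("2018-12-08 23:59:59.999"),
  Timestamp.valueOf("2018-01-01 00:00:00"), Timestamp.valueOf("2018-12-31 23:59:59.999"))
// joined covers 2018-01-01 00:00:00 to 2018-12-08 23:59:59.999
```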

linear queries

LinearGenericQueryUtil provides implicit functions on DataFrame to query linear data with any numeric interval axis datatype. The following shortcuts exist for predefined datatypes:

  // this imports linear* implicit functions on DataFrame
  import ch.zzeekk.spark.temporalquery.LinearDoubleQueryUtil._
  import ch.zzeekk.spark.temporalquery.HalfOpenInterval
  // configure options for linear query operations
  val intervalDef = HalfOpenInterval(0d, Double.MaxValue)
  implicit val lqc: LinearQueryConfig = LinearHalfOpenIntervalQueryConfig(fromColName="pos_from", toColName="pos_to", intervalDef = intervalDef)
  // make SparkSession implicitly available
  implicit val sss = session
  import sss.implicits._
  // prepare some DataFrames
  val dfLeft = Seq((0, 0.0, 100.0, 4.2))
    .toDF("id", "pos_from", "pos_to","value_l")
  val dfRight = Seq((0, 50.0, 200.0, 5))
    .toDF("id", "pos_from", "pos_to","value_r")
  // use linear* functions
  dfLeft.linearInnerJoin(dfRight, Seq("id"))
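The interval logic for half-open intervals [from, to) differs from the closed case in one detail: intervals that merely touch do not overlap. A hypothetical pure-Scala sketch (not library code):

```scala
// Hypothetical helper showing the interval logic behind linearInnerJoin
// for half-open intervals [from, to): the result is empty unless the
// computed start lies strictly below the computed end.
def intersectHalfOpen(from1: Double, to1: Double,
                      from2: Double, to2: Double): Option[(Double, Double)] = {
  val from = math.max(from1, from2)
  val to   = math.min(to1, to2)
  if (from < to) Some((from, to)) else None
}

// intervals of dfLeft and dfRight from the example above
val overlap = intersectHalfOpen(0.0, 100.0, 50.0, 200.0) // overlap is [50.0, 100.0)
val touching = intersectHalfOpen(0.0, 50.0, 50.0, 200.0) // adjacent intervals: no overlap
```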

The following sections are written for temporal queries, but the library works the same way for linear queries: simply replace temporal with linear in the function names.

Precondition

For temporal queries a time axis with datatype timestamp is needed. The axis can be configured as:

Before using the operations below you must ensure that your data satisfies the requirements of the chosen intervalDef configuration. Moreover, the DataFrame must not contain temporally overlapping entries or entries where validTo < validFrom, as these lead to confusing results.
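Both conditions can be checked up front. A hypothetical pure-Scala pre-check (not part of the library), assuming closed intervals and per-id validity:

```scala
import java.sql.Timestamp

// Hypothetical validation helpers: flag rows that end before they start,
// and detect temporally overlapping rows within the same id.
case class Entry(id: Int, from: Timestamp, to: Timestamp)

def endsBeforeStart(rows: Seq[Entry]): Seq[Entry] =
  rows.filter(r => r.to.before(r.from))

def hasOverlaps(rows: Seq[Entry]): Boolean = {
  val byId = rows.filterNot(r => r.to.before(r.from)).groupBy(_.id)
  byId.values.exists { rs =>
    // sort by interval start, then check each neighbouring pair
    val sorted = rs.sortBy(_.from.getTime)
    sorted.zip(sorted.drop(1)).exists { case (a, b) => !a.to.before(b.from) }
  }
}

val rows = Seq(
  Entry(1, Timestamp.valueOf("2019-01-05 12:34:56"), Timestamp.valueOf("2019-02-01 02:34:56")),
  Entry(1, Timestamp.valueOf("2019-02-01 01:00:00"), Timestamp.valueOf("2019-02-01 02:34:56")),
  Entry(1, Timestamp.valueOf("2019-02-10 00:00:00"), Timestamp.valueOf("2019-02-09 00:00:00")))
```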

temporalCombine() to clean up data frames

You may use the methods temporalCleanupExtend and temporalCombine() to clean up your DataFrames. For example, the DataFrame

Id  val   validFrom                      validTo                   comment
1   2.72  2019-01-05 12:34:56.123456789  2019-02-01 02:34:56.1235  nanoseconds
1   2.72  2019-02-01 01:00:00.0          2019-02-01 02:34:56.1245  overlaps with previous
1   2.72  2019-02-10 00:00:00            2019-02-09 00:00:00       ends before it starts
1   42.0  2019-01-01 00:00:00            2019-12-31 23:59:59.999   does not overlap because of different value: many-to-many relation

is cleaned up to

Id  val   validFrom                validTo
1   2.72  2019-01-05 12:34:56.124  2019-02-01 02:34:56.124
1   42.0  2019-01-01 00:00:00      2019-12-31 23:59:59.999
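The merge step behind this cleanup can be sketched in pure Scala: per (id, value) group, drop rows that end before they start, sort by interval start, and merge intervals that overlap. This is a hypothetical illustration of the semantics, not the library's implementation (which also truncates timestamps to the configured time unit):

```scala
import java.sql.Timestamp

// Hypothetical sketch of temporalCombine()'s merge logic for closed intervals.
case class Rec(id: Int, value: Double, from: Timestamp, to: Timestamp)

def combine(recs: Seq[Rec]): Seq[Rec] =
  recs.filterNot(r => r.to.before(r.from))          // drop "ends before it starts"
    .groupBy(r => (r.id, r.value)).values.flatMap { group =>
      group.sortBy(_.from.getTime).foldLeft(List.empty[Rec]) {
        case (last :: done, r) if !r.from.after(last.to) => // overlapping: extend last
          last.copy(to = if (r.to.after(last.to)) r.to else last.to) :: done
        case (acc, r) => r :: acc                           // disjoint: keep as new row
      }.reverse
    }.toSeq

// the example rows above, already truncated to milliseconds
val recs = Seq(
  Rec(1, 2.72, Timestamp.valueOf("2019-01-05 12:34:56.124"), Timestamp.valueOf("2019-02-01 02:34:56.124")),
  Rec(1, 2.72, Timestamp.valueOf("2019-02-01 01:00:00"), Timestamp.valueOf("2019-02-01 02:34:56.124")),
  Rec(1, 2.72, Timestamp.valueOf("2019-02-10 00:00:00"), Timestamp.valueOf("2019-02-09 00:00:00")),
  Rec(1, 42.0, Timestamp.valueOf("2019-01-01 00:00:00"), Timestamp.valueOf("2019-12-31 23:59:59.999")))
// combine(recs) yields two rows, matching the cleaned-up table
```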

Operations

You can then use the following additional functions on Dataset/DataFrame:

Builds

Spark-temporalquery is built and released for Scala 2.11 with Spark 2.4.x and Scala 2.12 with Spark 3.x. Spark 3.x does not support Scala 2.11. Newer versions of Spark 2.4.x would support Scala 2.12, but there is no spark-temporalquery release for this combination.

Note that Spark 2.4 needs Java version 8, whereas Spark 3.x is compatible with Java 8/11/17. See also https://spark.apache.org/docs/latest/#downloading.

Troubleshooting

AnalysisException: Column ... is ambiguous.

If you get the exception org.apache.spark.sql.AnalysisException: Column ... are ambiguous, it is probably because you joined several Datasets that originate from the same Dataset. When this happens with temporal*Join methods, try calling df.alias on both DataFrames before joining: if temporal-query finds aliases, it uses them in the join conditions.

The exception might still occur. In that case you can disable the check by setting the Spark property spark.sql.analyzer.failAmbiguousSelfJoin = false.
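As a sketch, the two remedies look like this in code (aliases "l" and "r" and the result name are arbitrary; the property change is a session-wide configuration and silences the safeguard for all queries):

```scala
// Remedy 1: alias both sides so temporal-query can disambiguate the join
val joined = dfLeft.alias("l").temporalInnerJoin(dfRight.alias("r"), Seq("id"))

// Remedy 2: disable Spark's ambiguous-self-join check for the whole session
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")
```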