opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices on remote data stores.
Apache License 2.0

[FEATURE] Enhance materialized views to support streaming JOIN refresh #474

Open dai-chen opened 2 months ago

dai-chen commented 2 months ago

Is your feature request related to a problem?

Currently, Flint's auto/incremental-refresh materialized views (MVs) support only single-table queries with aggregations and windowing functions. The lack of JOIN support prevents users from building complex, pre-computed aggregates that span multiple tables, and thus from optimizing such queries with a single MV for improved performance.

Specifically, there are constraints in Spark Structured Streaming and Flint SQL regarding streaming join support:

  1. [Spark] Stream-stream joins must be non-aggregate: Spark throws an exception when a streaming query contains multiple streaming aggregations, e.g. when both join inputs are windowed aggregates: "Multiple streaming aggregations are not supported with streaming DataFrames/Datasets." (Good to know; see the sketch after this list)

  2. [Spark] Equality join condition required: At least one equality join predicate is required, and cross joins are not supported. The exception message reads: "Stream-stream join without equality predicate is not supported." (Good to know)

  3. [Flint] Window generation without aggregation: Flint currently has no way to generate a window without aggregating; the TUMBLE function must appear in the GROUP BY clause, which is not suitable for all use cases. (Good to have)

  4. [Flint] Watermark delay configuration for multiple tables: A mechanism is needed to configure a watermark delay for each table; the current watermark_delay option does not support this. (Needs to be addressed)
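
To make the two Spark constraints concrete, here is a minimal Spark Structured Streaming sketch (built on the rate source; names and schemas are illustrative, not Flint code) that triggers both errors:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.window

  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  // Two synthetic streams from Spark's built-in rate source.
  val left = spark.readStream.format("rate").load().withWatermark("timestamp", "1 hour")
  val right = spark.readStream.format("rate").load().withWatermark("timestamp", "10 minutes")

  // Constraint 2: at least one equality predicate is required. A pure range
  // join like this fails at query start with "Stream-stream join without
  // equality predicate is not supported".
  val rangeJoin = left.as("l").join(right.as("r"), $"l.timestamp" <= $"r.timestamp")

  // Constraint 1: aggregating both inputs before joining puts multiple
  // streaming aggregations into one plan, which fails at query start with
  // "Multiple streaming aggregations are not supported with streaming
  // DataFrames/Datasets" (the same error as in the test further below).
  val leftAgg  = left.groupBy(window($"timestamp", "10 minutes")).count()
  val rightAgg = right.groupBy(window($"timestamp", "10 minutes")).count()
  val aggJoin  = leftAgg.join(rightAgg, "window")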

What solution would you like?

I propose enhancing Flint's materialized views to support JOIN operations in the defining query. Specifically, users should be able to define MVs that join multiple tables, with the constraints above alleviated by a newly introduced Windowing Table-Valued Function (TVF) in Flint SQL.

This new TVF lets users define windowing without having to group or collapse data, and also lets them specify a watermark delay for each table. Below is an example showcasing the new TUMBLE TVF:

CREATE MATERIALIZED VIEW test
AS
SELECT ...
FROM (
  SELECT
    window.startTime AS startTime,
    ...
  FROM
    TUMBLE(orders, time, '1 Day', '1 Hour')
) AS t1
JOIN (
  SELECT
    window.startTime AS startTime,
    ...
  FROM
    TUMBLE(audit, time, '1 Day', '10 Minutes')
) AS t2
ON
  t1.startTime = t2.startTime
  AND
  t1.id = t2.id
GROUP BY ...
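
For context on feasibility, Spark's DataFrame API can already express windowing without aggregation, so the proposed TVF could plausibly desugar along the following lines. This is only a sketch of an assumed translation, not Flint's actual implementation (note that Spark's window struct exposes start/end fields):

  import org.apache.spark.sql.functions.{col, window}

  // Hypothetical translation of TUMBLE(orders, time, '1 Day', '1 Hour'):
  // window() is used in a plain projection (no GROUP BY), so rows are not
  // collapsed, and the watermark delay is attached to the source table.
  val t1 = spark.readStream
    .table("orders")                  // assumed source table name
    .withWatermark("time", "1 hour")  // per-table watermark delay
    .select(window(col("time"), "1 day").alias("window"), col("*"))
  // Every row now carries window.start / window.end and can be joined on them.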

(Appendix)

What alternatives have you considered?

An alternative is to create separate materialized views for each table and use the OpenSearch SQL plugin to perform JOIN operations across the resulting indices. However, this approach does not fully address the need for efficient, complex JOIN operations within materialized views, and it introduces additional challenges around storage and maintenance.

Do you have any additional context?

Apache Flink

In Apache Flink, the time column and watermark delay are characteristics of the data source rather than of the query itself. Users are therefore required to specify these attributes in the CREATE TABLE statement:

CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( ... );

In queries, window functions are used solely to define windowing. For reference, here is an example of the Apache Flink syntax:

SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id,
 COALESCE(L.window_start, R.window_start) as window_start,
 COALESCE(L.window_end, R.window_end) as window_end
FROM (
   SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L
FULL JOIN (
   SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R
ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;

Flint SQL

CREATE MATERIALIZED VIEW test
AS
SELECT ...
FROM (
  SELECT
    time,
    ...
  FROM orders
) AS t1
JOIN (
  SELECT
    time,
    ...
  FROM audit
) AS t2
ON
  t1.time BETWEEN t2.time AND t2.time + INTERVAL 1 DAY
  AND
  t1.id = t2.id
GROUP BY ...
WITH (
  watermark_delay = {
    orders, time, '1 Hour',
    audit, time, '10 Minutes'
  }
);
dai-chen commented 1 month ago

I quickly tested joined queries with windowing functions, but they failed due to missing support in Spark 3.3.1:

  test("create materialized view with join") {
    withTempDir { checkpointDir =>
      sql(s"""
           | CREATE MATERIALIZED VIEW $testMvName
           | AS
           | SELECT
           |   t1.startTime AS eventTime,
           |   t1.name,
           |   t2.productId
           | FROM (
           |   SELECT
           |     window.start AS startTime,
           |     name
           |   FROM $testTable
           |   GROUP BY
           |     TUMBLE(time, '10 Minutes'),
           |     name
           | ) AS t1
           | JOIN (
           |   SELECT
           |     window.start AS startTime,
           |     productId
           |   FROM $testTable2
           |   GROUP BY
           |     TUMBLE(transactionDate, '10 Minutes'),
           |     productId
           | ) AS t2
           | ON t1.startTime = t2.startTime
           | WITH (
           |   auto_refresh = true,
           |   checkpoint_location = '${checkpointDir.getAbsolutePath}',
           |   watermark_delay = '1 Second'
           | )
           |""".stripMargin)

      val job = spark.streams.active.find(_.name == testFlintIndex)
      job shouldBe defined
      failAfter(streamingTimeout) {
        job.get.processAllAvailable()
      }

      flint.queryIndex(testFlintIndex).show()
    }
  }

org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets; line 1 pos 0;
Project [startTime#160 AS eventTime#162, name#173, productId#178]
+- Join Inner, (startTime#160 = startTime#161)
   :- SubqueryAlias t1
   :  +- Aggregate [window#187-T1000ms, name#173], [window#187-T1000ms.start AS startTime#160, name#173]
   :     +- Project [named_struct(start, precisetimestampconversion(((precisetimestampconversion(time#172-T1000ms, TimestampType, LongType) - (((precisetimestampconversion(time#172-T1000ms, TimestampType, LongType) - 0) + 600000000) % 600000000)) - 0), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(time#172-T1000ms, TimestampType, LongType) - (((precisetimestampconversion(time#172-T1000ms, TimestampType, LongType) - 0) + 600000000) % 600000000)) - 0) + 600000000), LongType, TimestampType)) AS window#187-T1000ms, time#172-T1000ms, name#173, age#174, address#175]
   :        +- Filter isnotnull(time#172-T1000ms)
   :           +- EventTimeWatermark time#172: timestamp, 1 seconds
   :              +- SubqueryAlias spark_catalog.default.mv_test
   :                 +- StreamingRelation DataSource(org.apache.spark.sql.test.TestSparkSession@3db2665d,CSV,List(),Some(StructType(StructField(time,TimestampType,true),StructField(name,StringType,true),StructField(age,IntegerType,true),StructField(address,StringType,true))),List(),None,Map(header -> false, delimiter ->   , path -> file:/Users/daichen/IdeaProjects/opensearch-spark/spark-warehouse/mv_test),None), FileSource[file:/Users/daichen/IdeaProjects/opensearch-spark/spark-warehouse/mv_test], [time#172, name#173, age#174, address#175]
   +- SubqueryAlias t2
      +- Aggregate [window#188-T1000ms, productId#178], [window#188-T1000ms.start AS startTime#161, productId#178]
         +- Project [named_struct(start, precisetimestampconversion(((precisetimestampconversion(transactionDate#177-T1000ms, TimestampType, LongType) - (((precisetimestampconversion(transactionDate#177-T1000ms, TimestampType, LongType) - 0) + 600000000) % 600000000)) - 0), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(transactionDate#177-T1000ms, TimestampType, LongType) - (((precisetimestampconversion(transactionDate#177-T1000ms, TimestampType, LongType) - 0) + 600000000) % 600000000)) - 0) + 600000000), LongType, TimestampType)) AS window#188-T1000ms, transactionId#176, transactionDate#177-T1000ms, productId#178, productsAmount#179, customerId#180, year#181, month#182]
            +- Filter isnotnull(transactionDate#177-T1000ms)
               +- EventTimeWatermark transactionDate#177: timestamp, 1 seconds
                  +- SubqueryAlias spark_catalog.default.mv_test_2
                     +- StreamingRelation DataSource(org.apache.spark.sql.test.TestSparkSession@3db2665d,CSV,List(),Some(StructType(StructField(transactionId,StringType,true),StructField(transactionDate,TimestampType,true),StructField(productId,StringType,true),StructField(productsAmount,IntegerType,true),StructField(customerId,StringType,true),StructField(year,IntegerType,true),StructField(month,IntegerType,true))),List(),None,Map(header -> false, delimiter ->   , path -> file:/Users/daichen/IdeaProjects/opensearch-spark/spark-warehouse/mv_test_2),None), FileSource[file:/Users/daichen/IdeaProjects/opensearch-spark/spark-warehouse/mv_test_2], [transactionId#176, transactionDate#177, productId#178, productsAmount#179, customerId#180, year#181, month#182]
dai-chen commented 1 month ago

I quickly tested joined queries without using windowing functions (instead, the conditions were specified in the join clause) and observed the following limitations:

  1. Watermark delay: It is not possible to specify a watermark delay for each table; the watermark_delay option in the WITH clause does not take effect here.
  2. Streaming join type: Both sides of the join are treated as streaming sources, resulting in a stream-stream join; there is currently no way to express a stream-static join (see the sketch after this list).
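
For comparison, a stream-static join is straightforward to express in the plain Spark API by reading one side as a batch DataFrame; what is missing is Flint SQL syntax to mark one MV source as static. A minimal sketch with assumed table and column names:

  // Only the streaming side keeps join state; the static side is a plain
  // batch read and needs no watermark. Names are illustrative.
  val streamSide = spark.readStream.table("orders")
  val staticSide = spark.read.table("audit")
  val joined = streamSide.join(staticSide, Seq("id")) // stream-static inner join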

Despite these limitations, the MV with joined queries appears to function as intended:

  protected def createTimeSeriesTransactionTable(testTable: String): Unit = {
    sql(s"""
      | CREATE TABLE $testTable
      | (
      |   transactionId STRING,
      |   transactionDate TIMESTAMP,
      |   productId STRING,
      |   productsAmount INT,
      |   customerId STRING
      | )
      | USING $tableType $tableOptions
      | PARTITIONED BY (
      |    year INT,
      |    month INT
      | )
      |""".stripMargin)

    // Insert records into the testTable for April 2023
    sql(s"""
      | INSERT INTO $testTable PARTITION (year=2023, month=4)
      | VALUES
      | ('txn001', CAST('2023-04-01 10:30:00' AS TIMESTAMP), 'prod1', 2, 'cust1'),
      | ('txn001', CAST('2023-04-01 14:30:00' AS TIMESTAMP), 'prod1', 4, 'cust1'),
      | ('txn002', CAST('2023-04-02 11:45:00' AS TIMESTAMP), 'prod2', 1, 'cust2'),
      | ('txn003', CAST('2023-04-03 12:15:00' AS TIMESTAMP), 'prod3', 3, 'cust1'),
      | ('txn004', CAST('2023-04-04 09:50:00' AS TIMESTAMP), 'prod1', 1, 'cust3')
      |  """.stripMargin)

    // Insert records into the testTable for May 2023
    sql(s"""
      | INSERT INTO $testTable PARTITION (year=2023, month=5)
      | VALUES
      | ('txn005', CAST('2023-05-01 08:30:00' AS TIMESTAMP), 'prod2', 1, 'cust4'),
      | ('txn006', CAST('2023-05-02 07:25:00' AS TIMESTAMP), 'prod4', 5, 'cust2'),
      | ('txn007', CAST('2023-05-03 15:40:00' AS TIMESTAMP), 'prod3', 1, 'cust3'),
      | ('txn007', CAST('2023-05-03 19:30:00' AS TIMESTAMP), 'prod3', 2, 'cust3'),
      | ('txn008', CAST('2023-05-04 14:15:00' AS TIMESTAMP), 'prod1', 4, 'cust1')
      |  """.stripMargin)
  }

  test("create materialized view with join") {
    withTempDir { checkpointDir =>
      sql(s"""
           | CREATE MATERIALIZED VIEW $testMvName
           | AS
           | SELECT
           |   t1.startTime AS eventTime,
           |   t1.transactionId,
           |   t2.productId
           | FROM (
           |   SELECT
           |     transactionId,
           |     transactionDate AS startTime,
           |     productId
           |   FROM $testTable2
           | ) AS t1
           | JOIN (
           |   SELECT
           |     transactionId,
           |     transactionDate AS startTime,
           |     productId
           |   FROM $testTable3
           | ) AS t2
           | ON
           |   t1.transactionId = t2.transactionId
           |   AND
           |   t1.startTime BETWEEN t2.startTime AND t2.startTime + INTERVAL 1 DAY
           | WITH (
           |   auto_refresh = true,
           |   checkpoint_location = '${checkpointDir.getAbsolutePath}',
           |   watermark_delay = '1 Second'
           | )
           |""".stripMargin)

      val job = spark.streams.active.find(_.name == testFlintIndex)
      job shouldBe defined
      failAfter(streamingTimeout) {
        job.get.processAllAvailable()
      }

      flint.queryIndex(testFlintIndex)
        .select("eventTime", "transactionId", "productId")
        .orderBy("eventTime", "transactionId").show()
    }
  }

+-------------------+-------------+---------+
|          eventTime|transactionId|productId|
+-------------------+-------------+---------+
|2023-04-01 10:30:00|       txn001|    prod1|
|2023-04-01 14:30:00|       txn001|    prod1|
|2023-04-01 14:30:00|       txn001|    prod1|
|2023-04-02 11:45:00|       txn002|    prod2|
|2023-04-03 12:15:00|       txn003|    prod3|
|2023-04-04 09:50:00|       txn004|    prod1|
|2023-05-01 08:30:00|       txn005|    prod2|
|2023-05-02 07:25:00|       txn006|    prod4|
|2023-05-03 15:40:00|       txn007|    prod3|
|2023-05-03 19:30:00|       txn007|    prod3|
|2023-05-03 19:30:00|       txn007|    prod3|
|2023-05-04 14:15:00|       txn008|    prod1|
+-------------------+-------------+---------+
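
Note: the duplicate rows (e.g. txn001 at 14:30 and txn007 at 19:30) appear to be expected range-join semantics rather than a bug: when a transactionId has two events within one day, the later t1 event falls inside the one-day BETWEEN window of both matching t2 events, producing one output row per match.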