opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices on remote data stores.
Apache License 2.0

[FEATURE] Enhance show Flint index statement to include refresh status #385

Open dai-chen opened 3 months ago

dai-chen commented 3 months ago

Is your feature request related to a problem?

Currently, there is no way to check whether the Flint index refresh has caught up with the source data, yet users need to know whether the index is up to date.

What solution would you like?

There is no direct correspondence between each micro-batch and the source data:

  1. For raw datasets, there is no version control;
  2. For table formats like Iceberg, there is no 1-to-1 mapping from snapshot to micro-batch.

The proposed solution is to recover this correspondence from the committed offsets in the streaming job checkpoint:

  1. For raw datasets, determine how many files have been refreshed;
  2. For table formats, determine which snapshot ID has already been processed (see the sketch after this list).
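
For the Iceberg case, here is a minimal sketch of what this lookup could do. It assumes the json4s library bundled with Spark and the committed offset JSON format shown in the test below; the helper names (committedSnapshotId, isCaughtUp) are hypothetical, not part of Flint:

  import org.apache.spark.sql.SparkSession
  import org.json4s._
  import org.json4s.jackson.JsonMethods.parse

  // Parse the committed offset JSON of Iceberg's streaming source, e.g.
  // {"version":1,"snapshot_id":5955838147460350333,"position":1,"scan_all_files":false}
  def committedSnapshotId(offsetJson: String): Long = {
    implicit val formats: Formats = DefaultFormats
    (parse(offsetJson) \ "snapshot_id").extract[Long]
  }

  // Compare against the table's current snapshot from Iceberg's history metadata table
  def isCaughtUp(spark: SparkSession, table: String, committed: Long): Boolean = {
    val current = spark
      .sql(s"SELECT snapshot_id FROM $table.history ORDER BY made_current_at DESC LIMIT 1")
      .head()
      .getLong(0)
    current == committed
  }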

This information can be returned to users through a column in the SHOW FLINT INDEX statement output.
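
As a purely hypothetical illustration (existing output columns elided, the new column name and value format invented here), the enhanced output could look like:

  SHOW FLINT INDEX IN mycatalog

  | flint_index_name    | ... | refresh_progress                                     |
  |:--------------------|:----|:-----------------------------------------------------|
  | flint_mycatalog_... | ... | {"snapshot_id":7393048775934035924,"caught_up":true} |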

What alternatives have you considered?

N/A

Do you have any additional context?

Quick test with an Iceberg table:

  /**
   * Logging after V4 Snapshot ID:
   * {"version":1,"snapshot_id":5955838147460350333,"position":1,"scan_all_files":false}
   *
   * Logging after V5 Snapshot ID:
   * {"version":1,"snapshot_id":7393048775934035924,"position":1,"scan_all_files":false}
   *
   * Verify if correct in Iceberg history metadata table
   * |      made_current_at |         snapshot_id |           parent_id | is_current_ancestor |
   * |---------------------:|--------------------:|--------------------:|:--------------------|
   * | 2024-06-17 15:30:... | 1552180198061740807 |                null | true                |
   * | 2024-06-17 15:30:... | 4538411166290920590 | 1552180198061740807 | true                |
   * | 2024-06-17 15:30:... | 8681526725408438495 | 4538411166290920590 | true                |
   * | 2024-06-17 15:30:... | 5955838147460350333 | 8681526725408438495 | true                |
   * | 2024-06-17 15:30:... | 7393048775934035924 | 5955838147460350333 | true                |
   */
  test("version tracking") {
    val job = spark.streams.active.find(_.name == testFlintIndex)
    val query = job.get match {
      case wrapper: StreamingQueryWrapper => wrapper.streamingQuery
      case other: StreamExecution => other
    }

    def snapshotId: String = query.committedOffsets.headOption.get._2.json()

    // v4
    sql(
      s"INSERT INTO $testIcebergTable VALUES (TIMESTAMP '2023-10-04 00:01:00', '200', 'Accept')")
    awaitStreamingComplete(job.get.id.toString)
    logInfo(s"Snapshot ID: $snapshotId")

    // v5
    sql(
      s"INSERT INTO $testIcebergTable VALUES (TIMESTAMP '2023-10-05 00:01:00', '200', 'Accept')")
    awaitStreamingComplete(job.get.id.toString)
    logInfo(s"Snapshot ID: $snapshotId")

    sql(s"SELECT * FROM $testIcebergTable.history").show