opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices on remote data stores.
Apache License 2.0

[FEATURE] Support hybrid scan for Flint covering index #386

Open dai-chen opened 1 week ago

dai-chen commented 1 week ago

Is your feature request related to a problem?

The Limitation section in this comment discusses how stale index data may lead to incorrect query results. While Flint's skipping index addresses this issue with a hybrid scan mode, that approach is not feasible for a covering index on raw datasets due to their lack of versioning.

What solution would you like?

For data lake tables such as Delta, Iceberg, and Hudi, it is possible to implement hybrid scans using the version information in table metadata. For example, consider the following query with a primary key (timestamp column) in an Iceberg table http_logs:

SELECT timestamp, request, status
FROM glue.iceberg.http_logs;

This query can be rewritten to a hybrid scan leveraging Iceberg's time travel capabilities:

  1. During query rewriting, retrieve the Iceberg snapshot ID X from the committed log in the checkpoint. Ref: https://github.com/opensearch-project/opensearch-spark/issues/385
  2. Rewrite the query to a hybrid scan as shown below:
// Scan the covering index for data already committed to Flint ...
spark.read
  .format("flint")
  .load("flint_glue_iceberg_http_logs_index")
  // ... and union it with source data written after snapshot X,
  // read incrementally via Iceberg's time travel options
  .union(
    spark.read
      .format("iceberg")
      .option("start-snapshot-id", "X")
      .load("glue.iceberg.http_logs"))
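As a sketch of step 1, the snapshot ID can be looked up in Iceberg's `snapshots` metadata table. Note this query is only an illustration of where the version information lives; in the actual rewrite, the snapshot ID would come from the committed log in the Flint checkpoint (issue #385):

```scala
// Illustration only: fetch the most recent snapshot ID recorded in the
// Iceberg "snapshots" metadata table for the http_logs table.
val snapshotId = spark.sql(
    """SELECT snapshot_id
      |FROM glue.iceberg.http_logs.snapshots
      |ORDER BY committed_at DESC
      |LIMIT 1""".stripMargin)
  .first()
  .getLong(0)
```

This assumes a live Spark session with the Iceberg Glue catalog configured, so it is shown as a fragment rather than a runnable program.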

What alternatives have you considered?

Alternatively, users can disable the covering index optimization by setting spark.flint.optimizer.covering.enabled to false. This ensures that queries always read the most current data from the source table, but it may degrade query performance because the index is bypassed entirely.
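For reference, the switch can be applied per session (a configuration fragment; the flag name is taken from the text above):

```scala
// Disable the covering index optimizer rule so queries scan source data directly
spark.conf.set("spark.flint.optimizer.covering.enabled", "false")
```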

Do you have any additional context?

N/A