opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices to remote data stores.
Apache License 2.0

[RFC] Flint serverless skipping index #118

Open dai-chen opened 10 months ago

dai-chen commented 10 months ago

Is your feature request related to a problem?

When using a Flint skipping index, the user first needs to create a Spark table and then has to decide which skipping data structure to use for each column. Afterwards, the freshness of the skipping index is maintained by a long-running Spark streaming job.

As a user, the pain points include:

  1. Create table: it is difficult to figure out what table to create because log data is semi-structured and deeply nested
  2. Choose skipping data structure: expertise is required to figure out which skipping algorithm best fits the data
  3. Spark streaming job maintenance: extra maintenance effort and cost to keep the streaming job running even if the source log data doesn't change often

What solution would you like?

The ideas proposed below each need a PoC:

  1. Automatic Skipping Algorithm Selection
     a. Develop a component that analyzes column characteristics such as data type, size, and cardinality, and automatically selects the most suitable skipping algorithm (a heuristic sketch follows this list)
     b. Implement a user-friendly option so the user only decides whether or not to enable this feature (like Snowflake)
  2. Serverless Skipping Index Building
     a. Rewrite the query plan to wrap a scan operator that collects skipping index data on demand
     b. Apply a mechanism similar to the current hybrid scan mode, where new files trigger skipping data collection as necessary
  3. Serverless Skipping Index Storage
     a. Skipping data is essentially an aggregated data structure and does not necessarily rely on Lucene
     b. Tier the skipping index storage (like Apache Iceberg) and maintain hot tier-1 data in an OpenSearch index
     c. Or write the Flint index format directly during ingestion
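
As a rough sketch of how item 1 could work, a heuristic could map coarse column characteristics to a skipping data structure. Everything below (the ColumnStats shape, the thresholds, the selector itself) is illustrative only, not existing Flint code:

// Illustrative heuristic for automatic skipping algorithm selection.
// ColumnStats and all thresholds are hypothetical, not part of Flint.
sealed trait SkippingKind
case object MinMax extends SkippingKind
case object ValueSet extends SkippingKind
case object BloomFilter extends SkippingKind

case class ColumnStats(dataType: String, distinctCount: Long, rowCount: Long)

object SkippingAlgorithmSelector {
  def select(stats: ColumnStats): SkippingKind = {
    val cardinalityRatio = stats.distinctCount.toDouble / stats.rowCount.max(1L)
    stats.dataType match {
      // Numeric/temporal columns: min-max is cheap and works well for range predicates
      case "int" | "long" | "double" | "date" | "timestamp" => MinMax
      // Low-cardinality columns: an exact value set per file stays small
      case _ if stats.distinctCount <= 128 => ValueSet
      // Values still proportionally rare: a value set is acceptable
      case _ if cardinalityRatio < 0.001 => ValueSet
      // High-cardinality strings (IDs, UUIDs): a bloom filter bounds the index size
      case _ => BloomFilter
    }
  }
}

// Example: a request-id-like string column with millions of unique values
// SkippingAlgorithmSelector.select(ColumnStats("string", 5000000L, 10000000L))  // => BloomFilter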
dai-chen commented 9 months ago

Proposal: Flint Serverless Skipping Index Building

Design Options

  1. Offline build by Spark streaming job (current)
     a. Pros: the user doesn't need to change ingestion
     b. Cons: long-running job maintenance and cost
  2. Ingestion-time build
     a. Pros: index data is always in sync with the source
     b. Cons: the user needs to change the ingestion pipeline; all fields in the log data are indexed
  3. Query-time on-demand build
     a. Pros: serverless; amortizes the computation cost across queries
     b. Cons: requires warm-up, and query latency is not guaranteed if there is no obvious query pattern

Next we will dive into Option 3, the query-time on-demand build.

Example

Following the option chosen above, here is an example that illustrates the idea:

T-1: Query timestamp after 2023-05-01

  1. [FlintOptimizer] fetch source file list and query skipping index
  2. [SkippingIndex] return empty result
  3. [FlintOptimizer] nothing to skip
  4. [FlintOptimizer] append new metadata log entry with column and file list to index
  5. [FlintOptimizer] determine index algorithm and rewrite query plan with Flint collect and store operator
  6. [FlintCollect] collect unique values of the column (@message.params.0.instanceId)
  7. [FlintStore] store index data and append metadata log entry

(Screenshot illustrating the T-1 flow above)
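
The T-1 steps could translate into an optimizer-side flow roughly like the sketch below. All of the types (ScanPlan, CollectAndStore, SkippingIndex) are placeholders for whichever Flint components would play these roles; none of them are existing classes:

// Hypothetical control flow for T-1: the skipping index is empty, so nothing can be
// skipped; the optimizer records a metadata log entry and rewrites the plan so the
// scan also collects and stores skipping data for the files it reads.
case class ScanPlan(column: String, files: Seq[String])

sealed trait Plan
case class Scan(plan: ScanPlan) extends Plan
case class CollectAndStore(child: Plan, column: String) extends Plan // Flint collect + store

trait SkippingIndex {
  // None = the index has no data for these files (T-1, step 2)
  def lookup(column: String, files: Seq[String]): Option[Seq[String]]
  def appendLogEntry(entry: String): Unit
}

object OnDemandBuildSketch {
  def rewrite(scan: ScanPlan, index: SkippingIndex): Plan =
    index.lookup(scan.column, scan.files) match {
      case None =>
        // Steps 3-5: nothing to skip; log which column/files will be indexed, then
        // wrap the full scan so unique values are collected and stored as a side
        // effect of answering the query (steps 6-7).
        index.appendLogEntry(s"index ${scan.column}: ${scan.files.mkString(", ")}")
        CollectAndStore(Scan(scan), scan.column)
      case Some(filesToScan) =>
        // Already-indexed case: simply prune the file list (see T-2 below).
        Scan(scan.copy(files = filesToScan))
    }
}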

T-2: Query timestamp after 2023-04-30 with new files 2023-05-05, 2023-05-06

  1. [FlintOptimizer] fetch source file list and query skipping index
  2. [SkippingIndex] return file to scan list [2023-05-02] for range [2023-05-01, 2023-05-04]
  3. [FlintOptimizer] file to scan after skip [2023-04-30, 2023-05-02, 2023-05-05, 2023-05-06]
  4. [FlintOptimizer] append new metadata log entry with column and file list to index
  5. [FlintOptimizer] determine index algorithm and rewrite query plan with Flint collect and store operator
  6. [FlintCollect] collect unique values of the column (@message.params.0.instanceId)
  7. [FlintStore] store index data and append metadata log entry

(Screenshot illustrating the T-2 flow above)
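
For T-2, the interesting part is step 3: combining the skipping result for the already-indexed range with files that fall outside of it. A standalone sketch of that merge, using the date-named files from the example (the function and its signature are made up for illustration):

// Illustrative sketch of T-2 step 3: compute the files to scan after skipping.
object FileSkipMergeSketch {
  def filesToScan(
      allFiles: Seq[String],           // full source file list fetched by the optimizer
      indexedRange: (String, String),  // range currently covered by the skipping index
      indexSurvivors: Seq[String]      // files the index could NOT rule out in that range
  ): Seq[String] = {
    val (lo, hi) = indexedRange
    val (indexed, notIndexed) = allFiles.partition(f => f >= lo && f <= hi)
    // Inside the indexed range, trust the skipping index; outside it, scan everything.
    (indexed.intersect(indexSurvivors) ++ notIndexed).sorted
  }
}

// Reproducing the numbers in the example:
// filesToScan(
//   allFiles       = Seq("2023-04-30", "2023-05-01", "2023-05-02", "2023-05-03",
//                        "2023-05-04", "2023-05-05", "2023-05-06"),
//   indexedRange   = ("2023-05-01", "2023-05-04"),
//   indexSurvivors = Seq("2023-05-02"))
// => Seq("2023-04-30", "2023-05-02", "2023-05-05", "2023-05-06")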

Implementation Challenges

  1. Flint Query Optimizer
     a. Automatic algorithm selection: without column stats, choose the skipping algorithm heuristically
     b. Concurrency control: either simply do not use the skipping index while it is refreshing, or update it in parallel (treating different columns as physical partitions)
  2. Flint Query Operator
     a. Flint collect: Spark splits source files into chunks as worker tasks
     b. Flint store: separate communication channel with the Flint collect operator
  3. Flint Skipping Index
     a. Metadata: store the valid range (possibly discrete) of the index data
     b. Index data: optimize/evolve the data structure over time, e.g. from value set to bloom filter as more and more unique values are indexed (a sketch follows this list)
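
For challenge 3b, one possible shape is an index entry that starts as an exact value set and evolves into a bloom filter once too many unique values are indexed. The 128-value threshold and the toy bloom filter below are illustrative only; a real implementation would more likely reuse an existing bloom filter library:

import scala.collection.immutable.BitSet

// Illustrative sketch of an index entry that evolves from value set to bloom filter.
sealed trait SkippingEntry {
  def add(value: String): SkippingEntry
  def mightContain(value: String): Boolean
}

final case class ValueSetEntry(values: Set[String], maxSize: Int = 128) extends SkippingEntry {
  def add(value: String): SkippingEntry = {
    val updated = values + value
    if (updated.size <= maxSize) copy(values = updated)
    else updated.foldLeft(BloomFilterEntry.empty: SkippingEntry)(_.add(_)) // evolve the structure
  }
  def mightContain(value: String): Boolean = values.contains(value) // exact answer
}

final case class BloomFilterEntry(bits: BitSet, numBits: Int, numHashes: Int) extends SkippingEntry {
  def add(value: String): SkippingEntry =
    copy(bits = BloomFilterEntry.positions(value, numBits, numHashes).foldLeft(bits)(_ + _))
  def mightContain(value: String): Boolean = // false positives possible, no false negatives
    BloomFilterEntry.positions(value, numBits, numHashes).forall(bits.contains)
}

object BloomFilterEntry {
  def empty: BloomFilterEntry = BloomFilterEntry(BitSet.empty, numBits = 4096, numHashes = 3)

  // Derive k bit positions from two hashes (Kirsch-Mitzenmacher double hashing).
  def positions(value: String, numBits: Int, numHashes: Int): Seq[Int] = {
    val h1 = value.hashCode
    val h2 = scala.util.hashing.MurmurHash3.stringHash(value)
    (0 until numHashes).map(i => math.floorMod(h1 + i * h2, numBits))
  }
}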
dai-chen commented 9 months ago

Design: Flint Serverless Skipping Index Storage

TODO

dai-chen commented 8 months ago

Design: Automatic Skipping Algorithm Selection

TODO

Reference

Delta table column stats

Delta tables collect column stats automatically. However, they only collect min-max for numeric, date, and string columns. Probably because Delta stores data as Parquet (which already uses min-max, dictionary encoding, and bloom filters), it only aggregates file-level min-max up to the Delta table level.

Code: https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/stats/StatisticsCollection.scala#L252

      collectStats(MIN, statCollectionPhysicalSchema) {
        // Truncate string min values as necessary
        case (c, SkippingEligibleDataType(StringType), true) =>
          substring(min(c), 0, stringPrefix)

        // Collect all numeric min values
        case (c, SkippingEligibleDataType(_), true) =>
          min(c)
      },

object SkippingEligibleDataType {
  // Call this directly, e.g. `SkippingEligibleDataType(dataType)`
  def apply(dataType: DataType): Boolean = dataType match {
    case _: NumericType | DateType | TimestampType | StringType => true
    case _ => false
  }
}

Hyperspace Analysis Utility

Hyperspace also provides an analysis utility to help users estimate the effectiveness of Z-Ordering before index creation.

scala> import com.microsoft.hyperspace.util.MinMaxAnalysisUtil
import com.microsoft.hyperspace.util.MinMaxAnalysisUtil

scala> println(MinMaxAnalysisUtil.analyze(df, Seq("A", "B"), format = "text"))
Min/Max analysis on A

                  < Number of files (%) >
     +--------------------------------------------------+
100% |                                                  |
     |                                                  |
     |                                                  |
     |                                                  |
 75% |                                                  |
     |                                                  |
     |                                                  |
     |                                                  |
     |                                                  |
 50% |                                                  |
     |                                                  |
     |                                                  |
     |                                                  |
     |                                                  |
 25% |                                                  |
     |                                                  |
     |                                                  |
     |                                                  |
     |**************************************************|
  0% |**************************************************|
     +--------------------------------------------------+
     Min <-----             A value            -----> Max

min(A): 0
max(A): 99
Total num of files: 12
Total byte size of files: 11683
Max. num of files for a point lookup: 1 (8.33%)
Estimated average num of files for a point lookup: 1.00 (8.33%)
Max. bytes to read for a point lookup: 982 (8.41%)
...

X-axis: the range groups of the column values. Y-axis: the percentage of files that need to be read to look up a value, based on each file's minimum and maximum value. A lower percentage means a better distribution, since more files can be skipped.
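
The "estimated average num of files for a point lookup" line can be reproduced from per-file min/max stats alone: for each possible value, count how many files' [min, max] ranges contain it, then average. A small sketch with made-up file stats (not taken from the output above):

// Illustrative sketch: estimate how many files a point lookup must read,
// given only per-file min/max statistics for column A. Numbers are made up.
case class FileStats(path: String, min: Int, max: Int)

object MinMaxLookupEstimate {
  // Files whose [min, max] range contains the looked-up value cannot be skipped.
  def filesForLookup(files: Seq[FileStats], value: Int): Int =
    files.count(f => value >= f.min && value <= f.max)

  // Average over all possible values in the column's overall range.
  def averageFilesForLookup(files: Seq[FileStats]): Double = {
    val lo = files.map(_.min).min
    val hi = files.map(_.max).max
    val counts = (lo to hi).map(v => filesForLookup(files, v))
    counts.sum.toDouble / counts.size
  }
}

// Non-overlapping ranges (well clustered): every lookup touches 1 of 4 files (25%).
//   averageFilesForLookup(Seq(FileStats("f1", 0, 24), FileStats("f2", 25, 49),
//                             FileStats("f3", 50, 74), FileStats("f4", 75, 99)))  // => 1.0
// Fully overlapping ranges (poorly clustered): every lookup touches all 4 files (100%).
//   averageFilesForLookup(Seq.tabulate(4)(i => FileStats(s"f$i", 0, 99)))         // => 4.0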