opensearch-project / sql

Query your data using familiar SQL or intuitive Piped Processing Language (PPL)
https://opensearch.org/docs/latest/search-plugins/sql/index/
Apache License 2.0
120 stars 139 forks source link

Create index on external table #1379

Closed dai-chen closed 1 year ago

dai-chen commented 1 year ago

Is your feature request related to a problem?

Object store query capability is being introduced in the Spark integration and the OpenSearch JDBC connector: https://github.com/opensearch-project/sql/issues/1331. To answer such queries efficiently, we need an auxiliary data structure, such as an index, to accelerate this kind of federated query.

What solution would you like?

We will work on the following tasks for the prototype, the demo, and subsequent development:

  1. Add Spark parser extension with CREATE INDEX
  2. Store index data in OpenSearch index
  3. Auto refresh by Spark streaming execution (integrate with table metadata refresh job in https://github.com/opensearch-project/sql/issues/1363)

What alternatives have you considered?

Alternative solutions and references for this work:

  1. Apache Carbondata: https://carbondata.apache.org/index.html
  2. Microsoft Hyperspace: https://microsoft.github.io/hyperspace/

Do you have any additional context?

Please find more details in:

  1. https://github.com/opensearch-project/opensearch-spark/issues/128
  2. https://github.com/opensearch-project/opensearch-spark/issues/4
dai-chen commented 1 year ago

PoC branch: https://github.com/dai-chen/hyperspace/tree/add-create-index

dai-chen commented 1 year ago

Proof of Concept

Setup

Test

In this test, we first create a skipping index t001_name_skipping_index based on the current records (parquet files 1 and 2) in the Maximus external table t001. Then we add a new file (parquet file 3) to the remote data location to verify that the index data is auto-refreshed along with the table metadata.

spark-integration (5) (3) (1)-Page-10

$ bin/spark-sql --packages io.delta:delta-core_2.12:1.0.1,com.microsoft.hyperspace:hyperspace-core-spark3.1_2.12:0.5.0-SNAPSHOT --conf "spark.sql.extensions=com.microsoft.hyperspace.HyperspaceSparkSessionExtension,io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain --conf "spark.hyperspace.index.sources.fileBasedBuilders=com.microsoft.hyperspace.index.sources.delta.DeltaLakeFileBasedSourceBuilder,com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder"

#1. Create a table only for adding external records
CREATE TABLE IF NOT EXISTS temp (name STRING, age INT) 
USING PARQUET
LOCATION "s3a://maximus-table/";

INSERT INTO default.temp VALUES ('a', 1), ('b', 2);

$ aws s3 ls maximus-table/
  0 _SUCCESS
598 part-00000-a96257e2-864c-4b0a-9a9b-8874f0ebd9bb-c000.snappy.parquet
598 part-00001-a96257e2-864c-4b0a-9a9b-8874f0ebd9bb-c000.snappy.parquet

#2. Create Maximus (Delta) table with auto refresh enabled
CREATE EXTERNAL TABLE IF NOT EXISTS default.t001
(name STRING, age INT)
USING DELTA
LOCATION "s3a://maximus-table"
TBLPROPERTIES ('auto_refresh'='true');

#Output
#CreateDeltaTableCommand: === Refresh with files ===
#CreateDeltaTableCommand: New file: s3a://maximus-table/part-00000-a96257e2-864c-4b0a-9a9b-8874f0ebd9bb-c000.snappy.parquet
#CreateDeltaTableCommand: New file: s3a://maximus-table/part-00001-a96257e2-864c-4b0a-9a9b-8874f0ebd9bb-c000.snappy.parquet
#CreateDeltaTableCommand: === Refreshing index ===

#3. Create skipping index on name column
CREATE INDEX t001_name_skipping_index ON default.t001 (name) AS 'bloomfilter';

$ curl "localhost:9200/t001_name_skipping_index/_search?pretty"
{
    ...
    "hits" : [
      {
        "_index" : "t001_name_skipping_index",
        "_id" : "u6hHvoYBuDNJL8xoDiMO",
        "_score" : 1.0,
        "_source" : {
          "_data_file_id" : 0,
          "BloomFilter_name__10__0" : {
            "bitCount" : 7,
            "data" : "1161928704398458880,36310272130023424",
            "numHashFunctions" : 7
          }
        }
      },
      {
        "_index" : "t001_name_skipping_index",
        "_id" : "vKhHvoYBuDNJL8xoDiMx",
        "_score" : 1.0,
        "_source" : {
          "_data_file_id" : 1,
          "BloomFilter_name__10__0" : {
            "bitCount" : 7,
            "data" : "144116296177483776,1152930300699870208",
            "numHashFunctions" : 7
          }
        }
      }
    ]
  }
}

#4. Add new records, which triggers table metadata and index data auto refresh
INSERT INTO default.temp VALUES ('g', 7);

$ aws s3 ls maximus-table/
  0 _SUCCESS
598 part-00000-66a5ad0a-85c3-4641-85e4-85f67fe7b082-c000.snappy.parquet
598 part-00000-a96257e2-864c-4b0a-9a9b-8874f0ebd9bb-c000.snappy.parquet
598 part-00001-a96257e2-864c-4b0a-9a9b-8874f0ebd9bb-c000.snappy.parquet

#Output
#CreateDeltaTableCommand: === Refresh with files ===
#CreateDeltaTableCommand: New file: s3a://maximus-table/part-00000-66a5ad0a-85c3-4641-85e4-85f67fe7b082-c000.snappy.parquet
#CreateDeltaTableCommand: === Refreshing index ===
#CreateDeltaTableCommand: Index: t001_name_skipping_index

$ curl "localhost:9200/t001_name_skipping_index/_search?pretty"
{
    ...
    "hits" : [
      {
        "_index" : "t001_name_skipping_index",
        "_id" : "u6hHvoYBuDNJL8xoDiMO",
        "_score" : 1.0,
        "_source" : {
          "_data_file_id" : 0,
          "BloomFilter_name__10__0" : {
            "bitCount" : 7,
            "data" : "1161928704398458880,36310272130023424",
            "numHashFunctions" : 7
          }
        }
      },
      {
        "_index" : "t001_name_skipping_index",
        "_id" : "vKhHvoYBuDNJL8xoDiMx",
        "_score" : 1.0,
        "_source" : {
          "_data_file_id" : 1,
          "BloomFilter_name__10__0" : {
            "bitCount" : 7,
            "data" : "144116296177483776,1152930300699870208",
            "numHashFunctions" : 7
          }
        }
      },
      {
        "_index" : "t001_name_skipping_index",
        "_id" : "vahJvoYBuDNJL8xo-yNi",
        "_score" : 1.0,
        "_source" : {
          "_data_file_id" : 2,
          "BloomFilter_name__10__0" : {
            "bitCount" : 7,
            "data" : "-9223372036854708736,202375168",
            "numHashFunctions" : 7
          }
        }
      }
    ]
  }
}

5. Use index to skip data
EXPLAIN SELECT * FROM t001 WHERE name = 'g';
== Physical Plan ==
*(1) Filter (isnotnull(name#3590) AND (name#3590 = g))
+- *(1) ColumnarToRow
   +- FileScan Hyperspace(Type: DS, Name: t001_name_skipping_index, LogVersion: 3) default.t001[name#3590,age#3591] Batched: true, DataFilters: [isnotnull(name#3590), (name#3590 = g)], Format: Parquet, Location: DataSkippingFileIndex[s3a://maximus-table], PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,g)], ReadSchema: struct<name:string,age:int>
dai-chen commented 1 year ago

Here is an additional test for the covering index:

#Precreate the OpenSearch index to avoid field type inference problems
$ curl -XPUT localhost:9200/t001_name_age_index -H "Content-Type: application/json" -d'
{
  "mappings" : {
    "properties" : {
      "age" : {
        "type" : "integer"
      },
      "name" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      }
    }
  }
}'

#Create covering index
CREATE INDEX t001_name_age_index ON default.t001 (name, age) AS 'lucene';

SHOW INDEXES ON t001;
t001_name_age_index ["name","age"]  t001_name_age_index/v__=0   ACTIVE  {"includedColumns":"","numBuckets":"200","schema":"{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":true,"metadata":{}}]}"}
t001_name_skipping_index    ["name"]    t001_name_skipping_index/v__=0  ACTIVE  {"sketches":"BloomFilter(name, 10)"}

$ curl "localhost:9200/t001_name_age_index/_search?pretty"
{
    ...
    "hits" : [
      {
        "_index" : "t001_name_age_index",
        "_id" : "8mBxvoYBxyMZlj4aE4pm",
        "_score" : 1.0,
        "_source" : {
          "name" : "b",
          "age" : 2
        }
      },
      {
        "_index" : "t001_name_age_index",
        "_id" : "8WBxvoYBxyMZlj4aE4pI",
        "_score" : 1.0,
        "_source" : {
          "name" : "g",
          "age" : 7
        }
      },
      {
        "_index" : "t001_name_age_index",
        "_id" : "82BxvoYBxyMZlj4aE4pr",
        "_score" : 1.0,
        "_source" : {
          "name" : "a",
          "age" : 1
        }
      }
    ]
  }
}

EXPLAIN SELECT * FROM t001 WHERE name = 'g';
== Physical Plan ==
*(1) Filter (isnotnull(name#1204) AND (name#1204 = g))
+- *(1) Scan OpenSearchRelation(Map(opensearch.resource -> t001_name_age_index),org.apache.spark.sql.SQLContext@40574edf,None) default.t001[name#1204,age#1205] PushedFilters: [IsNotNull(name), EqualTo(name,g)], ReadSchema: struct<name:string,age:int>