opensearch-project / opensearch-spark

Spark Accelerator framework ; It enables secondary indices to remote data stores.
Apache License 2.0
14 stars 23 forks source link

[FEATURE] Enhance covering index to generic secondary index for efficient filtering and optimized cost #390

Open dai-chen opened 3 months ago

dai-chen commented 3 months ago

Is your feature request related to a problem?

Currently, Flint only supports a covering index for SparkSQL queries, which requires all columns present in the query to be indexed in the covering index. Moreover, the covering index rows do not have a direct reference to the source table rows. This approach works well for users who need the full search and dashboard capabilities of OpenSearch.

However, for users who primarily want to accelerate their queries, covering indexes can be inefficient. They may ingest many columns, leading to slower performance and excessive space consumption. This is particularly problematic for large datasets or for users who need to index all columns due to uncertainty about future queries.

What solution would you like?

Enhance the covering index to function as a generic secondary index that maintains a unique row ID for rows in the source table. The new behavior of the index should be as follows:

Key Benefits and Impacts

The proposed enhancement to the covering index functionality in Flint offers a range of key benefits and positive impacts across various aspects:

  1. [UX] Users no longer need to predict all possible queries upfront to create covering indexes. They can focus on indexing columns relevant to common filtering conditions.
  2. [Performance] With fewer columns to be indexed, the index refresh process can speed up significantly, reducing the overhead and potential lag.
  3. [Cost] The storage overhead for indexes can be significantly reduced, especially for large datasets with many columns.

What alternatives have you considered?

N/A

Do you have any additional context?

Here is an example of how this enhancement could be used:

# Create index on source table
CREATE INDEX full_text_request ON http_logs (request);

# OS index data
{
  "id": "xxx" -- unique row id, assuming timestamp is the primary key of source table
  "request": "text"
}

# Accelerate query using index even if not covered
SELECT
  clientip,
  status,
  AVG(size)
FROM http_logs
WHERE MATCH(request, 'Chrome')
GROUP BY client_ip, status;

=>

SELECT
  src.clientip,
  src.status,
  AVG(src.size)
FROM http_logs src
JOIN flint_full_text_request_index idx
  ON src.timestamp = idx.id  -- only scan rows matched
WHERE MATCH(idx.request, 'Chrome')
GROUP BY src.client_ip, src.status;
dai-chen commented 2 months ago

Proof of Concept [TBD]

Demo

Create Spark table for CloudTrail logs. Ref: https://github.com/opensearch-project/dashboards-observability/blob/main/server/adaptors/integrations/__data__/repository/aws_cloudtrail/assets/create_table_cloud-trail-1.0.0.sql

CREATE TABLE cloudtrail_logs (
  eventversion STRING,
  useridentity STRUCT<
    type:STRING,
    principalid:STRING,
    arn:STRING,
    ...
  requestparameters STRING,
  ...
)
USING json
PARTITIONED BY (region, year, month, day)
LOCATION 's3://DOC-EXAMPLE-BUCKET/AWSLogs/Account_ID/CloudTrail/';

Create Flint secondary index:

CREATE INDEX request_params ON cloudtrail_logs (
  CAST(requestparameters AS STRUCT) -- index entire JSON instead of single TEXT field
)
WHERE eventtime BETWEEN
  CURRENT_TIMESTAMP - INTERVAL '30' DAY AND CURRENT_TIMESTAMP; -- partial indexing 30 days' logs

# Use OpenSearch table behind the scene to solve capacity and read performance issue [TBD]
CREATE TABLE flint_cloudtrail_logs_request_params (
  requestparameters TYPE -- a) type is JSON? b) disable source/docValue, only need inverted index
)
USING opensearch
PARTITIONED BY (region, year, month, day)
LOCATION 'http://...';

Query CloudTrail logs:

SELECT
  eventname,
  sourceipaddress,
  useridentity.arn
FROM cloudtrail_logs
WHERE
  eventtime BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY
    AND CURRENT_TIMESTAMP AND -- partition pruning
  eventname = 'CreatePolicy' AND
  JSON_EXTRACT(requestparameters, '$.policyName') = 'example-policy'; -- Flint secondary index kicks in

Workflow

Using Flint Covering Index

Screenshot 2024-07-05 at 3 01 32 PM

Using Flint Secondary Index in PoC

Screenshot 2024-07-05 at 3 01 47 PM