opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices on remote data stores.
Apache License 2.0

[EPIC] Zero-ETL - Apache Iceberg Table Support #372

Open dai-chen opened 2 weeks ago

dai-chen commented 2 weeks ago

Is your feature request related to a problem?

Apache Iceberg is designed for managing large analytic tables in a scalable and performant way, using features like schema evolution, partitioning, and metadata management to optimize query performance. Despite these optimizations, the inherent latency of querying large datasets directly from S3 can be a pain point for complex or frequently accessed queries on large Iceberg tables, especially in real-time analytics and interactive querying scenarios.

TODO: the current problem statement is fairly technical; more feedback is needed from real Iceberg customers.

What solution would you like?

Integrate Flint’s current query acceleration features with Iceberg to enhance performance:

TODO: evaluate missing features in https://github.com/opensearch-project/opensearch-spark/issues/367

What alternatives have you considered?

N/A

Do you have any additional context?

Related known issues:

dai-chen commented 2 weeks ago

Proof of Concept 1: Basic Integration of Flint Core Functionality with Iceberg Tables

Objectives

This PoC aims to evaluate the integration of Flint's core functionality, including skipping indexes, covering indexes, and materialized views, with Apache Iceberg tables.

The key questions to answer:

  1. Is a Flint skipping index helpful for Iceberg tables in certain cases? (a DDL sketch follows this list)
  2. How can Flint track changes in Iceberg tables on the fly?
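
For context, a minimal sketch of the Flint skipping index DDL that question 1 would exercise; the table, columns, and skip types here are assumptions based on the demo query below:

-- Hypothetical skipping index on the demo table; Iceberg metadata may already
-- provide equivalent min/max pruning, which is exactly what the PoC should verify
CREATE SKIPPING INDEX ON vpc_flow_logs (
  action VALUE_SET,
  time_dt MIN_MAX
)
WITH (
  auto_refresh = true
);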

Demo Query

This query demonstrates finding the top IP address pairs (source -> target) that have had their connections rejected in the past hour:

-- Identify the top IP address pairs with rejected connections in the last hour
SELECT
  src_endpoint.ip || '->' || dst_endpoint.ip AS ip_pair,
  action,
  COUNT(*) AS count
FROM vpc_flow_logs
WHERE action = 'REJECT'
  AND time_dt > (current_timestamp - interval '1' hour)
GROUP BY 1, 2
ORDER BY count DESC
LIMIT 25;

Indexing Support [2d]

Index Maintenance [2d]

  1. How does Flint track versions in Iceberg tables? (see the metadata table sketch after this list)
  2. How does Flint handle data expiration or compaction in Iceberg tables?
  3. How does Flint handle schema evolution in Iceberg tables?
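
One possible building block for questions 1 and 2 (an assumption to validate, not a settled design): Iceberg's snapshots metadata table exposes the commit history that Flint could poll to distinguish appends from overwrites such as compaction:

-- Inspect recent Iceberg commits; 'operation' is append, overwrite, replace, or delete
SELECT committed_at, snapshot_id, operation
FROM vpc_flow_logs.snapshots
ORDER BY committed_at DESC;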

Performance Benchmark [1d]

  1. Direct queries on Iceberg tables vs. queries accelerated by Flint's skipping index
  2. Direct queries on Iceberg tables vs. queries accelerated by Flint's covering index
dai-chen commented 2 weeks ago

Proof of Concept 2: Advanced Integration of Flint with Iceberg Tables

Enabling Advanced Search Capability

This part of the PoC explores the implementation of advanced search capabilities in Flint, integrated with Iceberg tables. Taking full-text search as an example, below is a demonstration query:

-- Identify the number of HTTP status occurrences with requests containing 'Chrome' in the past hour
SELECT
  status,
  COUNT(*) AS count
FROM http_logs
WHERE MATCH(request, 'Chrome')
  AND timestamp > (current_timestamp - interval '1' hour)
GROUP BY status;

Changes required:

  1. User-Defined Function (UDF) for Full-Text Search:
    • Implement a UDF that supports full-text search queries directly on Iceberg tables.
  2. Query Rewrite with Covering Index:
    • Extend Flint's query rewriting mechanisms to utilize the covering index for better performance in text search queries.
  3. Search Pushdown to Covering Index Scan:
    • Enable search query conditions to be pushed down directly as an OpenSearch DSL query on the index (illustrated below).
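
As a hedged illustration of item 3, the MATCH condition above could translate into a standard OpenSearch match query executed against the covering index (the index name is an assumption):

POST flint_glue_iceberg_http_logs_request_index/_search
{
  "query": {
    "match": { "request": "Chrome" }
  }
}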

Is a skipping index helpful in this case?

Depending on latency requirements, it is possible to build skipping indexes such as:

  1. Value set of text
  2. Bloom filter of each token

Alternatives to the current enhanced covering index solution?

Covering indexes provide full search and dashboard capabilities, but they require indexing all involved columns. An alternative is a non-covering (filtering) index, which indexes only the columns used in filters, with each index entry pointing back to the row ID in the source table.

To implement this, we need to answer the following questions:

  1. Is it feasible for users to create a non-covering index for the entire dataset?
  2. How can SQL syntax be differentiated to specify the use of non-covering versus covering indexes? (a hypothetical sketch follows this list)
  3. If universal indexing isn’t viable, how can we enable on-demand index creation specific to individual queries?
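
One purely hypothetical direction for question 2; this is illustrative only and not existing Flint syntax:

-- Hypothetical DDL: a FILTERING keyword distinguishing a non-covering index,
-- where each entry stores only filter columns plus a row ID back-reference
CREATE FILTERING INDEX request_filter ON glue.iceberg.http_logs (request)
WITH (
  auto_refresh = true
);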

Accelerating Iceberg Metadata Queries

The following SQL examples illustrate how Flint can be leveraged to accelerate queries against Iceberg's metadata, which is essential for schema management and data governance:

-- Example query to fetch historical metadata from an Iceberg table
TODO
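
Until the example above is filled in, here is a hedged sketch of the kind of statement intended: Iceberg exposes metadata tables such as history, which queries of this sort would target (table name assumed from earlier examples):

-- Sketch: fetch snapshot history from an Iceberg metadata table
SELECT made_current_at, snapshot_id, is_current_ancestor
FROM glue.iceberg.vpc_flow_logs.history
ORDER BY made_current_at DESC;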
dai-chen commented 1 week ago

Proof of Concept Conclusion [WIP]

Conclusion

dai-chen commented 1 week ago

High-Level Design and Task Breakdown

  1. User Experience
  2. Architecture
  3. Data Exploration
  4. Zero-ETL Support
  5. SparkSQL Query Acceleration
  6. Index Maintenance
  7. Testing

User Experience

Here we outline the end-to-end user experience, demonstrating the steps involved from initial data exploration through advanced query optimization and table management.

# Step 1: Data exploration
SELECT
  src_endpoint,
  dst_endpoint,
  action
FROM glue.iceberg.vpc_flow_logs -- limit size or sampling
LIMIT 10;

# Step 2: Zero-ETL by Flint index
CREATE INDEX src_dst_action ON glue.iceberg.vpc_flow_logs (
  src_endpoint,
  dst_endpoint,
  action
)
WHERE timestamp > (current_timestamp - interval '1' hour) -- partial indexing
WITH (
  auto_refresh = true
);

# Step 3a: Dashboard / DSL query Flint index directly
POST flint_glue_iceberg_vpc_flow_logs_src_dst_action_index
{
  ...
}

# Step 3b: SparkSQL query acceleration
# Identify the top IP address pairs with rejected connections in the last hour
SELECT
  src_endpoint.ip || '->' || dst_endpoint.ip AS ip_pair,
  action,
  COUNT(*) AS count
FROM glue.iceberg.vpc_flow_logs
WHERE action = 'REJECT'
  AND time_dt > (current_timestamp - interval '1' hour)
GROUP BY 1, 2
ORDER BY count DESC
LIMIT 25;

# Step 4: Iceberg table management
# Data compaction on a regular basis triggered manually or by Glue
CALL local.system.rewrite_data_files(
  table => 'glue.iceberg.vpc_flow_logs',
  options => map('rewrite-all', true)
);

# Step 5: Clean up
# User deletes unused covering index after analytics
DROP INDEX src_dst_action ON glue.iceberg.vpc_flow_logs;
VACUUM INDEX src_dst_action ON glue.iceberg.vpc_flow_logs;

Architecture

Here is the architecture diagram that provides a comprehensive overview of the high-level design and key components:

[Architecture diagram: screenshot from 2024-06-18]


Task Breakdown

Below is the high-level task breakdown, describing each task and its respective component. More detailed task descriptions follow in the sections below:

| Feature | Component | Priority | Task | GitHub Issue | Comment |
|---|---|---|---|---|---|
| Data Exploration | Catalog | High | Add Iceberg catalog config in Spark job params | todo | |
| Data Exploration | Data Types | High | Support all Iceberg data types in direct query | todo | |
| Zero-ETL | Covering Index | Med | Map source column to OpenSearch field type | #384 | OpenSearch table design related |
| Zero-ETL | Covering Index | Med | Fix single OS index capacity issue | #339 | |
| Zero-ETL | Covering Index | High | Improve Flint data source reader performance | #334 | |
| Zero-ETL | Materialized View | Low | Support event time ordering when cold start | #90 | |
| SparkSQL Query Acceleration | Skipping Index | Med | Disable skipping index create on Iceberg table | todo | |
| SparkSQL Query Acceleration | Covering Index | High | Query rewrite with partial covering index | #298 | |
| SparkSQL Query Acceleration | Covering Index | Med | Add more filtering condition pushdown | #148 | OpenSearch table design related |
| SparkSQL Query Acceleration | Materialized View | Low | Query rewrite with materialized view | todo | |
| SparkSQL Query Acceleration | Index Advisor | Low | Disable skipping index advisor on Iceberg table | todo | |
| Index Maintenance | Index Data Freshness | Low | Index refresh idempotency | #88 | |
| Index Maintenance | Index Data Freshness | Med | Include refresh status in show Flint index statement | #385 | |
| Index Maintenance | Index Data Freshness | Low | Support hybrid scan for covering index | #386 | |
| Index Maintenance | Index Management | Low | Support schema change in alter index statement | #387 | |

Data Exploration

Users can execute common DDL statements and direct SQL queries on Iceberg tables for ad-hoc data analytics. Flint must support the Iceberg catalog and fully accommodate Iceberg data types, ensuring seamless integration and comprehensive data analysis capabilities.

Iceberg Catalog

Configure Spark job parameters to activate the Iceberg catalog, ensuring compatibility with FlintDelegatingSessionCatalog. Ref: https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/using-iceberg.html
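
A minimal sketch of such job parameters, following the referenced EMR guide; the catalog name glue and the warehouse bucket are assumptions:

--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
--conf spark.sql.catalog.glue=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.glue.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
--conf spark.sql.catalog.glue.warehouse=s3://example-bucket/warehouse/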

Iceberg Data Types

Flint must fully support all Iceberg data types, including complex structures like Struct, List, and Map, to ensure comprehensive data handling capabilities. Ref: https://iceberg.apache.org/docs/latest/spark-getting-started/#type-compatibility

| Spark | Iceberg | Notes |
|---|---|---|
| boolean | boolean | |
| short | integer | |
| byte | integer | |
| integer | integer | |
| long | long | |
| float | float | |
| double | double | |
| date | date | |
| timestamp | timestamp with timezone | |
| timestamp_ntz | timestamp without timezone | |
| char | string | |
| varchar | string | |
| string | string | |
| binary | binary | |
| decimal | decimal | |
| struct | struct | |
| array | list | |
| map | map | |

Zero-ETL Support

Users can load raw or aggregated data directly into OpenSearch via covering indexes and materialized views, enabling full-text search and dashboard capabilities without the need for an Extract, Transform, Load (ETL) process.
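
For example, a materialized view can pre-aggregate Iceberg data into OpenSearch using Flint's existing MV syntax; this is a sketch in which the view name and aggregation are assumptions:

-- Aggregate rejected connections into 10-minute buckets for dashboards
CREATE MATERIALIZED VIEW glue.iceberg.vpc_flow_count_by_action
AS
SELECT
  window.start AS start_time,
  action,
  COUNT(*) AS count
FROM glue.iceberg.vpc_flow_logs
GROUP BY TUMBLE(time_dt, '10 Minutes'), action
WITH (
  auto_refresh = true
);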

Covering Index Enhancement

Address limitations and improve the performance of covering indexes in the issues below:

Materialized View Enhancement

Address limitations of materialized views:


SparkSQL Query Acceleration

Users continue to use the familiar SparkSQL interface and leverage OpenSearch's indexing capabilities to accelerate SparkSQL queries.

Data Skipping

  1. Disable Flint skipping index creation: since Flint skipping indexes are not supported on Iceberg tables, Iceberg's built-in metadata indexing is used instead.
  2. Disable Skipping Index Advisor: prevent users from using the ANALYZE index statement for skipping index recommendations (see the statement sketched below).
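
For reference, item 2 refers to Flint's existing advisor statement, which should be rejected for Iceberg tables (table name assumed):

-- ANALYZE currently recommends skipping index columns; disable it for Iceberg
ANALYZE SKIPPING INDEX ON glue.iceberg.vpc_flow_logs;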

Query Optimization

[Diagram: covering index acceleration]

Support query rewriting for covering index (full or partial) and materialized view:
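
A hedged sketch of the intended behavior, reusing the covering index from the user experience section: a query touching only indexed columns, with a filter subsumed by the partial-index predicate, would be rewritten to scan the index instead of the Iceberg table:

-- Assumed to be eligible for rewrite: all referenced columns are covered and
-- the filter falls within the index's 1-hour partial-indexing window
SELECT src_endpoint, dst_endpoint, action
FROM glue.iceberg.vpc_flow_logs
WHERE action = 'REJECT'
  AND timestamp > (current_timestamp - interval '30' minute);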


Index Maintenance

Index Data Freshness

Provide tools for users to inspect index data freshness and ensure up-to-date query results:
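
A sketch of the inspection flow, using Flint's existing statement; #385 proposes adding refresh status to its output:

-- List Flint indexes with their state in the given catalog and database
SHOW FLINT INDEX IN glue.iceberg;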

Index Management
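
For reference, #387 (schema change support) would extend Flint's existing ALTER statement, sketched here with names from the user experience section:

-- Today ALTER covers option changes such as pausing auto refresh
ALTER INDEX src_dst_action ON glue.iceberg.vpc_flow_logs
WITH (auto_refresh = false);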


Testing

Functional Testing

Functional testing ensures Iceberg support works with all existing components and features, and newly added features perform correctly and meet the specified requirements.

| Category | Priority | Use Case | Test Parameters |
|---|---|---|---|
| Table Management | High | Create Iceberg table | |
| Table Management | Med | Create Spark data source table | |
| Direct Query | High | Query Iceberg table | namespace: catalog, database |
| Direct Query | Med | Query Spark data source table | namespace: catalog, database |
| Skipping Index | High | Build skipping index from Iceberg table | skip type: Partition, MinMax, ValueSet, BloomFilter; refresh mode: auto, incremental, manual; refresh interval |
| Skipping Index | High | Accelerate Iceberg table query with skipping index | hybrid scan: true, false |
| Covering Index | High | Build covering index from Iceberg table | refresh mode: auto, incremental, manual; refresh interval |
| Covering Index | High | Accelerate Iceberg table query with covering index | |
| Materialized View | High | Build materialized view from Iceberg table | refresh mode: auto, incremental, manual; refresh interval |
| Materialized View | High | Accelerate Iceberg table query with materialized view | refresh mode: auto, incremental, manual; refresh interval |
| Index Management | Med | Show Flint indexes on Iceberg table | |
| Index Management | Med | Describe Flint index on Iceberg table | |
| Index Management | Med | Alter Flint index on Iceberg table | alter refresh mode from auto to manual; alter refresh mode from manual to auto |
| Index Management | Med | Drop and vacuum Flint index on Iceberg table | |
| Index Management | Med | Recover index job for Flint index on Iceberg table | |
| Index Advisor | Low | Recommend skipping index on Iceberg table | |

Performance Benchmark

Benchmarking performance for data exploration queries, zero-ETL ingestion, and SparkSQL query acceleration:

Related issues: