opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices to remote data stores.
Apache License 2.0

[FEATURE] Efficient storage of high cardinality data in materialized view #765

Open dai-chen opened 4 days ago

dai-chen commented 4 days ago

Is your feature request related to a problem?

When dealing with high cardinality data, especially columns like source and destination IPs in VPC flow logs, materialized views (MVs) can become excessively large, resulting in both slow query performance and unnecessary storage overhead. I’m looking for solutions to reduce the storage footprint and optimize query performance for these high-cardinality fields, particularly for dashboard visualizations.

What solution would you like?

To efficiently handle high cardinality data in materialized views, the following approach can be implemented:

What alternatives have you considered?

Here are several alternative methods for optimizing storage and performance in materialized views:

Do you have any additional context?

(I) VPC Flow Logs Example

Take the VPC flow logs dataset as an example: high cardinality fields like source and destination IP pairs create significant storage challenges when using materialized views. Consider the following materialized view at the terabyte (TB) scale. After grouping, each 1-minute window can result in hundreds of millions of rows.

CREATE MATERIALIZED VIEW vpc_flow_log_mv
AS
SELECT
  window.start AS startTime,
  activity_name,
  src_endpoint.ip AS src_ip,
  dst_endpoint.ip AS dst_ip,
  COUNT(*) AS total_count,
  SUM(traffic.bytes) AS total_bytes,
  SUM(traffic.packets) AS total_packets
FROM vpc_flow_logs
GROUP BY
  TUMBLE(eventTime, '1 Day'),
  activity_name,
  src_endpoint.ip,
  dst_endpoint.ip

The proposed approaches aim to address this issue by significantly reducing the amount of data stored. For example, storing only the Top 100 per group, either by filtering cells in a cube or by using an approximate Top K function, keeps the storage bounded regardless of the original size after grouping.
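
As a rough illustration of the bounded-storage idea (not the proposed implementation), the exact "Top 100 per group" variant can be written with the Spark DataFrame API. Table and column names follow the MV example above; ranking by total_count is an assumption:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("bounded-topk-sketch").getOrCreate()

// Aggregate as in the MV above: one row per (window, activity, src_ip, dst_ip).
val aggregated = spark.table("vpc_flow_logs")
  .groupBy(
    window(col("eventTime"), "1 day"),
    col("activity_name"),
    col("src_endpoint.ip").as("src_ip"),
    col("dst_endpoint.ip").as("dst_ip"))
  .agg(
    count(lit(1)).as("total_count"),
    sum("traffic.bytes").as("total_bytes"),
    sum("traffic.packets").as("total_packets"))

// Keep only the 100 heaviest IP pairs per (window, activity) group, so the stored
// output is bounded at 100 rows per group regardless of IP cardinality.
val perGroup = Window.partitionBy(col("window"), col("activity_name"))
  .orderBy(col("total_count").desc)

val topPairs = aggregated
  .withColumn("rn", row_number().over(perGroup))
  .filter(col("rn") <= 100)
  .drop("rn")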

(II) Optimizations in OpenSearch Index

While the current focus is on optimizing the materialized view (MV) output size, there are additional optimizations that can be applied within the OpenSearch index to further reduce storage size:

The last item can be configured via the index_settings option, while the first two will become configurable once support for https://github.com/opensearch-project/opensearch-spark/issues/772 is implemented.
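
For reference, a minimal sketch of passing storage-oriented OpenSearch settings through the index_settings option when the MV is created (run from a Spark session with the Flint extensions enabled). The codec choice index.codec: best_compression is an assumed example of such a setting, and the query body is shortened from the example above:

// Sketch only: index_settings takes OpenSearch index settings as a JSON string;
// best_compression trades some indexing CPU for smaller segments on disk.
spark.sql("""
  CREATE MATERIALIZED VIEW vpc_flow_log_mv
  AS
  SELECT
    window.start AS startTime,
    activity_name,
    COUNT(*) AS total_count
  FROM vpc_flow_logs
  GROUP BY
    TUMBLE(eventTime, '1 Day'),
    activity_name
  WITH (
    index_settings = '{"index.codec": "best_compression"}'
  )
""")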

dai-chen commented 3 days ago

Proof of Concept: Approximate Aggregation Approach

Goals

Verify the feasibility of the Approximate Aggregation MV approach and evaluate its impact on storage, performance and cost, specifically including:

Design

Syntax:

CREATE MATERIALIZED VIEW vpc_flow_log_mv
AS
SELECT
  window.start AS startTime,
  activity_name AS activity,
  APPROX_TOP_COUNT(src_endpoint.ip, 100) AS top_k_src_ip_by_count,
  APPROX_TOP_COUNT(dst_endpoint.ip, 100) AS top_k_dst_ip_by_count,
  APPROX_TOP_SUM(src_endpoint.ip, 100) AS top_k_src_ip_by_sum,
  APPROX_TOP_SUM(dst_endpoint.ip, 100) AS top_k_dst_ip_by_sum,
  APPROX_TOP_COUNT(ARRAY(src_endpoint.ip, dst_endpoint.ip), 100) AS top_k_src_dst_ip_by_count,
  COUNT(*) AS total_count,
  SUM(traffic.bytes) AS total_bytes,
  SUM(traffic.packets) AS total_packets
FROM vpc_flow_logs
GROUP BY
  TUMBLE(eventTime, '1 Day'),
  activity_name

Materialized view data:

    "_source": {
          "startTime": "2024-10-01 12:00:00",
          "activity": "Traffic",
          "top_k_src_ip_by_count": [
            {
              "ip": "192.168.0.100",
              "count": 23205
            },
           "top_k_dst_ip_by_count": [
            {
              "ip": "127.0.01",
              "count": 238
            }
            ...
          ]
        },

OpenSearch DSL query:

POST /vpc_flow_log_approx_mv/_search
{
  "size": 0,
  "aggs": {
    "top_ips": {
      "nested": {
        "path": "top_k_src_ip_by_count"
      },
      "aggs": {
        "ip_buckets": {
          "terms": {
            "field": "top_k_src_ip_by_count.ip",
            "size": 100,
            "order": {
              "total_count": "desc"
            }
          },
          "aggs": {
            "total_count": {
              "sum": {
                "field": "top_k_src_ip_by_count.count"
              }
            }
          }
        }
      }
    }
  }
}

Implementation Tasks

  1. Implement approx_top_count function: Create a function to compute approximate top K counts for high cardinality fields (one possible counting technique is sketched after this list).
  2. Implement approx_top_sum function: Develop a similar function for approximate top K sum calculations.
  3. Support nested fields in MV output: Ensure the materialized view (MV) can output nested fields to store approximate aggregation results.
  4. Create a dashboard on MV data: Build a dashboard for visualizing the results from the MV, using approximate aggregation for top K values.
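
For task 1, one possible counting technique is sketched below (a hedged illustration, not the project's chosen design): a Space-Saving style counter that tracks at most a fixed number of keys, so memory and MV output stay bounded no matter how many distinct IPs appear. The class name and capacity parameter are illustrative, and wiring this into a Spark UDAF/Aggregator is omitted.

import scala.collection.mutable

// Bounded frequency counter: never holds more than `capacity` keys.
final class SpaceSavingCounter(capacity: Int) {
  private val counts = mutable.Map.empty[String, Long]

  def add(key: String): Unit = {
    if (counts.contains(key) || counts.size < capacity) {
      counts(key) = counts.getOrElse(key, 0L) + 1L
    } else {
      // Evict the smallest counter and let the new key inherit its count;
      // this is what keeps the estimation error bounded.
      val (minKey, minCount) = counts.minBy { case (_, c) => c }
      counts -= minKey
      counts(key) = minCount + 1L
    }
  }

  // Approximate top K keys by estimated count, descending.
  def topK(k: Int): Seq[(String, Long)] =
    counts.toSeq.sortBy { case (_, c) => -c }.take(k)
}

// Tiny usage example with made-up IPs.
object SpaceSavingDemo extends App {
  val counter = new SpaceSavingCounter(capacity = 100)
  Seq("10.0.0.1", "10.0.0.2", "10.0.0.1", "10.0.0.3", "10.0.0.1").foreach(counter.add)
  println(counter.topK(2)) // e.g. List((10.0.0.1,3), (10.0.0.2,1)); ties may appear in either order
}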

Testing Tasks

  1. Storage comparison: Compare the actual storage usage in the OpenSearch index for aggregate MV and approximate aggregate MV.
  2. Performance benchmark: Measure and compare the performance of building and querying aggregate MV and approximate aggregate MV.
  3. EMR-S cost evaluation: Evaluate the overall cost on EMR-S for building aggregate MV versus approximate aggregate MV.