opensearch-project / opensearch-spark

Spark accelerator framework; it enables secondary indices on remote data stores.
Apache License 2.0

[RFC] Cross-index (cross-cluster) OpenSearch join using the opensearch-hadoop plugin #753

Open YANG-DB opened 1 month ago

YANG-DB commented 1 month ago

Is your feature request related to a problem? Currently, OpenSearch does not provide a way to perform cross-index or cross-cluster joins using the OpenSearch DSL. This RFC proposes to extend OpenSearch's capabilities by leveraging Spark's MPP engine through the OpenSearch Flint API and Hadoop OpenSearch Library. This will enable users to execute cross-index joins natively via Spark, while abstracting away the underlying complexity.

What solution would you like?

The Problem Statement

The OpenSearch query engine lacks native support for cross-index (or cross-cluster) joins. This limitation hinders scenarios where users need to merge data residing in different indices (or clusters) without manually combining the results at the application level or, in the SQL plugin case, on the OpenSearch coordinating node, which must execute both sides of the join itself.

In large-scale data environments, this becomes a bottleneck for performing analytics or relational-style queries across distributed datasets.

Proposed Solution

We propose using Apache Spark's engine, via the OpenSearch Flint API (using PPL join commands) and the Hadoop OpenSearch integration, to allow users to perform cross-index joins.
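As a rough illustration only (the exact PPL join syntax and the cluster-qualified index names below are assumptions, not the final command specification), a cross-index join expressed in PPL might look like:

```
source = cluster_a:logs
| join left = l right = r ON l.request_id = r.request_id cluster_b:traces
| fields l.timestamp, l.status, r.span_id
```

The PPL engine would translate such a command into a Catalyst plan, letting Spark fetch each side from its own index/cluster and perform the join on its executors.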


Architecture

[OpenSearch Client (SQL Async API)] 
        ↓
[PPL Engine] → [Join Operation]
        ↓
[Catalyst AST Translator]
        ↓
[Spark Execution Engine]
        ↓
[Hadoop OpenSearch Connector]
        ↓
[Data Retrieval from Multiple Indexes/Clusters]

Points to Consider

Hive Tables

Today, Spark/Flint can query Hive tables backed by OpenSearch indices through opensearch-hadoop.
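As a sketch of that mapping (the storage-handler class and property names follow opensearch-hadoop conventions, but the table and index names here are hypothetical), an OpenSearch index can be exposed as a Hive external table roughly like:

```sql
-- Map an OpenSearch index onto a Hive external table (hypothetical names).
CREATE EXTERNAL TABLE albums (id BIGINT, title STRING, year INT)
STORED BY 'org.opensearch.hadoop.hive.OpenSearchStorageHandler'
TBLPROPERTIES(
  'opensearch.resource' = 'buckethead/albums',  -- index to read/write
  'opensearch.nodes'    = 'localhost:9200'      -- cluster coordinator
);
```

Once mapped, any Hadoop-ecosystem engine (Hive, Spark, Presto) can query the index as an ordinary table.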

Spark SQL

Spark SQL can also read indices directly, as shown here:

// Load an index as a DataFrame via the opensearch-hadoop connector
SparkSession spark = SparkSession.builder().appName("read-opensearch").getOrCreate();
Dataset<Row> df = spark.read().format("opensearch").load("buckethead/albums");
Dataset<Row> playlist = df.filter(df.col("category").equalTo("pikes").and(df.col("year").geq(2024)));
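Extending that pattern, a minimal sketch of the cross-cluster join this RFC targets (the cluster addresses, index names, and join column are hypothetical; the `opensearch.nodes` option follows opensearch-hadoop conventions) could look like:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CrossClusterJoin {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("cross-cluster-join").getOrCreate();

        // Point each read at its own cluster coordinator (assumed connector option).
        Dataset<Row> albums = spark.read().format("opensearch")
            .option("opensearch.nodes", "cluster-a:9200")
            .load("buckethead/albums");

        Dataset<Row> sales = spark.read().format("opensearch")
            .option("opensearch.nodes", "cluster-b:9200")
            .load("sales/records");

        // Spark shuffles both sides and joins on its executors, so no single
        // OpenSearch node has to hold both datasets.
        Dataset<Row> joined = albums.join(sales, albums.col("id").equalTo(sales.col("album_id")));
        joined.show();
    }
}
```

This is exactly the abstraction the proposal wants to hide behind PPL: the user writes a join command, and the Flint/PPL layer generates the per-cluster reads and the Spark join plan.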

Do you have any additional context?

YANG-DB commented 1 month ago

@penghuo @LantaoJin can you plz review and comment? thanks

LantaoJin commented 1 month ago

IMO, there are two kinds of cases that could leverage opensearch-hadoop (I haven't done a deep dive; from the description it sounds like a connector):

  1. Fundamental usage of opensearch-hadoop: an OpenSearch index mapped as a Hive external table can be queried from the Hadoop ecosystem (MR, Hive, Spark, Presto).
  2. External usage: an OpenSearch query (DSL/SQL/PPL) can query data across HDFS and an OpenSearch index.

In case 1, the target query comes from the Hadoop ecosystem (Hive, Spark, Presto). It sounds like there is no requirement for enhancement here; this is what the opensearch-hadoop project already does. In case 2, the target query comes from OpenSearch (DSL/SQL/PPL). My question is: what is the user story for this kind of cross-source query? Why not just run SQL via case 1? What problem does this solution resolve? Have you heard any requests for this from the community?