opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices on remote data stores.
Apache License 2.0

[RFC] OpenSearch and Apache Spark Integration #4

Open penghuo opened 1 year ago

penghuo commented 1 year ago

Introduction

We received a feature request for query execution on object stores in OpenSearch.

We have investigated the possibility of building a new solution for OpenSearch that leverages an object store as storage.

We found a number of challenges along the way.

We also found that this kind of work has already been solved by general-purpose data processing systems, e.g. Presto, Spark, and Trino, and that building such a platform from scratch would require years to mature.

Idea

The initial idea is to:

  1. Use SQL as the interface.
  2. Leverage Spark as the query/compute execution engine.

High-level diagram:

[Screenshot: high-level architecture diagram, 2023-06-16]

User Experience

  1. The user configures a Spark cluster as the computation resource, e.g. https://SPARK:7707.
  2. The user submits SQL to the OpenSearch cluster using the _plugins/_sql REST API (see the sketch after this list).
    1. The SQL engine parses and analyzes the SQL query.
    2. The SQL engine decides whether to route the query to the Spark cluster or run it locally.
  3. In phase 1, we provide an interface that lets users create derived datasets from data on the object store and store them in OpenSearch. Queries are then optimized automatically at query time based on the derived datasets.
  4. In phase 2, we provide an opt-in optimization choice for users; derived datasets will be created automatically based on observed query patterns.
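
To make step 2 concrete, here is a minimal sketch of submitting a query through the _plugins/_sql REST API. It assumes a local cluster at https://localhost:9200 with the SQL plugin installed; the credentials and the my_s3_table dataset name are hypothetical placeholders.

```python
# Hedged sketch: submit SQL to the OpenSearch SQL plugin REST endpoint.
# Host, credentials, and table name are placeholders for illustration only.
import requests

response = requests.post(
    "https://localhost:9200/_plugins/_sql",
    json={"query": "SELECT status, COUNT(*) FROM my_s3_table GROUP BY status"},
    auth=("admin", "admin"),  # placeholder credentials
    verify=False,             # demo only: skip TLS verification
)
print(response.json())
```

Depending on the routing decision in step 2, the same request would either be executed locally or handed off to the configured Spark cluster.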

Epic

anirudha commented 1 year ago

This enables Spark as a compute connector to OpenSearch data, correct? Can we set this up as a remote compute connection, similar to a data source?

penghuo commented 1 year ago

This enables Spark as a compute connector to OpenSearch data, correct?

Yes, but not limited to OpenSearch data.

Can we set this up as a remote compute connection, similar to a data source?

It is one option, but I feel we should make it more generic; ML could also leverage Spark as a computation engine.

Is the query from the Spark cluster to the index SQL?

YANG-DB commented 1 year ago

Will the OpenSearch SQL engine be responsible for analyzing the query and dispatching all queries to the MPP engine, or will it have the ability to execute parts of the query itself (for OpenSearch indices) and delegate other parts to Spark? Will this require adding rules to Catalyst?

dai-chen commented 1 year ago

Just some thoughts for discussion and a later PoC: we need to verify and confirm the role of Spark RDD (with/without Spark SQL) in OpenSearch:

  1. Provide the capability to query the object store only, via Spark RDD: this is the immediate requirement; our own query engine is still needed for querying OpenSearch indices and for planning execution jobs for Spark.
  2. Provide the capability to query OpenSearch indices as well, via Spark SQL + RDD: higher complexity; it needs to get Spark SQL/RDD working with OpenSearch DSL, the OpenSearch index, or the Lucene index directly, and thus:
    2a. Provide a faster execution path or remove limitations in OpenSearch aggregation/join.
    2b. Replace the OpenSearch DSL query.

dai-chen commented 1 year ago

As discussed, Spark SQL and RDD are only for purpose #1 above. Leveraging them for querying OpenSearch indices is a totally different story and not our current goal. So in this case, the question for introducing Spark SQL is whether we need it for optimizing and planning the Spark RDD jobs that query the object store.

Implementation options:

  1. Introduce Spark SQL as a library
  2. Introduce it partially, e.g. only the Catalyst optimizer
  3. Copy the source code and make the required changes
  4. Reuse our own engine to plan RDD jobs

Research items:

  1. Metastore: how/where to manage table metadata for Spark SQL
  2. Fault tolerance: get the WAL and intermediate data store working for streaming
  3. Thread pool: check for any blocking operations in Spark SQL
  4. Data source integration: pass credentials from the Data Source introduced in 2.4 to the file system reader (see the sketch after this list)
  5. Plugin settings: settings such as the response size limit need to keep working as well
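
For research item 4, a minimal sketch of what passing credentials through to the file system reader could look like, assuming the S3A connector; the bucket, path, and credential values are hypothetical placeholders.

```python
# Hedged sketch for research item 4: forward credentials from a Data Source
# definition (introduced in OpenSearch 2.4) to the S3A file system reader
# that Spark uses. Bucket, path, and credential values are placeholders.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("flint-s3-read")
    # Standard hadoop-aws (S3A) credential settings.
    .config("spark.hadoop.fs.s3a.access.key", "<access-key-from-datasource>")
    .config("spark.hadoop.fs.s3a.secret.key", "<secret-key-from-datasource>")
    .getOrCreate()
)

# Once credentials are in place, raw data on the object store can be read.
df = spark.read.parquet("s3a://my-bucket/http_logs/")
df.printSchema()
```
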
anirudha commented 1 year ago

https://user-images.githubusercontent.com/19362/222788831-68184ee3-d96c-4919-bb2e-8944f9c35d63.mov

ps48 commented 1 year ago

https://user-images.githubusercontent.com/4348487/222835845-e0165208-d00a-4323-94e6-6b0af23d1d09.mp4

ryn9 commented 1 year ago

Amazing stuff!

How will you support filtering (e.g. timestamp ranges and/or keywords) in relation to the S3 path schema?

For example, if using Fluent Bit's S3 output with s3_key_format /$TAG[2]/$TAG[0]/%Y/%m/%d/%H%M_%S/$UUID.gz, how will we map a keyword so that we pull only the objects matching the supplied tag filter and the desired time range?

ref: https://docs.fluentbit.io/manual/pipeline/outputs/s3/

dai-chen commented 1 year ago

@ryn9 Similar to optimizations in other query engines, we can leverage partition pruning and data skipping on your data (path or content). Please see a general example of data skipping in https://github.com/opensearch-project/sql/issues/1379#issuecomment-1459022886. We may look into Fluent Bit later. Thanks!
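
To make the partition-pruning part concrete, a hedged sketch: it assumes the objects are laid out (or re-exposed) in a Hive-style key=value path layout, and the bucket, column names, and values are hypothetical.

```python
# Hedged sketch: if the S3 key format encodes tag/year/month/day as Hive-style
# key=value path segments, Spark discovers them as partition columns, and a
# filter on them prunes entire prefixes instead of listing every object.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partition-pruning-demo").getOrCreate()

# e.g. s3a://my-bucket/app-logs/tag=web/year=2023/month=6/day=16/<uuid>.gz
logs = spark.read.json("s3a://my-bucket/app-logs/")

# Only partitions matching these predicates are scanned (partition pruning);
# content-based data skipping would use per-file metadata (e.g. min/max
# values) in a similar way.
logs.where("tag = 'web' AND year = 2023 AND month = 6 AND day BETWEEN 10 AND 16").show()
```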

anirudha commented 1 year ago

Decorators will be available in Fluent Bit / Data Prepper / the OTel exporter.

dai-chen commented 1 year ago

Phase 0 demo: https://github.com/opensearch-project/sql/discussions/1465

muralikpbhat commented 1 year ago

Great initiative. I really like the price-performance trade-off that this solution will bring. A few questions below:

  1. Can we think about and call out the downsides of doing query planning in Spark? Will it restrict some existing OpenSearch features? Which ones? A few pointers:

    a. What types of queries don't work with SQL today?
    b. How will DLS/FLS work?
    c. How will document-level alerting/percolator work?

  2. How are we thinking about life cycle management of materialized views? We need the ability to delete old MVs. (Assuming the maximus table and skipping indices don't need that, as they will not be very large.)
  3. Are we using data streams for MVs so that we don't need explicit index rotation?
  4. Can we think of on-demand materialized views instead of keeping them up to date (cost reduction)?
  5. In the case of MVs, can a query span across the MV and raw data? (The case where one data file is projected completely and the other is not.)
  6. Similarly, can a query span across fields in the MV and raw data for the same document? (Not for fields in the skipping index, but for fields in the MV's covered index.)
dai-chen commented 1 year ago

@muralikpbhat Thanks for all the comments! Please find my answers inline below.


1. Can we think about and call out the downsides of doing query planning in Spark? Will it restrict some existing OpenSearch features? Which ones?

In our demo, we use Spark SQL mostly for building the skipping index and MV into an OpenSearch index. In the end, all queries and dashboards work with the index as before.

As you asked below, I assume we're talking about Spark SQL queries with an OS index involved; if so, there are limitations:

a. What types of queries don't work with SQL today?

OpenSearch functions, including full-text search and aggregation: this may be solved by either improving OS-Hadoop or introducing our OS SQL plugin into Spark.

b. How will DLS/FLS work?

I think we need separate AuthN/Z for raw data on S3. If you're talking about the OS index, the query sent to OS is still DSL, which may work. We need a deep dive.

c. How will document-level alerting/percolator work?

I think all OS features can work with the MV. But for raw data, I'm not sure; we need to understand the use case and workflow.


2. How are we thinking about life cycle management of materialized views? We need the ability to delete old MVs. (Assuming the maximus table and skipping indices don't need that, as they will not be very large.)

Yes, we're considering the MV as a second-level, on-demand acceleration strategy. We will provide a standard SQL API for higher-level applications to use, such as SHOW/DROP MV (see the sketch below).
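
Since the MV in the demo is materialized as a regular OpenSearch index (see the answer to question 3 below), here is a hedged sketch of what lifecycle management amounts to today, before a dedicated SHOW/DROP MV statement exists; the endpoint, credentials, and index names are hypothetical.

```python
# Hedged sketch: manage old MVs by managing their backing OpenSearch indices.
# A future SHOW/DROP MV SQL API would presumably wrap this at a higher level.
# Endpoint, credentials, and index names are placeholders.
import requests

BASE = "https://localhost:9200"
AUTH = ("admin", "admin")  # placeholder credentials

# List candidate MV indices (_cat/indices API).
print(requests.get(f"{BASE}/_cat/indices/page_views_mv*?v", auth=AUTH, verify=False).text)

# Delete an old MV's backing index (delete index API).
requests.delete(f"{BASE}/page_views_mv_2023_05", auth=AUTH, verify=False)
```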


3. Are we using data streams for MVs so that we don't need explicit index rotation?

As shown in the demo above, the sink (destination) of the streaming job behind the MV is a regular OpenSearch index. I think we can make it any OpenSearch object as long as the OpenSearch-Hadoop connector can support it (see the sketch below).
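
A hedged sketch of that streaming pipeline, assuming the OpenSearch-Hadoop Spark connector exposes an "opensearch" streaming sink with opensearch.* options (mirroring its elasticsearch-hadoop lineage); host, paths, schema, and index name are placeholders.

```python
# Hedged sketch: the streaming job behind the MV writes to a regular
# OpenSearch index. The "opensearch" format and opensearch.* option names are
# assumptions about the connector, not confirmed API.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("mv-streaming-sink").getOrCreate()

# Incrementally read raw logs from the object store and keep only the columns
# the MV needs (a covering-index-like projection).
raw = (
    spark.readStream
    .schema("ts TIMESTAMP, status INT, clientip STRING")
    .json("s3a://my-bucket/http_logs/")
)
mv = raw.select("ts", "status", "clientip")

# Sink of the streaming job: a regular OpenSearch index.
query = (
    mv.writeStream
    .outputMode("append")
    .format("opensearch")                                   # assumed format name
    .option("opensearch.nodes", "https://localhost:9200")   # assumed option name
    .option("opensearch.resource", "http_logs_mv")          # destination index (assumed option)
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/http_logs_mv")
    .start()
)
```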


4. Can we think of on-demand materialized views instead of keeping them up to date (cost reduction)?

Yes, that's what we're doing in the demo. We intentionally relax strong consistency between the MV and its source.


5. In the case of MVs, can a query span across the MV and raw data? (The case where one data file is projected completely and the other is not.)

Yes, because the MV itself is a table too; users can use it in any query together with raw data. We didn't do this in the demo because OS-Hadoop currently doesn't extend the Spark Catalog, so extra effort is required to register the MV or any OS index in the Spark catalog.

Meanwhile, I'm not sure what specific use case or query you're referring to. We are also considering this, and may need it in the future, for a Hybrid Scan capability: hybrid scan will union the MV data with the latest raw data. This will be helpful for customers who want strong consistency (see the sketch below).
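
A hedged sketch of such a hybrid scan, assuming the MV can be read back through the connector (the format name is an assumption) and that a timestamp column marks how far the MV has been refreshed; names and paths are hypothetical.

```python
# Hedged sketch of a hybrid scan: union the MV contents with raw data that
# arrived after the MV's last refresh. Connector format, column names, and
# paths are placeholders/assumptions.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("hybrid-scan").getOrCreate()

mv = spark.read.format("opensearch").load("http_logs_mv")   # assumed connector format
raw = spark.read.parquet("s3a://my-bucket/http_logs/")

# Rows newer than the last timestamp covered by the MV come from raw data.
last_refresh = mv.agg(F.max("ts")).first()[0]
fresh = raw.where(F.col("ts") > F.lit(last_refresh)).select(mv.columns)

hybrid = mv.unionByName(fresh)
hybrid.where("status = 500").count()
```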


6. Similarly, can a query span across fields in the MV and raw data for the same document? (Not for fields in the skipping index, but for fields in the MV's covered index.)

I'm not quite sure what that query would look like. I think it's possible as long as there is a primary key field in the MV correlated to a row in the raw data.

sathishbaskar commented 1 year ago

Would joins involve pulling data to RDDs?

penghuo commented 1 year ago

Would joins involve pulling data to RDDs?

Could you elaborate more? Do you mean joining an OpenSearch index and S3?

sathishbaskar commented 1 year ago

Could you elaborate more? Do you mean joining an OpenSearch index and S3?

An example would help explain this better. Consider the following datasets:

users [20 billion docs, ~ 2 TB]
user_id, user_name, user_location

pages [1 trillion docs, ~ 90 TB]
page_id, website_id

page_views [10 trillion docs, over 1 PB]
hour_timestamp
user_id
page_id

If I have to prepare a report every day that summarizes the page view pattern over the last 7 days (top 100 pages and top 100 locations), with the following result schemas:

  1. day, hour, page_id, website_id, views
  2. day, hour, user_location, views

SELECT
  DATE(pv.hour_timestamp) AS day, HOUR(pv.hour_timestamp) AS hour, pv.page_id, p.website_id, COUNT(*) AS views
FROM
  page_views pv JOIN pages p ON pv.page_id = p.page_id
WHERE
  pv.hour_timestamp >= date_sub(current_date(), 7)
GROUP BY
  day, hour, pv.page_id, p.website_id
ORDER BY views DESC
LIMIT 100;

SELECT
  DATE(pv.hour_timestamp) AS day, HOUR(pv.hour_timestamp) AS hour, u.user_location, COUNT(*) AS views
FROM
  page_views pv JOIN users u ON pv.user_id = u.user_id
WHERE
  pv.hour_timestamp >= date_sub(current_date(), 7)
GROUP BY
  day, hour, u.user_location
ORDER BY views DESC
LIMIT 100;

Assuming users & pages are fully available in OpenSearch storage in a reasonably large cluster, and page_views is a materialized view with most of its data in S3, I'd like to understand how we plan to make the joins work. Would Spark DataFrames be loaded with data fetched from the OpenSearch index and OpenSearch materialized views, and then processed within the Spark runtime? And do we intend to push some of the compute down to OpenSearch, since that could avoid a good amount of network transfer?
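
To make the scenario concrete, a hedged sketch of how the first report query might be executed entirely inside Spark: pages is loaded from OpenSearch through the OpenSearch-Hadoop connector and page_views from S3, then joined and aggregated in the Spark runtime. The "opensearch" format and option names are assumptions about the connector, and without DSL pushdown the OpenSearch side is scanned and shipped to Spark, which is exactly the network-transfer cost the question raises.

```python
# Hedged sketch: join an OpenSearch-backed table with S3-backed data in Spark.
# Connector format/option names, host, and paths are placeholders/assumptions.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("pageview-report").getOrCreate()

pages = (
    spark.read.format("opensearch")                        # assumed format name
    .option("opensearch.nodes", "https://localhost:9200")  # assumed option name
    .load("pages")                                         # OpenSearch index
)
page_views = spark.read.parquet("s3a://my-bucket/page_views/")  # MV/raw data on S3

report = (
    page_views
    .where(F.col("hour_timestamp") >= F.date_sub(F.current_date(), 7))
    .join(pages, "page_id")
    .groupBy(
        F.to_date("hour_timestamp").alias("day"),
        F.hour("hour_timestamp").alias("hour"),
        "page_id",
        "website_id",
    )
    .agg(F.count("*").alias("views"))
    .orderBy(F.desc("views"))
    .limit(100)
)
report.show()
```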