opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices on remote data stores.
Apache License 2.0

[FEATURE] Support loading source data to OpenSearch index in batches #699

Open dai-chen opened 1 month ago

dai-chen commented 1 month ago

Is your feature request related to a problem?

Currently, there is no way to load data from a source table to an OpenSearch index while controlling the number of rows in each batch. Users are forced to rely on covering indexes or materialized views for data loading, but these only allow controlling the number of files or the total byte size (Spark 4.0) per refresh.
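For context, the file-count control available today surfaces roughly as below; a sketch in Scala assuming the Flint extra_options pass-through to the Spark streaming source, with illustrative index and table names:

// Sketch: today a covering-index refresh can cap each micro-batch by file
// count (Spark's maxFilesPerTrigger) via Flint's extra_options, but not by
// row count. Assumes an active SparkSession `spark`; names are illustrative.
spark.sql("""
  CREATE INDEX elb_and_requests ON glue.default.alb_logs (elb, request)
  WITH (
    auto_refresh = true,
    extra_options = '{"glue.default.alb_logs": {"maxFilesPerTrigger": "10"}}'
  )
""")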

What solution would you like?

To achieve row-based batching, I propose allowing users to run a low-level INSERT statement directly against the OpenSearch table. This would let users control the number of rows loaded in each batch by specifying row ranges, similar to:

-- Generate batches with a SQL window function:
WITH numbered_records AS (
    SELECT *,
           ROW_NUMBER() OVER (ORDER BY id) AS row_num
    FROM [source_table]
    WHERE date = 'xxx'
)
INSERT INTO [os_index_table]
SELECT * EXCEPT (row_num)  -- exclude the helper column so the projection matches the target schema (Spark 3.4+)
FROM numbered_records
WHERE row_num > [start] AND row_num <= [end]

-- Alternatively, Spark 3.5.2 supports the OFFSET clause:
INSERT INTO [os_index_table]
SELECT *
FROM [source_table]
WHERE date = 'xxx'
ORDER BY id
LIMIT [page_size]
OFFSET [page_start]
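
For either form, a thin driver can advance the page boundaries between batches. Below is a minimal sketch in Scala, assuming an active SparkSession `spark` and illustrative names (source_table, os_index_table):

// Minimal driver sketch: count the matching rows once, then insert them
// page by page so each INSERT statement loads at most pageSize rows.
// Assumes an active SparkSession `spark`; table names are illustrative.
val pageSize = 10000L

val total = spark.sql(
  "SELECT COUNT(*) FROM source_table WHERE date = 'xxx'"
).first().getLong(0)

var offset = 0L
while (offset < total) {
  spark.sql(
    s"""INSERT INTO os_index_table
       |SELECT *
       |FROM source_table
       |WHERE date = 'xxx'
       |ORDER BY id
       |LIMIT $pageSize OFFSET $offset""".stripMargin)
  offset += pageSize
}

Note that each page re-sorts and re-scans the source, and the pagination is only stable if the source does not change between pages, which is part of why folding this into the Flint refresh machinery may still be preferable.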

What alternatives have you considered?

Under certain conditions, such as when no filter is applied or filtering is limited to partition columns, it may be possible to implement row-based control within covering indexes or materialized views. However, it's essential to evaluate whether this approach aligns with the intended behavior and design of Flint index refresh.

Do you have any additional context?

dai-chen commented 1 month ago

To implement the approach outlined above, we need to integrate the OpenSearch table concept into SparkSQL by the steps below (a user-facing sketch follows the list):

  1. Enabling the OpenSearch catalog and ensuring that multiple catalogs function seamlessly in Spark.
  2. Supporting the CREATE TABLE statement for the OpenSearch catalog.
  3. Supporting the INSERT statement for OpenSearch tables.
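
For illustration, the user-facing result of these three tasks might look like the following sketch; the catalog implementation class is hypothetical, while the spark.sql.catalog.<name> registration is Spark's standard catalog plugin mechanism:

import org.apache.spark.sql.SparkSession

// (1) Register an OpenSearch catalog via Spark's catalog plugin mechanism.
//     The implementation class below is hypothetical; any class registered
//     under spark.sql.catalog.<name> must implement CatalogPlugin.
val spark = SparkSession.builder()
  .appName("opensearch-catalog-sketch")
  .config("spark.sql.catalog.dev", "org.opensearch.spark.catalog.OpenSearchCatalog") // hypothetical
  .getOrCreate()

// (2) CREATE TABLE routed to the OpenSearch catalog (proposed, not yet supported).
spark.sql("CREATE TABLE dev.default.os_index_table (id BIGINT, request STRING)")

// (3) INSERT into the OpenSearch table (proposed, not yet supported).
spark.sql(
  "INSERT INTO dev.default.os_index_table SELECT id, request FROM source_table")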

In addition to these primary tasks, several supplementary efforts are required, such as handling data type mapping between SparkSQL and OpenSearch, as discussed in issue #699. Given the complexity of these tasks, I am currently exploring alternative approaches that avoid exposing the OpenSearch table concept for now, such as the COPY command previously proposed in https://github.com/opensearch-project/opensearch-spark/issues/129#issuecomment-2387009768.