opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices on remote data stores.
Apache License 2.0

[FEATURE] Support COPY operation #129

Open penghuo opened 1 year ago

penghuo commented 1 year ago

Feature - COPY

Overview

OpenSearch is the search and analytics suite powering popular use cases such as application search, log analytics, and more. (1) Users rely on the _bulk indexing API to ingest and index data. Today the _bulk API places a high configuration burden on users, who must tune their requests to avoid RejectedExecutionException caused by TOO_MANY_REQUESTS. (2) While OpenSearch is part of critical business and application workflows, it is seldom used as a primary data store because there are no strong guarantees on data durability; the cluster is susceptible to data loss in case of hardware failure. In this document, we propose a solution that lets customers manage raw data on highly reliable object storage (e.g. Amazon S3) and then use the COPY command to transfer that data into OpenSearch at any time.

COPY

SYNTAX

LOAD DATA index-name
FROM data_source LOCATION location
[ COMPRESSION file-compression ]
[ DATA FORMAT data-format ] [ parameter [ argument ] [, ... ] ]
[ AUTO ON | OFF [ notification ] ]

Overview

You can perform a COPY operation with as few as three parameters: an index name, a data source, and a location. The OpenSearch COPY command enables you to load data in several data formats from multiple data sources, control access to the loaded data, manage data transformations, and manage the load operation.

Index name

The name of the target index for the COPY command. The index must already exist in OpenSearch. The COPY command appends the new input data to any existing docs in the index.

FROM data_source LOCATION location

data source

The data source must already exist in OpenSearch. See Datasource Metadata Management for more details.

location - Amazon S3

For example, an object path to load data from Amazon S3:

["s3://objectpath"]

File compression

File compression parameters

Data format

You can load data from text files in fixed-width, character-delimited, comma-separated values (CSV), or JSON format, or from Avro files.
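
As a rough illustration only (not part of the proposal), these formats map naturally onto Spark's built-in readers; the bucket paths and delimiter below are placeholders:

// Sketch: how each supported DATA FORMAT could be read with Spark (paths are placeholders).
val csvDf = spark.read
  .option("header", "true")
  .csv("s3://my-bucket/csv/")             // comma-separated values

val delimitedDf = spark.read
  .option("delimiter", "|")
  .csv("s3://my-bucket/delimited/")       // character-delimited text

val jsonDf = spark.read
  .json("s3://my-bucket/json/")           // one JSON object per line

val avroDf = spark.read
  .format("avro")                         // requires the spark-avro module
  .load("s3://my-bucket/avro/")

// Fixed-width files have no built-in reader; they would need spark.read.text
// plus substring parsing of each line.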

JSON

The source data is in JSON format. The JSON data file contains a set of objects. COPY loads each JSON object into the index as a doc. The order of keys within a JSON object doesn't matter. Internally, the engine uses the _bulk API to index each JSON object. For each error, OpenSearch records a row in the STL_LOAD_ERRORS system table. The LINE_NUMBER column records the last line of the JSON object that caused the error.
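
For reference, the _bulk API expects an action line before each document. A minimal sketch of how JSON lines could be turned into a bulk payload (the index name and documents are illustrative, and this is not the actual Flint implementation):

// Build an OpenSearch _bulk request body from JSON lines: each document is
// preceded by an action line, and the payload must end with a newline.
val docs = Seq(
  """{"accountId": "1", "eventName": "login"}""",
  """{"accountId": "2", "eventName": "logout"}""")

val bulkBody = docs
  .map(doc => """{"index": {"_index": "logs"}}""" + "\n" + doc)
  .mkString("", "\n", "\n")

// bulkBody is then sent as the body of POST /_bulk; per-document failures are
// reported in the bulk response and can be recorded as load errors.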

AUTO

If AUTO is set to ON, the COPY operation automatically detects newly added objects and indexes them. Users can enable Amazon S3 event notifications so that, instead of polling for new data regularly, the COPY operation pulls objects after receiving a notification.
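
One possible way to realize AUTO ON (a sketch only, not the proposed design) is Spark Structured Streaming's file source, which discovers newly added objects under a path; the S3 event notification would then trigger or optimize that discovery rather than relying on periodic polling. The schema, paths, and the assumption that the Flint sink accepts streaming writes are all hypothetical here:

// Sketch: continuously index newly arrived S3 objects via Structured Streaming.
val streamDf = spark.readStream
  .schema("accountId STRING, eventName STRING, eventSource STRING")
  .json("s3://my_http_logs/")

val query = streamDf.writeStream
  .format("flint")                                          // Flint OpenSearch data source
  .options(openSearchOptions)                               // connection options (see test below)
  .option("checkpointLocation", "s3://my_http_logs_ckpt/")
  .start("logs")                                            // target index name

query.awaitTermination()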

Usage

Load data from Amazon S3 into the logs index.

LOAD DATA logs
FROM myS3 LOCATION "s3://my_http_logs"
COMPRESSION gzip
DATA FORMAT json

Solution

Leverage https://github.com/opensearch-project/sql/issues/948.

YANG-DB commented 9 months ago

@penghuo I like the idea!! Can we also add a filter clause to narrow the load?

dai-chen commented 9 months ago

> @penghuo I like the idea!! Can we also add a filter clause to narrow the load?

I think it's doable. Currently the challenge is in the Flint data source. If there is a simple way to pass the entire JSON doc to the data source to generate the bulk request, the rest is just translating the COPY SQL statement to DataFrame code, similar to a covering index/MV with a WHERE clause.
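
For instance, the COPY statement from the Usage section above might translate to DataFrame code along these lines (a sketch only, assuming the raw JSON line can be passed through to the Flint sink as discussed; the filter predicate is purely illustrative):

// Hypothetical translation of:
//   LOAD DATA logs FROM myS3 LOCATION "s3://my_http_logs" COMPRESSION gzip DATA FORMAT json
// A filter clause on COPY would become an extra .filter(...) step before the write.
import org.apache.spark.sql.functions.col

val rawDf = spark.read
  .text("s3://my_http_logs/*.json.gz")    // gzip text files are decompressed transparently

val filteredDf = rawDf
  .filter(col("value").contains("login")) // optional narrowing filter

filteredDf.write
  .format("flint")
  .options(openSearchOptions)
  .mode("append")
  .save("logs")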

penghuo commented 3 months ago

Storing Semi-structured Data in a VARIANT Column vs. Flattening the Nested Structure? https://docs.snowflake.com/en/user-guide/semistructured-considerations

Spark 4.0 includes a Variant data type. https://issues.apache.org/jira/browse/SPARK-45891
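
As a quick illustration of what the Variant approach could look like (a sketch, assuming the parse_json and variant_get functions from that Spark 4.0 work), each raw JSON line could be kept as a single semi-structured column instead of being flattened into a fixed schema:

// Sketch, assuming Spark 4.0 Variant support (SPARK-45891).
val rawDf = spark.read.text("s3://my_http_logs/")

val variantDf = rawDf.selectExpr(
  "parse_json(value) AS doc",                                             // whole object as a VARIANT column
  "variant_get(parse_json(value), '$.eventName', 'string') AS eventName") // extract one field

variantDf.show(false)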

dai-chen commented 1 month ago

It seems this can be quickly tested and benchmarked/micro-benchmarked as follows:

Note that there is a wholetext option to load the entire file as a single Row object. This approach can be more efficient, especially for small files, without considering the rate limit (spark.datasource.flint.write.batch_bytes). However, OpenSearch requires each JSON line to be preceded by an action line. We might want to consider whether a more efficient protocol than bulk would be useful in our case.

@@ -31,6 +32,62 @@ class FlintDataSourceV2ITSuite

   import testImplicits._

+  test("copy from location to Flint data source") {
+    // Create a temporary JSON file with 5 lines
+    val jsonLines = Seq(
+      """{"accountId": "1", "eventName": "login", "eventSource": "source1"}""",
+      """{"accountId": "2", "eventName": "logout", "eventSource": "source2"}""",
+      """{"accountId": "3", "eventName": "login", "eventSource": "source3"}""",
+      """{"accountId": "4", "eventName": "logout", "eventSource": "source4"}""",
+      """{"accountId": "5", "eventName": "login", "eventSource": "source5"}"""
+    )
+    val tempFilePath = Files.createTempFile("tempJson", ".json")
+    Files.write(tempFilePath, jsonLines.mkString("\n").getBytes)
+
+    val tempFile = tempFilePath.toFile
+    try {
+      // Read JSON file as whole text
+      val df = spark.read
+        // .option("wholetext", "true")
+        .text(tempFile.getAbsolutePath)
+
+      df.show(false)
+
+      val indexName = "flint_test_index"
+
+      // Write to Flint data source
+      df.write
+        .format("flint")
+        .options(openSearchOptions)
+        .mode("overwrite")
+        .save(indexName)
+
+      // Read from Flint data source
+      val resultDf = spark.sqlContext.read
+        .format("flint")
+        .options(openSearchOptions)
+        .schema("accountId STRING, eventName STRING, eventSource STRING")
+        .load(indexName)
+
+      resultDf.show(false)
+
+      assert(resultDf.count() == 5)
+      val expectedRows = Seq(
+        Row("1", "login", "source1"),
+        Row("2", "logout", "source2"),
+        Row("3", "login", "source3"),
+        Row("4", "logout", "source4"),
+        Row("5", "login", "source5")
+      )
+
+      expectedRows.foreach { row =>
+        assert(resultDf.collect().contains(row))
+      }
+    } finally {
+      tempFile.delete()
+    }
+  }
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonGenerator.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonGenerator.scala
@@ -264,11 +264,14 @@ case class FlintJacksonGenerator(
    *   The row to convert
    */
   def write(row: InternalRow): Unit = {
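+    // Experiment: write the raw JSON string (column 0) as-is instead of
+    // re-serializing the row field by field.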
+    gen.writeRaw(row.getString(0))
+    /*
     writeObject(
       writeFields(
         fieldWriters = rootFieldWriters,
         row = row,
         schema = dataType.asInstanceOf[StructType]))
+     */
   }