opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices on remote data stores.
Apache License 2.0

[FEATURE] Performance - LIMIT clause is not optimized in execution stage #754

Open penghuo opened 1 week ago

penghuo commented 1 week ago

Is your feature request related to a problem?

Problem statement

A query with a LIMIT clause, such as SELECT * FROM cloudtrail LIMIT 10, takes 1.5 minutes to execute when querying a CloudTrail dataset containing over 10,000 files.

Root cause analysis

We use the following example to explain the problem. The dataset consists of 225 files, which Spark splits and groups into 12 input splits. When the user submits the query SELECT * FROM alb_logs LIMIT 10, the expected behavior is that Spark scans only one split (i.e., one file). If the query successfully retrieves the 10 rows, it returns the result without scanning additional files; otherwise, Spark scans more files, controlled by spark.sql.limit.scaleUpFactor.

The execution plan for this query is shown in the figure below: the Spark job contains a single task, which fetches 10 rows from a single file without requiring a shuffle stage. The entire job took 24 milliseconds to complete.
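To make the scale-up behavior concrete, here is a small Python simulation (illustrative only, not Spark code): scan one split first, and if the limit is not yet satisfied, multiply the number of splits scanned by the scale-up factor (Spark's spark.sql.limit.scaleUpFactor defaults to 4) and retry.

```python
# Illustrative simulation of Spark's progressive LIMIT scan.
# "splits" stand in for Spark input splits; row counts are made up.

def progressive_limit_scan(splits, limit, scale_up_factor=4):
    """Scan splits in growing batches until `limit` rows are collected.

    Returns (rows, number_of_splits_scanned).
    """
    collected = []
    scanned = 0
    batch = 1  # start by scanning a single split
    while scanned < len(splits) and len(collected) < limit:
        for split in splits[scanned:scanned + batch]:
            collected.extend(split)
        scanned = min(len(splits), scanned + batch)
        batch *= scale_up_factor  # like spark.sql.limit.scaleUpFactor
    return collected[:limit], scanned

# 12 splits, each holding 20 rows: the first split alone satisfies LIMIT 10.
splits = [[(i, j) for j in range(20)] for i in range(12)]
rows, scanned = progressive_limit_scan(splits, limit=10)
print(len(rows), scanned)  # 10 1
```

With splits this dense, only one split is ever touched; with sparse splits the batch size grows geometrically, which bounds the number of retries.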

[Screenshot (2024-10-08): single-stage execution plan, one task, no shuffle]

However, when the query is submitted through FlintREPL, it interacts with Spark using the following code: spark.sql("SELECT * FROM alb_logs LIMIT 10").toJSON.collect(). In this case, the execution plan differs: the introduction of the toJSON operator causes Spark to split the execution into two stages with a shuffle stage in between, which leads to unnecessary overhead.

[Screenshot (2024-10-08): two-stage execution plan with an intermediate shuffle]
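The cost difference between the two plans can be sketched with a toy model (not Spark's actual planner): a CollectLimit-style plan pulls from one split at a time and stops early, while a local-limit-plus-shuffle plan must scan every split before the global limit is applied.

```python
# Toy model contrasting the two plans above by counting splits scanned.

def collect_limit(splits, limit):
    # Plan A (single stage): pull splits one at a time, stop early.
    rows, scanned = [], 0
    for split in splits:
        scanned += 1
        rows.extend(split)
        if len(rows) >= limit:
            break
    return rows[:limit], scanned

def shuffle_limit(splits, limit):
    # Plan B (two stages): every split computes a local limit,
    # partial results are exchanged, then globally limited.
    partials = [split[:limit] for split in splits]
    scanned = len(splits)
    merged = [row for part in partials for row in part]  # the "shuffle"
    return merged[:limit], scanned

splits = [[(i, j) for j in range(20)] for i in range(12)]
print(collect_limit(splits, 10)[1])  # 1 split scanned
print(shuffle_limit(splits, 10)[1])  # 12 splits scanned
```

Both plans return the same 10 rows; only the amount of work differs, which matches the 24 ms vs. 1.5 min gap described in the problem statement.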

What solution would you like? I would like a solution that progressively plans the InputPartitions and collects only the necessary rows.

What alternatives have you considered? n/a

Do you have any additional context? attached

LantaoJin commented 6 days ago

@penghuo I think this is a bug in Spark. I filed a PR https://github.com/apache/spark/pull/48407 to fix it. Until it is merged, and if this is a very common usage in Flint, maybe we should remove the toJSON call and translate the dataset to JSON format inside Flint instead.