opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices to remote data stores.
Apache License 2.0

[FEATURE]PPL Add implicit limit rewrite for performance #705

Open YANG-DB opened 5 days ago

YANG-DB commented 5 days ago

Is your feature request related to a problem? The OpenSearch engine applies a default (implicit) limit of 10k results, which has to be changed explicitly to return a larger number of rows. Spark has no such limit, so an unbounded query may lead to long execution times and high costs.

What solution would you like? We would like to add this functionality as the default behaviour for Spark, so that any query without an explicit limit is rewritten to have a default limit.

This limit may be changed manually in each query, or by updating a default MAX_RESULTS setting as part of the PPL/SQL command or settings API.
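A minimal sketch of the proposed rewrite, assuming a toy plan model rather than Spark's Catalyst classes: `Plan`, `Scan`, `Filter`, `Limit`, and `ImplicitLimit` are hypothetical names used only for illustration, and the 10000 default simply mirrors the OpenSearch behaviour described above. The rule wraps a plan in a limit only when the query does not already carry one at its root, so an explicit per-query limit always wins.

```scala
// Toy logical plan: NOT Spark's Catalyst classes, just enough to show the rewrite.
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Filter(condition: String, child: Plan) extends Plan
final case class Limit(n: Int, child: Plan) extends Plan

object ImplicitLimit {
  // Hypothetical default, mirroring the 10k default described for OpenSearch.
  val DefaultMaxResults: Int = 10000

  /** Wrap the plan in a Limit unless the user already put one at the root. */
  def rewrite(plan: Plan, maxResults: Int = DefaultMaxResults): Plan = plan match {
    case Limit(_, _) => plan                      // explicit limit: leave untouched
    case other       => Limit(maxResults, other)  // no limit: add the default
  }
}

object ImplicitLimitDemo extends App {
  val unlimited = Filter("age > 30", Scan("users"))
  val limited   = Limit(5, Scan("users"))

  println(ImplicitLimit.rewrite(unlimited)) // gains a Limit(10000, ...) at the root
  println(ImplicitLimit.rewrite(limited))   // unchanged, the explicit limit is kept
}
```

In a real implementation the same check would presumably live in the PPL-to-logical-plan translation or in an analyzer rule; where exactly is an open design question in this issue.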

Setting MAX_RESULTS:

Option 1: Use Flint's Spark job submit parameters, as shown here:

aws emr-serverless start-job-run \
    --region <region-name> \
    --application-id <application-id> \
    --execution-role-arn <execution-role>  \
    --job-driver '{"sparkSubmit": {"entryPoint": "<flint-job-s3-path>", \
      "entryPointArguments":["'<sql-query>'", "<result-index>", "<data-source-name>"], \
      "sparkSubmitParameters":"--class org.opensearch.sql.FlintJob \
        --conf spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider \
        --conf spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=<role-to-access-s3-and-opensearch> \
        --conf spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=<role-to-access-s3-and-opensearch> \
        --conf spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory \
        --conf spark.hive.metastore.glue.role.arn=<role-to-access-s3-and-opensearch> \
        --conf spark.jars=<path-to-AWSGlueDataCatalogHiveMetaStoreAuth-jar> \
        --conf spark.jars.packages=<flint-spark-integration-jar-name> \
        --conf spark.jars.repositories=<path-to-download_spark-integration-jar> \
        --conf spark.emr-serverless.driverEnv.JAVA_HOME=<java-home-in-emr-serverless-host> \
        --conf spark.executorEnv.JAVA_HOME=<java-home-in-emr-serverless-host> \
        --conf spark.datasource.flint.host=<opensearch-url> \
        --conf spark.datasource.flint.port=<opensearch-port> \
        --conf spark.datasource.flint.scheme=<http-or-https> \
        --conf spark.datasource.flint.auth=<auth-type> \
        --conf spark.datasource.flint.region=<region-name> \
        --conf spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider \
        --conf spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions \
        --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory "}}'
    <data-source-name>

Option 2: Global Spark configuration. Modify Spark's configuration to apply a default limit globally by setting parameters in the spark-defaults.conf file. This file applies to all Spark jobs and sessions across the cluster, in a similar way to the application properties.

How to set the default MAX_RESULTS should be further discussed, but IMO the first iteration can be just a number that can be updated explicitly on each run, with the help command informing the user of the current setting values.
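One way the two options could combine, sketched under assumptions: the key `spark.flint.query.maxResults` is hypothetical (it is not an existing Spark or Flint setting), and the two maps stand in for per-run `--conf` values and `spark-defaults.conf` entries. The precedence follows Spark's own behaviour, where values passed at submit time override `spark-defaults.conf`.

```scala
import scala.util.Try

object MaxResultsSetting {
  // Hypothetical key: not an existing Spark or Flint setting.
  val Key: String = "spark.flint.query.maxResults"
  // Hypothetical built-in fallback when nothing is configured.
  val BuiltInDefault: Int = 10000

  // Accept only positive integers; anything else is treated as "not set".
  private def parse(raw: String): Option[Int] =
    Try(raw.trim.toInt).toOption.filter(_ > 0)

  /** Per-run value first, then the cluster-wide default, then the built-in one. */
  def resolve(perRun: Map[String, String], clusterDefaults: Map[String, String]): Int =
    perRun.get(Key).flatMap(parse)
      .orElse(clusterDefaults.get(Key).flatMap(parse))
      .getOrElse(BuiltInDefault)

  /** Text a help command could print to show the effective value. */
  def describe(perRun: Map[String, String], clusterDefaults: Map[String, String]): String =
    s"$Key = ${resolve(perRun, clusterDefaults)}"
}

object MaxResultsSettingDemo extends App {
  val clusterDefaults = Map(MaxResultsSetting.Key -> "2000")
  val perRun          = Map(MaxResultsSetting.Key -> "500")

  println(MaxResultsSetting.describe(perRun, clusterDefaults))    // per-run value wins
  println(MaxResultsSetting.describe(Map.empty, clusterDefaults)) // cluster default
  println(MaxResultsSetting.describe(Map.empty, Map.empty))       // built-in default
}
```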

Do you have any additional context?

YANG-DB commented 5 days ago

@LantaoJin @penghuo @dai-chen I would be happy to get your feedback here, since IMO this may also apply to direct SQL queries.

LantaoJin commented 4 days ago

How to set the default MAX_RESULTS should be further discussed

@YANG-DB I think this question should be discussed and designed as the first step, since Spark is quite different from the OpenSearch SQL engine. The fetch size may affect the performance or cost (uncertain) of OpenSearch, but in Spark it is a different story. The factors affecting Spark's performance and cost are not just the scan size (and definitely not the number of rows) but, more importantly, the number of Spark partitions (that is, the number of tasks, not table partitions). For example, table t1 with 1 million rows is 100MB in size, and the query SELECT * FROM t1 submits 10 tasks; table t2 with 100k rows is 2GB in size, and the query SELECT * FROM t2 submits 100 tasks. Therefore, simply limiting the scan size does not work to control the cost.

In addition, a Spark job has multiple stages. Even if two different queries act on the same table, their costs can be totally different because they have different query plans. As an example, Query 1 may submit 1000 tasks in Stage 1 and 10 tasks in Stage 2, while Query 2 submits the same 1000 tasks in Stage 1 and 1000 tasks in Stage 2. In this case, the cost of Query 2 is significantly higher than that of Query 1. Unfortunately, Spark does not have any public configuration to limit the total number of submitted partitions or the scan size. What I know of is the configuration spark.sql.files.maxPartitionNum, which limits the number of partitions of a single scan stage.
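To make the two-query comparison concrete, here is a small sketch that treats the total number of submitted tasks as a rough cost proxy. That proxy is an assumption for illustration only (real cost also depends on task duration, shuffle volume, and so on), and `QueryPlanCost` is a hypothetical name.

```scala
// Models a query as the number of tasks each of its stages submits.
final case class QueryPlanCost(name: String, tasksPerStage: Seq[Int]) {
  // Rough cost proxy (assumption): total tasks across all stages.
  def totalTasks: Int = tasksPerStage.sum
}

object TaskCountDemo extends App {
  // Numbers taken from the Query 1 / Query 2 example in this comment.
  val query1 = QueryPlanCost("Query 1", Seq(1000, 10))
  val query2 = QueryPlanCost("Query 2", Seq(1000, 1000))

  Seq(query1, query2).foreach { q =>
    println(s"${q.name}: ${q.totalTasks} tasks over ${q.tasksPerStage.size} stages")
  }
}
```

Both queries scan the same table with the same 1000 scan tasks, yet the totals are 1010 and 2000, and a limit on the number of returned rows changes neither figure.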

BTW, MAX_RESULTS is not what we need. The computing cost does not depend on how much output data (results) is returned. SELECT ... limit 1 could cost 10x the time and resources of SELECT ... limit 10000.

LantaoJin commented 4 days ago

I don't have the whole picture of Flint, but from my perspective, an alternative solution is to add a custom SparkListener. The following is a POC that limits the input size across all tasks in a stage, per query. Besides taskMetrics, we can get more runtime metrics at the stage level, and even at the SQL level via SQLMetrics. Any thoughts? @YANG-DB @penghuo @dai-chen

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
import org.apache.spark.sql.SparkSession

class FailingJobSparkListenerTest(inputSizeLimitBytes: Long) extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val stageInfo = stageCompleted.stageInfo

    // Get the total input size from all tasks in the stage
    val totalInputSize = stageInfo.taskMetrics.inputMetrics.bytesRead

    // Example condition: Fail the job if total input size exceeds the given limit
    if (totalInputSize > inputSizeLimitBytes) {
      throw new RuntimeException(s"Failing because input size" +
        s" $totalInputSize Byte exceeds limit of" +
        s" $inputSizeLimitBytes Byte.")
    }
  }
}

object FailingJobSparkListenerTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("FailingJobExample")
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    // Add the custom listener to the SparkContext
    val listener1 = new FailingJobSparkListenerTest(1024)
    sc.addSparkListener(listener1)
    // sc.removeSparkListener(listener1)

    // Some sample Spark job with a scan
    val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
    usersDF.show()
    // ERROR AsyncEventQueue: Listener FailingJobSparkListenerTest threw an exception
    // java.lang.RuntimeException: Failing because input size 1117 Byte exceeds limit of 1024 Byte.

    sc.removeSparkListener(listener1)
    val listener2 = new FailingJobSparkListenerTest(10240)
    sc.addSparkListener(listener2)
    usersDF.show()
    // no error
    // +------+--------------+----------------+
    // |  name|favorite_color|favorite_numbers|
    // +------+--------------+----------------+
    // |Alyssa|          NULL|  [3, 9, 15, 20]|
    // |   Ben|           red|              []|
    // +------+--------------+----------------+
    spark.stop()
  }
}
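
One caveat with the POC: an exception thrown inside a listener is caught and logged by the listener bus (the `ERROR AsyncEventQueue` line in the comments) and, as far as I can tell, does not fail the job itself; `onStageCompleted` also fires only after the stage has already read its input. A possible variant, sketched under assumptions, accumulates bytes read in `onTaskEnd` and calls `SparkContext.cancelStage` once the budget is exceeded. `InputSizeGuardListener` is a hypothetical name, and the sketch assumes listener callbacks arrive sequentially on one thread and ignores stage retries.

```scala
import scala.collection.mutable

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}

// Hypothetical guard: cancels a running stage once its tasks have read
// more than inputSizeLimitBytes in total.
class InputSizeGuardListener(sc: SparkContext, inputSizeLimitBytes: Long) extends SparkListener {
  // Running total of bytes read per stage id (assumes single-threaded callbacks).
  private val bytesReadByStage = mutable.HashMap.empty[Int, Long]
  // Stages already cancelled, so cancelStage is requested only once per stage.
  private val cancelledStages = mutable.HashSet.empty[Int]

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics // may be null for some failed tasks
    if (metrics != null) {
      val stageId = taskEnd.stageId
      val total = bytesReadByStage.getOrElse(stageId, 0L) + metrics.inputMetrics.bytesRead
      bytesReadByStage.update(stageId, total)

      if (total > inputSizeLimitBytes && cancelledStages.add(stageId)) {
        // Cancelling the stage fails the job(s) that depend on it.
        sc.cancelStage(stageId)
      }
    }
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    // Drop bookkeeping for finished stages to keep the maps small.
    val stageId = stageCompleted.stageInfo.stageId
    bytesReadByStage.remove(stageId)
    cancelledStages.remove(stageId)
  }
}
```

It would be registered the same way as in the POC, for example `sc.addSparkListener(new InputSizeGuardListener(sc, 1024))`. Because the check only runs as tasks finish, the limit is a soft one: tasks already in flight can still push the bytes read past the budget before the cancellation takes effect.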

YANG-DB commented 1 day ago

@LantaoJin I totally agree. Can we first define the most common commands that we suspect are subject to such potentially high costs?

Let's plan this in more detail...