opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices on remote data stores.
Apache License 2.0

[BUG] Fail to record error when result is too large to write back #259

Open kaituo opened 7 months ago

kaituo commented 7 months ago

What is the bug? A large result size can cause the result write-back to fail, but we don't record the error in the statement index. The driver log shows:

```
2024-02-15T14:30:53.615Z 24/02/15 14:30:53 ERROR FlintREPL: Fail to write result of FlintCommand(state=running, query=select * from a.b.c, statementId=RHVJYXVyTUlVNnplcm9fZXRsX3Rlc3Q=, queryId=RHVJYXVyTUlVNnplcm9fZXRsX3Rlc3Q=, submitTime=1708007424612, error=Some()), cause: Writing job aborted
org.apache.spark.SparkException: Writing job aborted
    at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:767) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:409) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:353) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:244) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:332) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:331) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:244) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:104) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) ~[spark-sql_2.12-3.3.2-amzn-0.jar:0.1.0-SNAPSHOT]
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:626) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:179) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:626) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:602) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:97) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:84) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:125) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860) ~[spark-sql_2.12-3.3.2-amzn-0.jar:0.1.0-SNAPSHOT]
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:311) ~[spark-sql_2.12-3.3.2-amzn-0.jar:0.1.0-SNAPSHOT]
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239) ~[spark-sql_2.12-3.3.2-amzn-0.jar:0.1.0-SNAPSHOT]
    at org.apache.spark.sql.FlintJobExecutor.writeData(FlintJobExecutor.scala:97) ~[opensearch-spark-sql-application_2.12-0.1.0.jar:0.1.0-SNAPSHOT]
    at org.apache.spark.sql.FlintJobExecutor.writeDataFrameToOpensearch(FlintJobExecutor.scala:115) ~[opensearch-spark-sql-application_2.12-0.1.0.jar:0.1.0-SNAPSHOT]
    at org.apache.spark.sql.FlintJobExecutor.writeDataFrameToOpensearch$(FlintJobExecutor.scala:110) ~[opensearch-spark-sql-application_2.12-0.1.0.jar:0.1.0-SNAPSHOT]
    at org.apache.spark.sql.FlintREPL$.writeDataFrameToOpensearch(FlintREPL.scala:42) ~[opensearch-spark-sql-application_2.12-0.1.0.jar:0.1.0-SNAPSHOT]
    at org.apache.spark.sql.FlintREPL$.$anonfun$finalizeCommand$1(FlintREPL.scala:563) ~[opensearch-spark-sql-application_2.12-0.1.0.jar:0.1.0-SNAPSHOT]
    at org.apache.spark.sql.FlintREPL$.$anonfun$finalizeCommand$1$adapted(FlintREPL.scala:563) ~[opensearch-spark-sql-application_2.12-0.1.0.jar:0.1.0-SNAPSHOT]
    at scala.Option.foreach(Option.scala:407) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.sql.FlintREPL$.finalizeCommand(FlintREPL.scala:563) ~[opensearch-spark-sql-application_2.12-0.1.0.jar:0.1.0-SNAPSHOT]
    at org.apache.spark.sql.FlintREPL$.processCommands(FlintREPL.scala:534) ~[opensearch-spark-sql-application_2.12-0.1.0.jar:0.1.0-SNAPSHOT]
    at org.apache.spark.sql.FlintREPL$.queryLoop(FlintREPL.scala:317) ~[opensearch-spark-sql-application_2.12-0.1.0.jar:0.1.0-SNAPSHOT]
    at org.apache.spark.sql.FlintREPL$.$anonfun$main$4(FlintREPL.scala:156) ~[opensearch-spark-sql-application_2.12-0.1.0.jar:0.1.0-SNAPSHOT]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.sql.FlintREPL$.exponentialBackoffRetry(FlintREPL.scala:1004) ~[opensearch-spark-sql-application_2.12-0.1.0.jar:0.1.0-SNAPSHOT]
    at org.apache.spark.sql.FlintREPL$.main(FlintREPL.scala:156) ~[opensearch-spark-sql-application_2.12-0.1.0.jar:0.1.0-SNAPSHOT]
    at org.apache.spark.sql.FlintREPL.main(FlintREPL.scala) ~[opensearch-spark-sql-application_2.12-0.1.0.jar:0.1.0-SNAPSHOT]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1006) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1095) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1104) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 11) ([2600:1f10:4f29:c700:1cdc:11e0:e7e9:5e18] executor 1): OpenSearchStatusException[Unable to parse response body]; nested: ResponseException[method [POST], host [a.b.c.com/], URI [/_bulk?refresh=wait_for&timeout=1m], status line [HTTP/1.1 413 Request Entity Too Large]
{"Message":"Request size exceeded 10485760 bytes"}];
    at org.opensearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2208)
    at org.opensearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1924)
    at org.opensearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1877)
    at org.opensearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1845)
    at org.opensearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:364)
    at org.opensearch.flint.core.storage.OpenSearchWriter.flush(OpenSearchWriter.java:61)
    at com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.flush(WriterBasedJsonGenerator.java:967)
    at org.apache.spark.sql.flint.json.FlintJacksonGenerator.flush(FlintJacksonGenerator.scala:258)
    at org.apache.spark.sql.flint.FlintPartitionWriter.commit(FlintPartitionWriter.scala:70)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:453)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1550)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:138)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
    Suppressed: ParsingException[Failed to parse object: expecting field with name [error] but found [Message]]
        at org.opensearch.common.xcontent.XContentParserUtils.ensureFieldName(XContentParserUtils.java:63)
        at org.opensearch.OpenSearchException.failureFromXContent(OpenSearchException.java:642)
        at org.opensearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:199)
        at org.opensearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2228)
        at org.opensearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2205)
        ... 20 more
Caused by: org.opensearch.client.ResponseException: method [POST], host [a.b.c.com/], URI [/_bulk?refresh=wait_for&timeout=1m], status line [HTTP/1.1 413 Request Entity Too Large]
{"Message":"Request size exceeded 10485760 bytes"}
    at org.opensearch.client.RestClient.convertResponse(RestClient.java:375)
    at org.opensearch.client.RestClient.performRequest(RestClient.java:345)
    at org.opensearch.client.RestClient.performRequest(RestClient.java:320)
    at org.opensearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1911)
    ... 19 more
Driver stacktrace:
```

Our statement index:

{"_id": "blah=", "_index": ".query_execution_request_blah", "_primary_term": 2, "_seq_no": 1317, "_source": {"applicationId": "blah", "dataSourceName": "blah", "error": "", "jobId": "blah", "lang": "sql", "query": "select * from a.b.c", "queryId": "blah=", "sessionId": "blah=", "state": "failed", "statementId": "blah=", "submitTime": 1708007424612, "type": "statement", "version": "1.0"}, "_version": 3, "found": true}

How can one reproduce the bug? Steps to reproduce the behavior:

  1. Run `select *` against a large table without a `LIMIT` clause.

What is the expected behavior? We should record the write failure in the statement index's `error` field.
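
To make that concrete, here is a minimal sketch of the idea, not the actual FlintREPL code; the `"flint"` format name and the `recordError` callback are illustrative assumptions:

```scala
import org.apache.spark.sql.DataFrame

// Minimal sketch (not the actual FlintREPL code): wrap the result write and
// hand any failure back to the caller, which persists it on the statement
// document instead of letting it go unrecorded.
def writeResultOrRecordError(
    resultDf: DataFrame,
    resultIndex: String,
    recordError: String => Unit): Unit = {
  try {
    resultDf.write
      .format("flint") // assumed DataSource V2 short name used by this project
      .mode("append")
      .save(resultIndex)
  } catch {
    case e: Exception =>
      // Keep the stored message short so the statement update itself cannot
      // exceed the request-size limit that caused the original failure.
      val message = Option(e.getMessage).getOrElse(e.getClass.getName).take(1000)
      recordError(s"Failed to write result: $message")
      throw e
  }
}
```

With something along these lines, the statement document above would end up with a non-empty `error` alongside `"state": "failed"`, instead of the empty string shown.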

noCharger commented 7 months ago

Discussed offline with @kaituo; we could log the failure in the `error` field, but I'm also curious about the DataFrame size limit and whether we could further partition it.
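
On the partitioning question, a rough sketch of one direction: the stack trace shows the bulk flush happening in `FlintPartitionWriter.commit`, so bounding the rows per output partition should bound the size of each bulk request. The 10 MiB cap and the per-row size estimate below are assumptions, not measured values:

```scala
import org.apache.spark.sql.DataFrame

// Rough sketch: cap rows per partition so a single partition's bulk flush
// stays under the request-size limit seen in the 413 error above.
// `estimatedRowBytes` is a placeholder; a real fix would measure row size.
def repartitionForBulkLimit(
    df: DataFrame,
    maxRequestBytes: Long = 10L * 1024 * 1024, // 10485760 bytes, from the error message
    estimatedRowBytes: Long = 4096): DataFrame = {
  val totalRows = df.count() // triggers an extra Spark job; acceptable for a sketch
  val rowsPerPartition = math.max(1L, maxRequestBytes / estimatedRowBytes)
  val numPartitions = math.max(1, math.ceil(totalRows.toDouble / rowsPerPartition).toInt)
  df.repartition(numPartitions)
}
```

Even with smaller partitions, the write can still fail (for example, a single oversized row), so recording the error in the statement index is still needed.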

dai-chen commented 2 weeks ago

Network limit for AOS domain: https://docs.aws.amazon.com/opensearch-service/latest/developerguide/limits.html