uber / RemoteShuffleService

Remote shuffle service for Apache Spark to store shuffle data on remote servers.
Other
323 stars 100 forks source link

Why this cannot support speculation #27

Open boneanxs opened 4 years ago

boneanxs commented 4 years ago

Is there any spark issues that I can track this random task attempt ids problem? I test a lot of cases, but I can not reproduce this. Thanks in advance. https://github.com/uber/RemoteShuffleService/blob/7220c23694e0175e01719621707680a2718173cf/src/main/scala/org/apache/spark/shuffle/RssShuffleManager.scala#L79

hiboyang commented 3 years ago

It may already work with speculation in latest version. Do you get any issue when running with speculation?

shikuiye commented 3 years ago

Presumption is not supported at this time. Turn on presumption for possible data duplication. When will Spark conjecture be supported?

shikuiye commented 3 years ago

@hiboyang Is there now speculation implemented in Uber's internal RSS?

Lobo2008 commented 2 years ago

It may already work with speculation in latest version. Do you get any issue when running with speculation?

Hi @hiboyang ,seems speculation is still not supported yet, app always throw exception when I set spark.shuffle.service.enabled=true

spark 2.4.5 + rss master branch

22/09/27 16:55:25 ERROR FileFormatWriter: Aborting job 06b0682a-c848-4b0e-a78c-754774e500ff.
org.apache.spark.SparkException: Adaptive execution failed due to stage materialization failures.
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.org$apache$spark$sql$execution$adaptive$AdaptiveSparkPlanExec$$cleanUpAndThrowException(AdaptiveSparkPlanExec.scala:540)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$anonfun$getFinalPhysicalPlan$1$$anonfun$apply$4.apply(AdaptiveSparkPlanExec.scala:169)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$anonfun$getFinalPhysicalPlan$1$$anonfun$apply$4.apply(AdaptiveSparkPlanExec.scala:156)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$anonfun$getFinalPhysicalPlan$1.apply(AdaptiveSparkPlanExec.scala:156)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$anonfun$getFinalPhysicalPlan$1.apply(AdaptiveSparkPlanExec.scala:142)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:776)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:142)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.doExecute(AdaptiveSparkPlanExec.scala:258)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:163)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:159)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:187)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:184)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:159)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:163)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:159)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:187)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:184)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:159)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:85)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:79)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:126)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at main.scala.MyQuery$.outputDF(MyQuery.scala:42)
    at main.scala.MyQuery$.executeQuery(MyQuery.scala:57)
    at main.scala.MyQuery$.main(MyQuery.scala:105)
    at main.scala.MyQuery.main(MyQuery.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:852)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:927)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:936)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Early failed query stage found: ShuffleQueryStage 0
+- Exchange hashpartitioning(l_returnflag#51, l_linestatus#52, 401), true
   +- *(1) HashAggregate(keys=[l_returnflag#51, l_linestatus#52], functions=[partial_sum(l_quantity#47), partial_sum(l_extendedprice#48), partial_sum(UDF(l_extendedprice#48, l_discount#49)), partial_sum(UDF(UDF(l_extendedprice#48, l_discount#49), l_tax#50)), partial_avg(l_quantity#47), partial_avg(l_extendedprice#48), partial_avg(l_discount#49), partial_count(1)], output=[l_returnflag#51, l_linestatus#52, sum#262, sum#263, sum#264, sum#265, sum#266, count#267L, sum#268, count#269L, sum#270, count#271L, count#272L])
      +- *(1) Project [l_quantity#47, l_extendedprice#48, l_discount#49, l_tax#50, l_returnflag#51, l_linestatus#52]
         +- *(1) Filter (isnotnull(l_shipdate#53) && (l_shipdate#53 <= 1998-09-02))
            +- *(1) SerializeFromObject [assertnotnull(input[0, main.scala.Lineitem, true]).l_orderkey AS l_orderkey#43L, assertnotnull(input[0, main.scala.Lineitem, true]).l_partkey AS l_partkey#44L, assertnotnull(input[0, main.scala.Lineitem, true]).l_suppkey AS l_suppkey#45L, assertnotnull(input[0, main.scala.Lineitem, true]).l_linenumber AS l_linenumber#46L, assertnotnull(input[0, main.scala.Lineitem, true]).l_quantity AS l_quantity#47, assertnotnull(input[0, main.scala.Lineitem, true]).l_extendedprice AS l_extendedprice#48, assertnotnull(input[0, main.scala.Lineitem, true]).l_discount AS l_discount#49, assertnotnull(input[0, main.scala.Lineitem, true]).l_tax AS l_tax#50, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, main.scala.Lineitem, true]).l_returnflag, true, false) AS l_returnflag#51, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, main.scala.Lineitem, true]).l_linestatus, true, false) AS l_linestatus#52, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, main.scala.Lineitem, true]).l_shipdate, true, false) AS l_shipdate#53, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, main.scala.Lineitem, true]).l_commitdate, true, false) AS l_commitdate#54, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, main.scala.Lineitem, true]).l_receiptdate, true, false) AS l_receiptdate#55, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, main.scala.Lineitem, true]).l_shipinstruct, true, false) AS l_shipinstruct#56, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, main.scala.Lineitem, true]).l_shipmode, true, false) AS l_shipmode#57, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, main.scala.Lineitem, true]).l_comment, true, false) AS l_comment#58]
               +- Scan[obj#42]

    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$anonfun$getFinalPhysicalPlan$1$$anonfun$apply$4.apply(AdaptiveSparkPlanExec.scala:167)
    ... 51 more
Caused by: com.uber.rss.exceptions.RssException: Do not support speculation in Remote Shuffle Service
    at org.apache.spark.shuffle.RssShuffleManager.registerShuffle(RssShuffleManager.scala:83)
    at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:93)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:294)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:77)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:76)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:66)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:62)
    at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:148)
    at org.apache.spark.sql.execution.adaptive.QueryStageExec$$anonfun$materialize$1.apply(QueryStageExec.scala:77)
    at org.apache.spark.sql.execution.adaptive.QueryStageExec$$anonfun$materialize$1.apply(QueryStageExec.scala:77)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:187)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:184)
    at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:76)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$anonfun$getFinalPhysicalPlan$1$$anonfun$apply$4.apply(AdaptiveSparkPlanExec.scala:158)
    ... 51 more
hiboyang commented 2 years ago

This is caused by this line of code: https://github.com/uber/RemoteShuffleService/blob/607358d97dc17e1d4e8d29b145e43248ac9ef6dd/src/main/scala/org/apache/spark/shuffle/RssShuffleManager.scala#L83

Please feel free to remove it and submit a PR. It was added in very early version of RSS, and now should be safe to remove it.