NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0
821 stars · 235 forks

[BUG] executor crashes intermittently in scala2.13-built spark332 integration tests #9659

Closed by pxLi 11 months ago

pxLi commented 1 year ago

Describe the bug After setting up a regular integration test CI pipeline for the scala2.13-built plugin (built with the default jdk8, run with a Java 17 runtime), we found intermittent failures where the executor crashed for no clear reason in the spark332 IT run (spark340 passed fine):

  1. 3 runs failed, each with different test cases
  2. The executor logs showed nothing about why the executor was killed
  3. On the pod-level memory dashboard, the memory footprint looked OK (peak ~37GB against a 60GB limit)
  4. All 3 failed cases were on different hosts
FAILED ../../src/main/python/window_function_test.py::test_decimal_running_sum_window[Decimal(12,2)][INJECT_OOM, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling o1350645.collectToPython.
[2023-11-04T12:55:43.678Z] : org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 24613.0 failed 1 times, most recent failure: Lost task 2.0 in stage 24613.0 (TID 172988) (100.103.204.47 executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 163377 ms
FAILED ../../src/main/python/delta_lake_merge_test.py::test_delta_merge_match_delete_only[10-['a', 'b']-True-(range(0, 10), range(0, 20))][INJECT_OOM, IGNORE_ORDER, ALLOW_NON_GPU(DeserializeToObjectExec,ShuffleExchangeExec,FileSourceScanExec,FilterExec,MapPartitionsExec,MapElementsExec,ObjectHashAggregateExec,ProjectExec,SerializeFromObjectExec,SortExec)] - py4j.protocol.Py4JJavaError: An error occurred while calling o21328.save.
[2023-11-07T15:32:31.586Z] E                   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 1481.0 failed 1 times, most recent failure: Lost task 6.0 in stage 1481.0 (TID 20424) (100.101.238.63 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 130696 ms
FAILED ../../src/main/python/delta_lake_update_test.py::test_delta_update_partitions[['a']-True][INJECT_OOM, IGNORE_ORDER, ALLOW_NON_GPU(DeserializeToObjectExec,ShuffleExchangeExec,FileSourceScanExec,FilterExec,MapPartitionsExec,MapElementsExec,ObjectHashAggregateExec,ProjectExec,SerializeFromObjectExec,SortExec)] - py4j.protocol.Py4JJavaError: An error occurred while calling o28613.save.
[2023-11-07T20:03:22.336Z] : org.apache.spark.SparkException: Job aborted due to stage failure: Task 45 in stage 2227.0 failed 1 times, most recent failure: Lost task 45.0 in stage 2227.0 (TID 27324) (100.102.147.205 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 166113 ms

failed spark332 [screenshot]

passed spark340 [screenshot]

Steps/Code to reproduce bug Rerun the internal rapids_integration-scala213-dev-github pipeline (~50% reproducible).

Expected behavior The tests should pass.

Environment details (please complete the following information)

Additional context Add any other context about the problem here.

jlowe commented 1 year ago

Saw another one of these for test_repartition_df_for_round_robin, also running for spark332.

jlowe commented 11 months ago

IIUC this is failing in local mode, and it's pretty odd to get an executor heartbeat timeout when the driver and executor are in the same JVM instance. What might be happening here is that the JVM is GC'ing a lot, causing executor heartbeat timers to expire before the executor can heartbeat in time. We saw instances of this with #9829, and I'm wondering if the fix for it in #9944 would also address this issue. Unfortunately #9944 only went into 24.02 so far, so we'd have to check the 24.02 pipeline for scala2.13 to see if there's any improvement going forward.
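For reference, a minimal sketch of the stock Spark settings that widen the heartbeat window if long GC pauses really are the culprit. The values are illustrative, not tuned for this CI environment, and this only masks the symptom rather than fixing the GC behavior itself:

import org.apache.spark.sql.SparkSession

// Sketch only: stock Spark settings that make executor heartbeats more tolerant
// of long GC pauses. Values are illustrative, not tuned for this CI setup.
object HeartbeatTimeoutMitigationSketch extends App {
  val spark = SparkSession.builder()
    .appName("heartbeat-gc-mitigation-sketch")
    .master("local[*]")
    .config("spark.executor.heartbeatInterval", "20s") // default 10s
    .config("spark.network.timeout", "300s")           // default 120s; must stay above the heartbeat interval
    .getOrCreate()
  spark.stop()
}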

NVnavkumar commented 11 months ago

IIUC this is failing in local mode, and it's pretty odd to get an executor heartbeat timeout when the driver and executor are in the same JVM instance. What might be happening here is that the JVM is GC'ing a lot, causing executor heartbeat timers to expire before the executor can heartbeat in time. We saw instances of this with #9829, and I'm wondering if the fix for it in #9944 would also address this issue. Unfortunately #9944 only went into 24.02 so far, so we'd have to check the 24.02 pipeline for scala2.13 to see if there's any improvement going forward.

Unfortunately, I think this failure has started to creep into pre-release pipelines, which run integration tests with jenkins/spark-tests.sh and, as far as I can tell, are not in local mode. So I don't think it's specific to local mode. I'm still trying to evaluate the garbage collection behavior in Scala 2.12 vs 2.13 for Spark 3.3.2 here.

As far as significant differences between the 3.3.2 shim and the 3.4.0 shim (where the issue doesn't seem to occur), there are the shimmed versions of RapidsShuffleIterator and RapidsCachingReader:

https://github.com/NVIDIA/spark-rapids/blame/branch-23.12/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala

https://github.com/NVIDIA/spark-rapids/blame/branch-23.12/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/RapidsCachingReader.scala

https://github.com/NVIDIA/spark-rapids/blame/branch-23.12/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala

https://github.com/NVIDIA/spark-rapids/blame/branch-23.12/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/RapidsCachingReader.scala

In 3.4.0, RapidsShuffleIterator uses collection.Seq (the general, possibly-mutable Seq supertype in both Scala versions), and in 3.3.2 it uses scala.Seq (which aliases collection.Seq in Scala 2.12 but the immutable Seq in Scala 2.13). Spark made the change in 3.4.0 to start using collection.Seq in the shuffle code.

abellina commented 11 months ago

I don't think the tests described above run with that shuffle, so I am not sure these classes are even getting instantiated.

abellina commented 11 months ago

In 3.4.0, RapidsShuffleIterator uses collection.Seq (the general, possibly-mutable Seq supertype in both Scala versions), and in 3.3.2 it uses scala.Seq (which aliases collection.Seq in Scala 2.12 but the immutable Seq in Scala 2.13). Spark made the change in 3.4.0 to start using collection.Seq in the shuffle code.

That's news to me, but don't we always use Seq in an immutable way? Even if we relied on some mutable methods, if this changed between Scala versions, wouldn't Scala 2.13 fail to compile that code?

NVnavkumar commented 11 months ago

In 3.4.0, RapidsShuffleIterator uses collection.Seq (the general, possibly-mutable Seq supertype in both Scala versions), and in 3.3.2 it uses scala.Seq (which aliases collection.Seq in Scala 2.12 but the immutable Seq in Scala 2.13). Spark made the change in 3.4.0 to start using collection.Seq in the shuffle code.

That's news to me, but don't we always use Seq in an immutable way? Even if we relied on some mutable methods, if this changed between Scala versions, wouldn't Scala 2.13 fail to compile that code?

So we don't quite do that in all places in the code. There were a lot of places where we returned something like a scala.collection.mutable.ArrayBuffer as a scala.Seq, which would compile in Scala 2.12 but not in Scala 2.13 unless we added a toSeq call to the return value. In 2.13 that toSeq can create a new, copied sequence; in 2.12 it is usually a no-op that returns the existing collection.

With these new sequences being created in 2.13, it's very conceivable that there is a lot more garbage collection going on. It's possible that there will have to be further re-writing of the code for better performance under Scala 2.13.
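A quick standalone sketch of that toSeq difference (stock Scala behavior, not plugin code):

import scala.collection.mutable.ArrayBuffer

object ToSeqCopySketch extends App {
  val buf = ArrayBuffer(1, 2, 3)
  // Scala 2.12: scala.Seq is scala.collection.Seq, so toSeq can hand back the
  // buffer itself (no copy). Scala 2.13: scala.Seq is the immutable Seq, so
  // toSeq builds a new immutable sequence and copies the elements.
  val s: Seq[Int] = buf.toSeq
  println(s eq buf) // Scala 2.12: true (same object), Scala 2.13: false (fresh copy)
}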

What is interesting about these Rapids classes is not the plugin's usage of them, but the underlying Spark functionality onto which they graft. One of the big 3.4.0 changes was to start using collection.Seq in the Apache Spark shuffle code to make things a lot more efficient in terms of memory usage and speed when compiled under Scala 2.13. scala.collection.Seq exists in both Scala versions, so the code behaves exactly the same in both.
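And a sketch of the kind of signature that change enables; the helper name below is made up purely for illustration:

import scala.collection.mutable.ArrayBuffer

object CollectionSeqSignatureSketch extends App {
  // Hypothetical helper, not plugin code: a scala.collection.Seq parameter
  // accepts a mutable buffer as-is in both Scala 2.12 and 2.13, so no toSeq
  // conversion (and no copy) is needed at the call site.
  def blockCount(blocks: scala.collection.Seq[Long]): Int = blocks.length

  val pending = ArrayBuffer(1L, 2L, 3L)
  println(blockCount(pending)) // compiles and runs in both versions without pending.toSeq
  // By contrast, a parameter typed as plain Seq[Long] (scala.Seq) would force
  // Scala 2.13 callers to write pending.toSeq, and therefore pay for a copy.
}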

abellina commented 11 months ago

Looking at the commit description here https://github.com/apache/spark/commit/66c6aab8f8b5168a77c86df9fa043068fc8b4b72, it describes how something like .length could mean a copy in some cases. That does seem really slow and we should fix it, but I need to look at it a bit more.

Note that the shuffle classes you pointed at (RapidsCachingReader and RapidsShuffleIterator) are only used for the UCX shuffle, so I am not sure they are related to the issue at hand. That's not to say they shouldn't change to use collection.Seq as well, especially if we are building those Spark artifacts with Scala 2.13. If collection.Seq should be used throughout the code, we should file an issue and get it done.

BTW, which artifacts (spark versions) are we going to compile against 2.13?

NVnavkumar commented 11 months ago

Looking at the commit description here apache/spark@66c6aab, it describes how something like .length could mean a copy in some cases. That does seem really slow and we should fix it, but I need to look at it a bit more.

Note that the shuffle classes you pointed at (RapidsCachingReader and RapidsShuffleIterator) are only used for the UCX shuffle, so I am not sure they are related to the issue at hand. That's not to say they shouldn't change to use collection.Seq as well, especially if we are building those Spark artifacts with Scala 2.13. If collection.Seq should be used throughout the code, we should file an issue and get it done.

BTW, which artifacts (spark versions) are we going to compile against 2.13?

Officially we support Spark 3.3.0 and higher with Scala 2.13

NVnavkumar commented 11 months ago

Looking at the commit description here apache/spark@66c6aab, it describes how something like .length could mean a copy in some cases. That does seem really slow and we should fix it, but I need to look at it a bit more.

Note that the shuffle classes you pointed at (RapidsCachingReader and RapidsShuffleIterator) are only used for the UCX shuffle, so I am not sure they are related to the issue at hand. That's not to say they shouldn't change to use collection.Seq as well, especially if we are building those Spark artifacts with Scala 2.13. If collection.Seq should be used throughout the code, we should file an issue and get it done.

BTW, which artifacts (spark versions) are we going to compile against 2.13?

I also filed https://github.com/NVIDIA/spark-rapids/issues/9952 to track this update. I do think this needs to be fixed in many places in our code.

jlowe commented 11 months ago

Found an executor log for run 29 of the scala213 dev pipeline, and it's getting shot out of the blue:

23/12/06 10:11:28 INFO CodeGenerator: Code generated in 2.862956 ms
23/12/06 10:11:28 INFO Executor: Finished task 0.0 in stage 30339.0 (TID 96950). 163647 bytes result sent to driver
23/12/06 10:11:40 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
23/12/06 10:11:40 INFO RapidsBufferCatalog: Closing storage
23/12/06 10:11:40 WARN GpuDeviceManager: Waiting for outstanding RMM allocations to be released...
23/12/06 10:11:40 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

@pxLi are we sure there aren't container memory issues? Driver isn't shooting this, so it seems like a cgroup or OOM memory threshold thing that steps in and kills this.

NVnavkumar commented 11 months ago

Found an executor log for run 29 of the scala213 dev pipeline, and it's getting shot out of the blue:

23/12/06 10:11:28 INFO CodeGenerator: Code generated in 2.862956 ms
23/12/06 10:11:28 INFO Executor: Finished task 0.0 in stage 30339.0 (TID 96950). 163647 bytes result sent to driver
23/12/06 10:11:40 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
23/12/06 10:11:40 INFO RapidsBufferCatalog: Closing storage
23/12/06 10:11:40 WARN GpuDeviceManager: Waiting for outstanding RMM allocations to be released...
23/12/06 10:11:40 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

@pxLi are we sure there aren't container memory issues? Driver isn't shooting this, so it seems like a cgroup or OOM memory threshold thing that steps in and kills this.

I should clarify: this is the log that shows the non-graceful kill:

23/12/06 08:53:02 INFO Executor: Running task 0.0 in stage 12267.0 (TID 51059)
23/12/06 08:53:02 INFO TorrentBroadcast: Started reading broadcast variable 14190 with 1 pieces (estimated total size 4.0 MiB)
23/12/06 08:53:02 INFO MemoryStore: Block broadcast_14190_piece0 stored as bytes in memory (estimated size 26.4 KiB, free 1957.1 MiB)
23/12/06 08:53:02 INFO TorrentBroadcast: Reading broadcast variable 14190 took 1 ms
23/12/06 08:53:02 INFO MemoryStore: Block broadcast_14190 stored as values in memory (estimated size 68.8 KiB, free 1957.1 MiB)
23/12/06 08:53:02 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
pxLi commented 11 months ago

Found an executor log for run 29 of the scala213 dev pipeline, and it's getting shot out of the blue:

23/12/06 10:11:28 INFO CodeGenerator: Code generated in 2.862956 ms
23/12/06 10:11:28 INFO Executor: Finished task 0.0 in stage 30339.0 (TID 96950). 163647 bytes result sent to driver
23/12/06 10:11:40 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
23/12/06 10:11:40 INFO RapidsBufferCatalog: Closing storage
23/12/06 10:11:40 WARN GpuDeviceManager: Waiting for outstanding RMM allocations to be released...
23/12/06 10:11:40 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

@pxLi are we sure there aren't container memory issues? Driver isn't shooting this, so it seems like a cgroup or OOM memory threshold thing that steps in and kills this.

From all the metrics collected on the failed runs, the memory usage was well below the limit (60Gi), unless there was a spike that the metric collector did not catch in time.

Let me ask SRE to help check the failed host to see if any unexpected processes are running that could cause the issue.

pxLi commented 11 months ago

I have filed a ticket with SRE and moved one suspicious node out of the pool for now.

jlowe commented 11 months ago

It appears the Spark driver+worker is the one responsible for killing the executor. From a recent scala2.13+spark332 run, the worker log has:

23/12/06 21:41:26 INFO Worker: Asked to kill executor app-20231206193627-0000/0
23/12/06 21:41:26 INFO Worker: Asked to launch executor app-20231206193627-0000/1 for rapids spark plugin integration tests (python)
23/12/06 21:41:26 INFO ExecutorRunner: Runner thread for executor app-20231206193627-0000/0 interrupted
23/12/06 21:41:26 INFO ExecutorRunner: Killing process!

and this is what the corresponding executor log has:

23/12/06 21:41:26 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
23/12/06 21:41:26 INFO RapidsBufferCatalog: Closing storage

So the issue is that somehow the executor heartbeats are not making it back to the driver. It's likely that either the executor heartbeat thread is stuck or has died, or the driver is somehow unable to receive or process the heartbeat messages in a timely manner.
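One plausible mechanism for the "died" case, sketched outside of Spark: the heartbeater runs as a periodic task on a ScheduledExecutorService, and per the JDK contract a periodic task whose run throws an exception is never rescheduled, so the heartbeats just silently stop. A standalone illustration, not Spark's actual Heartbeater code:

import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object HeartbeatDeathSketch extends App {
  // Illustration only: once a single run of a scheduleAtFixedRate task throws,
  // all subsequent runs are suppressed, so the "heartbeats" quietly stop.
  val scheduler = Executors.newSingleThreadScheduledExecutor()
  val beats = new AtomicInteger(0)

  scheduler.scheduleAtFixedRate(() => {
    val n = beats.incrementAndGet()
    if (n == 3) throw new IllegalStateException("simulated heartbeat failure")
    println(s"heartbeat $n")
  }, 0, 100, TimeUnit.MILLISECONDS)

  Thread.sleep(1000) // prints heartbeat 1 and 2, then nothing more
  scheduler.shutdown()
}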

jlowe commented 11 months ago

Ah, this looks like a smoking gun. In the executor log of the executor that was ultimately killed, I found this:

23/12/06 21:39:18 ERROR Utils: Uncaught exception in thread executor-heartbeater
java.util.ConcurrentModificationException: mutation occurred during iteration
        at scala.collection.mutable.MutationTracker$.checkMutations(MutationTracker.scala:43)
        at scala.collection.mutable.CheckedIndexedSeqView$CheckedIterator.hasNext(CheckedIndexedSeqView.scala:47)
        at scala.collection.mutable.Growable.addAll(Growable.scala:61)
        at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
        at scala.collection.immutable.VectorBuilder.addAll(Vector.scala:1565)
        at scala.collection.immutable.Vector.appendedAll0(Vector.scala:235)
        at scala.collection.immutable.Vector1.appendedAll0(Vector.scala:427)
        at scala.collection.immutable.Vector.appendedAll(Vector.scala:203)
        at scala.collection.immutable.Vector.appendedAll(Vector.scala:113)
        at scala.collection.SeqOps.concat(Seq.scala:187)
        at scala.collection.SeqOps.concat$(Seq.scala:187)
        at scala.collection.AbstractSeq.concat(Seq.scala:1161)
        at scala.collection.IterableOps.$plus$plus(Iterable.scala:726)
        at scala.collection.IterableOps.$plus$plus$(Iterable.scala:726)
        at scala.collection.AbstractIterable.$plus$plus(Iterable.scala:926)
        at org.apache.spark.executor.TaskMetrics.accumulators(TaskMetrics.scala:261)
        at org.apache.spark.executor.Executor.$anonfun$reportHeartBeat$1(Executor.scala:1042)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:926)
        at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1036)
        at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:238)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
        at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)

Looks like the heartbeat thread died at a time that lines up with the heartbeat timeout. From the driver exception:

Executor heartbeat timed out after 137823 ms

That means the last heartbeat seen by the driver was at approximately 21:41:26 - 00:02:17 = 21:39:09, which is just before the exception message seen in the executor log.
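For context, here is a standalone sketch of the Scala 2.13 checked-iteration behavior behind that stack trace. The real Spark bug involves another thread mutating the task's accumulator buffer while the heartbeater concatenates it, but this single-threaded version raises the same exception:

import scala.collection.mutable.ArrayBuffer

object MutationDuringIterationSketch extends App {
  val accums = ArrayBuffer(1, 2, 3)
  val it = accums.iterator // recent Scala 2.13 releases return a mutation-checked iterator
  accums += 4              // mutate while the iterator is still live
  it.foreach(println)      // Scala 2.13: java.util.ConcurrentModificationException
                           // ("mutation occurred during iteration"); Scala 2.12 iterates silently
}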

jlowe commented 11 months ago

This is a bug in Spark 3.3.2, fixed in Spark 3.3.3 and Spark 3.4.0 (which explains why we never see it in Spark 3.4), see SPARK-39696. We should move our Scala 2.13 pipelines off of Spark 3.3.2 and use Spark 3.3.3 as the baseline for 3.3.x testing.

NVnavkumar commented 11 months ago

This comment on the PR that fixes the bug confirms the environment (Scala 2.13, JVM 17) in which this error shows up: https://github.com/apache/spark/pull/37206#issuecomment-1186993269

pxLi commented 11 months ago

This is a bug in Spark 3.3.2, fixed in Spark 3.3.3 and Spark 3.4.0 (which explains why we never see it in Spark 3.4), see SPARK-39696. We should move our Scala 2.13 pipelines off of Spark 3.3.2 and use Spark 3.3.3 as the baseline for 3.3.x testing.

Previously these 2 versions were selected to cover similar Dataproc serverless runtimes. Please let us know:

  1. Which shim should we use to cover the previous scenarios for DP?
  2. Do we also need to remove 333- from the scala213 build and test list? https://github.com/NVIDIA/spark-rapids/blob/branch-23.12/scala2.13/pom.xml#L810-L818 https://github.com/NVIDIA/spark-rapids/blob/branch-23.12/scala2.13/pom.xml#L792-L795

@sameerz @SurajAralihalli @NVnavkumar @GaryShen2008 cc @NvTimLiu

NVnavkumar commented 11 months ago

This is a bug in Spark 3.3.2, fixed in Spark 3.3.3 and Spark 3.4.0 (which explains why we never see it in Spark 3.4), see SPARK-39696. We should move our Scala 2.13 pipelines off of Spark 3.3.2 and use Spark 3.3.3 as the baseline for 3.3.x testing.

Previously these 2 versions were selected to cover similar Dataproc serverless runtimes. Please let us know:

1. Which shim should we use to cover the previous scenarios for DP?

2. Do we also need to remove 333- from the scala213 build and test list?
   https://github.com/NVIDIA/spark-rapids/blob/branch-23.12/scala2.13/pom.xml#L810-L818
   https://github.com/NVIDIA/spark-rapids/blob/branch-23.12/scala2.13/pom.xml#L792-L795

@sameerz @SurajAralihalli @NVnavkumar @GaryShen2008 cc @NvTimLiu

I think the plan now would be just to replace the Spark 3.3.2 Scala 2.13 integration test run with Spark 3.3.3, and keep the 3.4.0 and 3.5.0 runs as is. I don't think we need to update the pom files at this time (correct me if I'm wrong, @jlowe).

jlowe commented 11 months ago

I don't think we need to remove 3.3.2 support, but we cannot reliably test against Apache Spark 3.3.2 due to the known issue

Re: Dataproc serverless, testing against 3.3.3 should be close enough, or we would need to run against a custom Apache Spark 3.3.2 version that has the fix for SPARK-39696 applied.

Re: premerge, I do think we need to modify the pom file so the premerge runs against 333 instead of 332, due to the known issue with running Spark 3.3.2 + Scala 2.13. I'll post a PR to do that shortly.

pxLi commented 11 months ago

I don't think we need to remove 3.3.2 support, but we cannot reliably test against Apache Spark 3.3.2 due to the known issue

Re: Dataproc serverless, testing against 3.3.3 should be close enough, or we would need to run against a custom Apache Spark 3.3.2 version that has the fix for SPARK-39696 applied.

Re: premerge, I do think we need to modify the pom file so the premerge runs against 333 instead of 332, due to the known issue with running Spark 3.3.2 + Scala 2.13. I'll post a PR to do that shortly.

Thanks for the update!

Let me close this ticket. Feel free to reopen if any other action is required.