oap-project / raydp

RayDP provides simple APIs for running Spark on Ray and integrating Spark with AI libraries.
Apache License 2.0

Executor lost triggered by JVM OOM can't recover and cause subsequent steps stuck #364

Closed pang-wu closed 6 months ago

pang-wu commented 11 months ago

We found that if a task gets an OutOfMemoryError during execution and the executor is lost, the Spark driver won't request new executors to backfill the ones killed by OOM. The following steps reproduce this error:

  1. Create a dataset (a table or a Parquet file) with a large column that can't fit into your Spark job's memory.
  2. Start a small cluster, say 3 executors with little enough memory that you know they will OOM when persisting the RDD. Load the data and persist:
    
    from pyspark import StorageLevel

    df = spark.sql("select * from some_big_table")
    rdd = df.rdd
    rdd.persist(StorageLevel.MEMORY_AND_DISK)
    rdd.count()


You should see errors similar to the following:

    23/07/10 04:54:29 WARN TaskSetManager: Lost task 1.0 in stage 8.0 (TID 105) (10.191.8.91 executor 6): java.lang.OutOfMemoryError: Java heap space
        at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:285)
        at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
        ...

    23/07/10 04:54:29 ERROR TaskSchedulerImpl: Lost executor 6 on 10.191.8.91: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

    ....
    23/07/10 04:54:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_13_6 !
    23/07/10 04:54:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_13_7 !
    23/07/10 04:54:29 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_13_8 !



The cluster gets stuck in a bad state where it shows active tasks but all executors are dead (if OOM happens on all executors; otherwise some executors stay alive with no tasks scheduled on them):

<img width="1708" alt="Screenshot 2023-07-09 at 10 05 52 PM" src="https://github.com/oap-project/raydp/assets/104795337/6131c59f-5444-46e4-9a74-15051d115142">

This problem impacts some of the fault-tolerance features such as `raydp.spark.from_spark_recoverable(df)`, since OOM can happen inside the call -- this is how we found a way to reproduce this issue.

We can reproduce this using the following setup:
RayDP built from the master branch
Spark 3.3.2
Ray 2.4.0/2.3.1

@carsonwang

kira-lin commented 11 months ago

Hi @pang-wu, are you using fault tolerant mode? Is dynamic allocation enabled? If using fault tolerant mode, executors will be restarted by Ray if they fail. What behavior did you expect?

pang-wu commented 11 months ago

@kira-lin I tried fault tolerant mode both with and without dynamic allocation; the result is the same. You can use the example code to reproduce the issue; it is actually not related to from_spark_recoverable.

Here is how I init Spark. We have other Spark properties specified in the Spark property file; I can provide that if needed:

import raydp

executor_count = 3
spark = raydp.init_spark(app_name='RayDP Example',
                         num_executors=executor_count,
                         executor_cores=3,
                         executor_memory=1 * 1024 * 1024 * 1024,
                         enable_hive=True,  # Required for AWS GlueCatalog
                         configs = {
                             'spark.dynamicAllocation.enabled': 'true',
                             'spark.dynamicAllocation.maxExecutors': executor_count,
                             'spark.dynamicAllocation.shuffleTracking.enabled':'true',
                             'spark.dynamicAllocation.cachedExecutorIdleTimeout': '10min'
                         })

pang-wu commented 11 months ago

It looks like once a Spark executor is lost, the driver won't request another one from Ray. The only way to exit this state is to call raydp.stop_spark().
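
For illustration, the workaround above amounts to tearing the session down and re-creating it with the same arguments. A minimal sketch: `restart_spark` is a hypothetical helper, not part of RayDP, and it takes the module as a parameter only so the sketch can be exercised without a cluster.

```python
def restart_spark(raydp_module, **init_kwargs):
    """Stop the current RayDP Spark session and start a fresh one.

    `raydp_module` is expected to expose stop_spark() and init_spark(),
    as the `raydp` package does. This helper itself is hypothetical.
    """
    raydp_module.stop_spark()  # tear down the session whose executors are dead
    return raydp_module.init_spark(**init_kwargs)  # start over with the same settings
```

In a notebook this would be called as `spark = restart_spark(raydp, app_name='RayDP Example', num_executors=3, ...)`. Anything cached or persisted in the old session is gone after the restart.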

kira-lin commented 11 months ago

In order to use from_spark_recoverable(), you need to set fault_tolerant_mode=True when calling init_spark. Did you try it?

Even when fault_tolerant_mode is set, executors are not requested by the driver; they are restarted by Ray. When Ray finds that an actor (executor) is lost, it'll try to restart it when resources are available.

pang-wu commented 11 months ago

@kira-lin Fault tolerance is one example use case that triggers this scenario, but the problem is not related to fault tolerance: we observe this issue in our production jobs with different queries that trigger OOM. In the example code, a persist+count call is enough to knock the cluster out.

> When Ray finds that an actor (executor) is lost, it'll try to restart it when resources are available.

This doesn't seem to happen, at least not when the OOM is caused by a Spark executor. We do see Ray actors recreated on other failures like node loss, though.

kira-lin commented 11 months ago

Yes, RayDP won't request new executors if executors are lost. The implementation of the schedule function does not request more executors when there are available resources.

When is this needed? Previously I thought if an executor gets OOM, then it'll get OOM again if restarted.

pang-wu commented 11 months ago

@kira-lin

> The implementation of the schedule function does not request more executors when there are available resources.

When you say available resources, do you mean there are available executors? What I observed is that after all executors are lost, the Spark cluster has no active executor actors except those from the driver/AM, and any new jobs/stages submitted to that Spark cluster won't trigger new executor allocation requests; those new stages/jobs get stuck there. I think this is not what we want?

When tasks hit OOM, each task is retried X times (X depends on the task retry config). If all tasks fail, the stage is retried Y times (again, Y depends on config), and then the stage aborts. You are right that if a task OOMs, it will get the same error no matter how many times it retries. But after the job aborts, shouldn't Spark request new executors to backfill the dead ones, so that the number of executors is either minExecutors (if dynamic allocation is enabled) or the total executor count set in the configuration? If I remember correctly, this is how it behaves on other cluster managers like YARN (please correct me if I am wrong here).
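
To make the expected backfill target concrete, here is a minimal sketch of the rule described above. The function names are hypothetical, and reading the static count from `spark.executor.instances` is an assumption (RayDP takes `num_executors` as an argument to `init_spark`); the two `spark.dynamicAllocation.*` keys are standard Spark properties.

```python
def target_executor_count(conf):
    """Minimum number of executors the cluster should hold.

    `conf` is a plain dict of Spark properties (an assumption for this sketch).
    """
    dynamic = str(conf.get("spark.dynamicAllocation.enabled", "false")).lower() == "true"
    if dynamic:
        # With dynamic allocation, the floor is minExecutors (Spark's default is 0).
        return int(conf.get("spark.dynamicAllocation.minExecutors", 0))
    # Without it, the floor is the statically configured executor count.
    return int(conf.get("spark.executor.instances", 0))


def executors_to_request(conf, alive_executors):
    """How many executors to backfill; never negative."""
    return max(0, target_executor_count(conf) - alive_executors)
```

With a static setting of 3 executors and none alive, `executors_to_request` returns 3; with dynamic allocation enabled and `minExecutors` unset, it returns 0, which means backfilling would then have to be driven by pending tasks rather than by a floor.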

kira-lin commented 11 months ago

> If I remember correctly, this is how it behaves on other cluster managers like YARN

Yes, I think you are right. I remember that their schedule function is called periodically, and if the number of executors is less than configured, new ones are requested.

So I think the main problem is that it'll get stuck instead of aborting, right? When you say new job, do you mean you are submitting multiple jobs to one RayDP session? How did you connect to it?

pang-wu commented 11 months ago

@kira-lin

> So I think the main problem is that it'll get stuck instead of aborting, right?

The stage/job will abort, but the main problem is that the cluster is no longer usable after the stage aborts unless Spark is explicitly stopped and restarted. This is problematic in the scenarios described below.

> When you say new job, do you mean you are submitting multiple jobs to one RayDP session? How did you connect to it?

There are two scenarios:

  1. A user uses the same Spark session in a Jupyter notebook for interactive development. When a cell causes the Spark cluster to get stuck, the user cannot submit other Spark code without calling stop and init_spark again.
  2. People run Thrift on Spark (using RayDP) and use the Thrift cluster as a query engine serving BI tools such as Apache Superset via a connector like pyhive. All subsequent queries hang because all executors in the Thrift cluster have died.

We have observed both of them while using the library.

> I remember that their schedule function is called periodically, and if the number of executors is less than configured, new ones are requested.

Can we do the same, like maintaining a counter loop in the driver actor? We can work together to test this out -- I am willing to contribute as well.

kira-lin commented 11 months ago

To align with Spark's behavior, we can refer to the implementation of standalone mode. Namely, we need to monitor how many executors are dead and request more executors if needed in the schedule() function. The schedule function should also be called from more places. Are you willing to submit a PR? @pang-wu
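
The idea can be sketched as a toy model. This is not RayDP's actual code: `ExecutorTracker` and the `request_executor` callback are hypothetical names, and the sketch only shows the bookkeeping (alive, pending, target) that lets schedule() backfill after a loss without over-requesting.

```python
class ExecutorTracker:
    """Toy model of an app master that backfills lost executors."""

    def __init__(self, target, request_executor):
        self.target = target                      # configured executor count
        self.request_executor = request_executor  # callback that starts one executor
        self.alive = set()                        # ids of registered executors
        self.pending = 0                          # requested but not yet registered

    def on_executor_registered(self, executor_id):
        self.alive.add(executor_id)
        self.pending = max(0, self.pending - 1)

    def on_executor_lost(self, executor_id):
        self.alive.discard(executor_id)
        self.schedule()  # one of the extra places schedule() gets called

    def schedule(self):
        """Request executors until alive + pending reaches the target."""
        missing = self.target - len(self.alive) - self.pending
        for _ in range(max(0, missing)):
            self.pending += 1
            self.request_executor()
```

Counting pending requests is what keeps repeated schedule() calls idempotent: a second loss notification for the same executor, or a periodic call while a replacement is still starting, does not trigger another request.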

pang-wu commented 11 months ago

@kira-lin Yes, I am glad to. Let me take a look at the code first, then I will get back to you.

pang-wu commented 6 months ago

This issue is fixed by https://github.com/oap-project/raydp/pull/391. Closing.