oap-project / raydp

RayDP provides simple APIs for running Spark on Ray and integrating Spark with AI libraries.
Apache License 2.0

Threads hang on getting from object store when calling `to_spark` #348

Open Deegue opened 1 year ago

Deegue commented 1 year ago

We initialize a Spark instance with `raydp.init_spark('benchmark', 1, 5, '30G', configs={"spark.default.parallelism": 50})`, then save the dataframe to the object store:

    blocks, _ = raydp.spark.dataset._save_spark_df_to_object_store(df, False)
    ds = ray.data.from_arrow_refs(blocks)

The problem occurs when we invoke `ds.to_spark(spark)`: it hangs while getting the object from the object store:
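For completeness, the steps above can be combined into one repro sketch. It assumes a running Ray cluster with RayDP installed; the `df = spark.range(...)` line is a hypothetical stand-in (the issue does not say how the dataframe was built), and `_save_spark_df_to_object_store` is the internal helper named in the report, not public API:

```python
import ray
import raydp

ray.init(address="auto")  # connect to an existing Ray cluster

# Same settings as in the report: 1 executor with 5 cores and 30G memory,
# and spark.default.parallelism=50, which triggers the hang.
spark = raydp.init_spark(
    "benchmark", 1, 5, "30G",
    configs={"spark.default.parallelism": 50},
)

# Hypothetical dataframe; the issue does not specify its source.
df = spark.range(0, 1_000_000)

# Internal helper from the report: push the DF's Arrow batches to the object store.
blocks, _ = raydp.spark.dataset._save_spark_df_to_object_store(df, False)
ds = ray.data.from_arrow_refs(blocks)

# Hangs in NativeObjectStore.nativeGet with parallelism 50; succeeds with 1.
result = ds.to_spark(spark)
```
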

"Executor task launch worker for task 27.0 in stage 6.0 (TID 179)" #60 daemon prio=5 os_prio=0 tid=0x00007fe38020b800 nid=0x5057 runnable [0x00007ff6563f8000]
   java.lang.Thread.State: RUNNABLE
        at io.ray.runtime.object.NativeObjectStore.nativeGet(Native Method)
        at io.ray.runtime.object.NativeObjectStore.getRaw(NativeObjectStore.java:53)
        at io.ray.runtime.object.ObjectStore.get(ObjectStore.java:131)
        at io.ray.runtime.AbstractRayRuntime.get(AbstractRayRuntime.java:144)
        at io.ray.runtime.AbstractRayRuntime.get(AbstractRayRuntime.java:125)
        at io.ray.runtime.AbstractRayRuntime.get(AbstractRayRuntime.java:120)
        at io.ray.api.Ray.get(Ray.java:98)
        at io.ray.runtime.object.ObjectRefImpl.get(ObjectRefImpl.java:77)
        - locked <0x0000000739b0c2d0> (a io.ray.runtime.object.ObjectRefImpl)
        at org.apache.spark.sql.raydp.ObjectStoreReader$.getBatchesFromStream(ObjectStoreReader.scala:108)
        at org.apache.spark.rdd.RayDatasetRDD.compute(RayDatasetRDD.scala:61)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.executor.Executor$TaskRunner$$Lambda$514/236884660.apply(Unknown Source)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

However, when we set `spark.default.parallelism` to a lower value such as 1, the problem goes away.