oap-project / raydp

RayDP provides simple APIs for running Spark on Ray and integrating Spark with AI libraries.
Apache License 2.0

ray.get() dead-locks in applyInPandas() #275

Closed Hoeze closed 1 year ago

Hoeze commented 2 years ago

The following code snippet deadlocks:

import numpy as np
import pandas as pd
import ray
import raydp

import pyspark.sql.types as t

spark = raydp.init_spark(num_executors=1, executor_cores=1, executor_memory="4G", app_name="raydp-test")

df = spark.createDataFrame(pd.DataFrame({"x": np.arange(10), "y": np.arange(10) % 3}))
df.toPandas()  # works fine

obj_ref = ray.put("asdf")

groupby_columns = ["y"]
mapped_df = df.groupby(groupby_columns).applyInPandas(
    # the ray.get() inside the UDF never returns
    func=lambda df: df.assign(z=ray.get(obj_ref)),
    schema=t.StructType([
        *[df.schema[k] for k in groupby_columns],
        t.StructField("z", t.StringType()),
    ])
)

mapped_df.toPandas()  # hangs here


What is my mistake?

Hoeze commented 2 years ago

It seems like applyInPandas starts a new local Ray cluster on every evaluation. How can I make it reuse the actors' cluster connection?
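
One way to check that hypothesis is to probe the Ray state from inside the UDF. This is a diagnostic sketch rather than anything from the thread: the probe helper is illustrative, it reuses the df from the snippet above, and its print output lands in the executor logs rather than the driver console.

def probe(pdf):
    import ray
    # A fresh PySpark Python worker prints False here: it holds no
    # connection to the existing Ray cluster, so a bare ray.get() would
    # auto-initialize a brand-new local Ray instance instead.
    print("worker connected to ray:", ray.is_initialized())
    return pdf

df.groupby("y").applyInPandas(probe, schema=df.schema).collect()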

kira-lin commented 1 year ago

hi @Hoeze, applyInPandas starts separate Python workers, and these workers are not connected to Ray. An actor is itself a process, so it is not really possible to 'reuse' its session. Connecting to Ray in each Python worker is fine in itself; the real problem is that the workers cannot get obj_ref, because it is not registered in their session. To solve this, I suggest defining an actor to hold all the object refs, and letting the PySpark Python workers connect to Ray using the same namespace as the driver program. These workers can then look up the actor by name and get the objects from it.

You can refer to our _convert_by_udf function in python/raydp/spark/dataset.py.
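
A minimal sketch of that pattern, not taken from the thread: the ObjectHolder class, the actor name "object_holder", the namespace value, and the apply_fn helper are all illustrative, and it assumes the driver and the PySpark workers can reach the same Ray cluster.

import pandas as pd
import ray

NAMESPACE = "raydp-test"  # driver and workers must use the same namespace

@ray.remote
class ObjectHolder:
    """Named actor that hands values to workers with their own Ray sessions."""
    def __init__(self):
        self._store = {}

    def put(self, key, value):
        self._store[key] = value

    def get(self, key):
        return self._store[key]

# driver side: connect with an explicit namespace (with RayDP, before
# raydp.init_spark), then create the named, detached actor and register the value
ray.init(namespace=NAMESPACE)
holder = ObjectHolder.options(name="object_holder", lifetime="detached").remote()
ray.get(holder.put.remote("z_value", "asdf"))

# worker side: the function passed to applyInPandas
def apply_fn(pdf: pd.DataFrame) -> pd.DataFrame:
    # The PySpark Python worker is not connected to Ray, so join the
    # existing cluster in the driver's namespace instead of letting
    # ray.get() auto-start a fresh local instance.
    if not ray.is_initialized():
        ray.init(address="auto", namespace=NAMESPACE)
    holder = ray.get_actor("object_holder")
    return pdf.assign(z=ray.get(holder.get.remote("z_value")))

Because the actor is looked up by name within the shared namespace, the workers never need the driver's ObjectRef at all, which is what made the original snippet hang.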

kira-lin commented 1 year ago

close as stale