oap-project / raydp

RayDP provides simple APIs for running Spark on Ray and integrating Spark with AI libraries.
Apache License 2.0
293 stars 66 forks source link

Spark DF to Ray Dataset Error #268

Closed andreapiso closed 1 year ago

andreapiso commented 1 year ago

Hi, I am on SparkDP nightly (as i wanted to query hive).

I am not able to convert sparkdp dataframes to ray datasets. Have this error even for simple ones.

for example:

df1 = spark.range(0, 1000)
ds1 = ray.data.from_spark(df1)

Produces:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [41], in <cell line: 1>()
----> 1 ds1 = ray.data.from_spark(df1)

File ~/.local/lib/python3.9/site-packages/ray/data/read_api.py:860, in from_spark(df, parallelism)
    846 """Create a dataset from a Spark dataframe.
    847 
    848 Args:
   (...)
    856     Dataset holding Arrow records read from the dataframe.
    857 """
    858 import raydp
--> 860 return raydp.spark.spark_dataframe_to_ray_dataset(df, parallelism)

File ~/.local/lib/python3.9/site-packages/raydp/spark/dataset.py:178, in spark_dataframe_to_ray_dataset(df, parallelism, _use_owner)
    176     if parallelism != num_part:
    177         df = df.repartition(parallelism)
--> 178 blocks, _ = _save_spark_df_to_object_store(df, False, _use_owner)
    179 return from_arrow_refs(blocks)

File ~/.local/lib/python3.9/site-packages/raydp/spark/dataset.py:152, in _save_spark_df_to_object_store(df, use_batch, _use_owner)
    150 jvm = df.sql_ctx.sparkSession.sparkContext._jvm
    151 jdf = df._jdf
--> 152 object_store_writer = jvm.org.apache.spark.sql.raydp.ObjectStoreWriter(jdf)
    154 if _use_owner is True:
    155     records = object_store_writer.save(use_batch, RAYDP_OBJ_HOLDER_NAME)

TypeError: 'JavaPackage' object is not callable

Do I have something wrong in my Spark configuration? Other conversions, like pandas on spark, work fine.

carsonwang commented 1 year ago

Looks like the JAR was not correctly loaded. Did you install RayDP and pySpark through pip? If not, can you please share how you setup the environment?

andreapiso commented 1 year ago

Thanks for getting back so quickly!! Yes I install both with PIP.

My environment is a cluster where there is a pre-installed Spark on K8s setup. However Spark is 2.4 so instead I am using pip to install pyspark 3.2 and raydp nightly in my conda environment - and setting spark conf to a different folder so that I can use local spark instead.

These are the variables I am setting:

SPARK_HOME: pointing to .local/lib/python3.9/site-packages/pyspark/ which i installed with pip PYSPARK_DRIVER_PYTHON=python3 PYSPARK_PYTHON=python3 #to get these to point to the same python 3.9 where i installed pyspark and sparkDP SPARK_CONF_DIR=/home/ray_spark/conf/ #my custom spark conf where I have spark-defaults.conf and spark-env.sh where i can add all the hadoop environment variables and kerberos authentication settings.

The connection to hive works perfectly, and I can extract the data. Issue happens when i try to convert to Ray dataset.

Do these JARs need to be loaded manually? (if so, which JARs should i load?) I try to print the CLASSPATH in the notebook after seeing the error and i see it's empty (not sure if the spark process sees something different...)

andreapiso commented 1 year ago

Hi - I tried to load the RayDP JAR manually in spark-defaults.conf:

spark.driver.extraClassPath=/home/.local/lib/python3.9/site-packages/raydp/jars/raydp-0.5.0-SNAPSHOT.jar
spark.executor.extraClassPath=/home/.local/lib/python3.9/site-packages/raydp/jars/raydp-0.5.0-SNAPSHOT.jar

Now the error is different:

Py4JError: org.apache.spark.sql.raydp.ObjectStoreWriter does not exist in the JVM

Is there a list of JARs that need to be included?

andreapiso commented 1 year ago

Looks like we solved the problem! Apparently the issue was that i was calling once SparkContext() before setting up the SparkDP - apparently that messes things up.

kira-lin commented 1 year ago

Glad you solved it! Do you mean you create a SparkContext before using raydp to create it?

andreapiso commented 1 year ago

yes - if you somehow write:

from pyspark import SparkContext

SparkContext()

import ray
import raydp

ray.init()
spark = raydp.init_spark(...)

things work when you stay in the spark world but will break when you try to interact with Ray.

PS: I had no reason to use spark context - i was just using it to make sure spark was running properly (e.g. my local spark instead of the kubernetes one in the cluster).