oap-project / raydp

RayDP provides simple APIs for running Spark on Ray and integrating Spark with AI libraries.
Apache License 2.0
308 stars 68 forks

RayDP on Databricks #254

Closed LAITRUNGMINHDUC closed 1 year ago

LAITRUNGMINHDUC commented 2 years ago

Hi all,

I was reading the Databricks website and learned about RayDP: https://databricks.com/session_na21/build-large-scale-data-analytics-and-ai-pipeline-using-raydp

I tried to make this example work, but the system crashes at the first line: spark = raydp.init_spark(......)

The error keeps saying: "Java gateway process exited before sending its port number".

Could you please share your thoughts on how I could set up RayDP on Databricks? Thank you so much.

My current system:

carsonwang commented 2 years ago

Hi @LAITRUNGMINHDUC, did you start a Ray cluster on a Databricks Spark cluster and then try to use RayDP? RayDP is Spark on Ray, so that would effectively be Spark on Ray on Spark. Usually you can just start a Ray cluster in the cloud, use RayDP to run your Spark program, and connect it with other Ray components. Could you please share more about the use case you want to run on Databricks?

LAITRUNGMINHDUC commented 2 years ago

Hi @carsonwang, thank you for your explanation.

My use case is that I want to test Spark SQL on RayDP, because our current Spark solution takes a long time to run. My data scientists work mainly in SQL, so migrating the solution to pure Python running on Ray is hard.

For more details on my setup, I use the init script shared in this blog post to initialize the system. I also tried a single-node installation, but nothing works. Link: https://databricks.com/blog/2021/11/19/ray-on-databricks.html

Here is my sample code:

import sys; sys.stdout.fileno = lambda: False  # workaround from the blog post: notebook stdout has no real file descriptor
import ray
import raydp
ray.init(address='auto')

spark = raydp.init_spark(
  app_name = "myApp",
  num_executors = 2,
  executor_cores = 4,
  executor_memory = "28GB"
)

And here is the error:

Exception                                 Traceback (most recent call last)
<command-1007474167784282> in <module>
----> 1 spark = raydp.init_spark(
      2   app_name = "myApp",
      3   num_executors = 2,
      4   executor_cores = 4,
      5   executor_memory = "28GB"

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c59a3514-65dc-4193-9978-669f48ca0e72/lib/python3.8/site-packages/raydp/context.py in init_spark(app_name, num_executors, executor_cores, executor_memory, configs)
    124                 _global_spark_context = _SparkContext(
    125                     app_name, num_executors, executor_cores, executor_memory, configs)
--> 126                 return _global_spark_context.get_or_create_session()
    127             except:
    128                 _global_spark_context = None

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c59a3514-65dc-4193-9978-669f48ca0e72/lib/python3.8/site-packages/raydp/context.py in get_or_create_session(self)
     68             return self._spark_session
     69         self.handle = RayDPConversionHelper.options(name=RAYDP_OBJ_HOLDER_NAME).remote()
---> 70         spark_cluster = self._get_or_create_spark_cluster()
     71         self._spark_session = spark_cluster.get_spark_session(
     72             self._app_name,

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c59a3514-65dc-4193-9978-669f48ca0e72/lib/python3.8/site-packages/raydp/context.py in _get_or_create_spark_cluster(self)
     61         if self._spark_cluster is not None:
     62             return self._spark_cluster
---> 63         self._spark_cluster = SparkCluster(self._configs)
     64         return self._spark_cluster
     65 

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c59a3514-65dc-4193-9978-669f48ca0e72/lib/python3.8/site-packages/raydp/spark/ray_cluster.py in __init__(self, configs)
     32         self._app_master_bridge = None
     33         self._configs = configs
---> 34         self._set_up_master(None, None)
     35         self._spark_session: SparkSession = None
     36 

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c59a3514-65dc-4193-9978-669f48ca0e72/lib/python3.8/site-packages/raydp/spark/ray_cluster.py in _set_up_master(self, resources, kwargs)
     38         # TODO: specify the app master resource
     39         self._app_master_bridge = RayClusterMaster(self._configs)
---> 40         self._app_master_bridge.start_up()
     41 
     42     def _set_up_worker(self, resources: Dict[str, float], kwargs: Dict[str, str]):

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c59a3514-65dc-4193-9978-669f48ca0e72/lib/python3.8/site-packages/raydp/spark/ray_cluster_master.py in start_up(self, popen_kwargs)
     52             return
     53         extra_classpath = os.pathsep.join(self._prepare_jvm_classpath())
---> 54         self._gateway = self._launch_gateway(extra_classpath, popen_kwargs)
     55         self._app_master_java_bridge = self._gateway.entry_point.getAppMasterBridge()
     56         self._set_properties()

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c59a3514-65dc-4193-9978-669f48ca0e72/lib/python3.8/site-packages/raydp/spark/ray_cluster_master.py in _launch_gateway(self, class_path, popen_kwargs)
    116 
    117             if not os.path.isfile(conn_info_file):
--> 118                 raise Exception("Java gateway process exited before sending its port number")
    119 
    120             with open(conn_info_file, "rb") as info:

Exception: Java gateway process exited before sending its port number
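
For context on where this fails: the frames above are a py4j-style gateway launch, which spawns a JVM and then waits for it to write its port into a connection-info file. If the JVM exits before the file appears (bad classpath, incompatible Java, a stale process holding resources), you get exactly this exception. A simplified stdlib sketch of that logic, with the command construction omitted (the real code in ray_cluster_master.py builds the java command from RayDP's jars, so the command and file paths here are illustrative):

```python
import os
import subprocess
import time

def launch_gateway_sketch(jvm_cmd, conn_info_file, timeout=10.0):
    """Start the gateway process, then wait for it to report its port
    by writing conn_info_file. Mirrors the failure mode in the
    traceback: if the process dies first, the file never appears."""
    proc = subprocess.Popen(jvm_cmd)
    deadline = time.time() + timeout
    while time.time() < deadline and not os.path.isfile(conn_info_file):
        if proc.poll() is not None:
            # Process exited before reporting its port; allow a brief
            # grace period in case the file write raced with the exit.
            time.sleep(0.2)
            break
        time.sleep(0.1)
    if not os.path.isfile(conn_info_file):
        raise Exception("Java gateway process exited before sending its port number")
    with open(conn_info_file, "rb") as info:
        return int(info.read())
```

So the exception text itself does not say why the JVM died; the JVM's own stderr (in the driver logs) is where the underlying cause would show up.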
kira-lin commented 2 years ago

hi @LAITRUNGMINHDUC , RayDP does not have its own SQL implementation for now; I think the performance would be very similar to vanilla Spark. If you want better SQL performance, you could try another of our projects, Gazelle.

I can't identify the problem from the error you provided. Can you please run ps aux | grep raydp, shut down the related process (a Java process whose main class is AppMasterEntryPoint), and then try again? Also, what are your Ray and Spark versions?
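
That check can be scripted; a minimal stdlib sketch (the AppMasterEntryPoint match string comes from the suggestion above, everything else is illustrative):

```python
import os
import signal
import subprocess

def find_stale_raydp_pids(ps_output):
    """Parse `ps aux` output and return the PIDs of processes whose
    command line mentions AppMasterEntryPoint (RayDP's Java app master),
    skipping the grep line itself."""
    pids = []
    for line in ps_output.splitlines():
        if "AppMasterEntryPoint" in line and "grep" not in line:
            pids.append(int(line.split()[1]))  # second column of ps aux is the PID
    return pids

if __name__ == "__main__":
    ps = subprocess.run(["ps", "aux"], capture_output=True, text=True).stdout
    for pid in find_stale_raydp_pids(ps):
        os.kill(pid, signal.SIGTERM)  # shut down the stale gateway, then retry init_spark
```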

kira-lin commented 1 year ago

close as stale