raydp.init_spark fails #404

Open Mayurji opened 2 months ago

Mayurji commented 2 months ago

We are trying to set up RayDP on a remote Ray cluster. We created a Dockerfile with Java and Ray, and verified that Java is installed and JAVA_HOME is set:


FROM rayproject/ray:2.9.0

USER root

SHELL ["/bin/bash", "-c"]

RUN apt-get update && \
      apt-get -y install sudo

RUN pip install playwright && \
    playwright install && \
    playwright install --with-deps

RUN apt-get update && \
    apt-get install -y openjdk-8-jdk && \
    apt-get install -y default-jre && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/ && \
    rm -rf /var/cache/oracle-jdk8-installer;

ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64/

ENV PYSPARK_SUBMIT_ARGS="--master local[2] pyspark-shell"

In the local environment, we install the following packages and run the RayDP code:

!pip install ray[all]==2.9.0 raydp==1.6.0 pyspark==3.3.2

# Connect to the remote Ray cluster, which runs the Docker image built from the Dockerfile above.
import ray
         runtime_env={"pip": ["raydp==1.6.0","ipython","pandas==1.2.4",'torch', 'pyspark==3.3.2']})

import raydp

spark = raydp.init_spark(app_name='RayDP Example 2',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='4GB')

# normal data processing with Spark
df = spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
word_count = df.groupBy('word').count()

# stop the spark cluster
raydp.stop_spark()

Running the above code produces the following error:

(RayDPSparkMaster pid=504487, ip=192.168.xxx.xxx) Error occurred during initialization of VM
(RayDPSparkMaster pid=504487, ip=192.168.xxx.xxx) agent library failed to init: instrument
(RayDPSparkMaster pid=504487, ip=192.168.xxx.xxx) Error opening zip file or JAR manifest missing : /opt/conda/lib/python3.8/site-packages/raydp/jars/raydp-agent-1.6.0-SNAPSHOT.jar
RayTaskError                              Traceback (most recent call last)
Cell In[2], line 3
      1 import raydp
----> 3 spark = raydp.init_spark(app_name='RayDP Example 2',
      4                          num_executors=2,
      5                          executor_cores=2,
      6                          executor_memory='4GB')
      8 # normal data processesing with Spark
      9 df = spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])

File /opt/conda/lib/python3.8/site-packages/raydp/context.py:215, in init_spark(app_name, num_executors, executor_cores, executor_memory, enable_hive, fault_tolerant_mode, placement_group_strategy, placement_group, placement_group_bundle_indexes, configs)
    207 try:
    208     _global_spark_context = _SparkContext(
    209         app_name, num_executors, executor_cores, executor_memory, enable_hive,
    210         fault_tolerant_mode,
    213         placement_group_bundle_indexes,
    214         configs)
--> 215     return _global_spark_context.get_or_create_session()
    216 except:
    217     if _global_spark_context is not None:

File /opt/conda/lib/python3.8/site-packages/raydp/context.py:121, in _SparkContext.get_or_create_session(self)
    119     return self._spark_session
    120 self._prepare_placement_group()
--> 121 spark_cluster = self._get_or_create_spark_cluster()
    122 self._spark_session = spark_cluster.get_spark_session()
    123 if self._fault_tolerant_mode:

File /opt/conda/lib/python3.8/site-packages/raydp/context.py:86, in _SparkContext._get_or_create_spark_cluster(self)
     84 if self._spark_cluster is not None:
     85     return self._spark_cluster
---> 86 self._spark_cluster = SparkCluster(self._app_name,
     87                                    self._num_executors,
     88                                    self._executor_cores,
     89                                    self._executor_memory,
     90                                    self._enable_hive,
     91                                    self._configs)
     92 return self._spark_cluster

File /opt/conda/lib/python3.8/site-packages/raydp/spark/ray_cluster.py:52, in SparkCluster.__init__(self, app_name, num_executors, executor_cores, executor_memory, enable_hive, configs)
     50 self._configs = configs
     51 self._prepare_spark_configs()
---> 52 self._set_up_master(resources=self._get_master_resources(self._configs), kwargs=None)
     53 self._spark_session: SparkSession = None

File /opt/conda/lib/python3.8/site-packages/raydp/spark/ray_cluster.py:72, in SparkCluster._set_up_master(self, resources, kwargs)
     68 else:
     69     self._spark_master_handle = RayDPSparkMaster.options(name=spark_master_name) \
     70         .remote(self._configs)
---> 72 ray.get(self._spark_master_handle.start_up.remote())

File /opt/conda/lib/python3.8/site-packages/ray/_private/auto_init_hook.py:22, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
     19 @wraps(fn)
     20 def auto_init_wrapper(*args, **kwargs):
     21     auto_init_ray()
---> 22     return fn(*args, **kwargs)

File /opt/conda/lib/python3.8/site-packages/ray/_private/client_mode_hook.py:102, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
     98 if client_mode_should_convert():
     99     # Legacy code
    100     # we only convert init function if RAY_CLIENT_MODE=1
    101     if func.__name__ != "init" or is_client_mode_enabled_by_default:
--> 102         return getattr(ray, func.__name__)(*args, **kwargs)
    103 return func(*args, **kwargs)

File /opt/conda/lib/python3.8/site-packages/ray/util/client/api.py:42, in _ClientAPI.get(self, vals, timeout)
     35 def get(self, vals, *, timeout=None):
     36     """get is the hook stub passed on to replace `ray.get`
     38     Args:
     39         vals: [Client]ObjectRef or list of these refs to retrieve.
     40         timeout: Optional timeout in milliseconds
     41     """
---> 42     return self.worker.get(vals, timeout=timeout)

File /opt/conda/lib/python3.8/site-packages/ray/util/client/worker.py:434, in Worker.get(self, vals, timeout)
    432     op_timeout = max_blocking_operation_time
    433 try:
--> 434     res = self._get(to_get, op_timeout)
    435     break
    436 except GetTimeoutError:

File /opt/conda/lib/python3.8/site-packages/ray/util/client/worker.py:462, in Worker._get(self, ref, timeout)
    460         logger.exception("Failed to deserialize {}".format(chunk.error))
    461         raise
--> 462     raise err
    463 if chunk.total_size > OBJECT_TRANSFER_WARNING_SIZE and log_once(
    464     "client_object_transfer_size_warning"
    465 ):
    466     size_gb = chunk.total_size / 2**30

RayTaskError: ray::RayDPSparkMaster.start_up() (pid=504487, ip=192.168.xxx.xxx, actor_id=db69bf707b74fa0a14343e7818000000, repr=<raydp.spark.ray_cluster_master.RayDPSparkMaster object at 0x7f826c0814f0>)
  File "/opt/conda/lib/python3.8/site-packages/raydp/spark/ray_cluster_master.py", line 68, in start_up
  File "/opt/conda/lib/python3.8/site-packages/raydp/spark/ray_cluster_master.py", line 158, in _launch_gateway
Exception: Java gateway process exited before sending its port number

We have looked at multiple issues reporting a similar error, but none of the suggested fixes resolved it: RayDp-Fails

akashd11 commented 1 month ago

Maybe try installing raydp in the container image as well, so that the Spark master can find the jars it needs to start Spark.
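
One way to check this is to look for the jars in the Python environment that the cluster nodes actually use. The sketch below is only a diagnostic under assumptions: `find_raydp_jars` is a hypothetical helper (not part of RayDP), and the `jars` subdirectory is inferred from the path in the error message above. It uses only the standard library, so it can be run inside the container or wrapped in a Ray task.

```python
import importlib.util
from pathlib import Path


def find_raydp_jars(package_name="raydp"):
    """Hypothetical helper: list the jar files shipped inside an installed package.

    Returns None if the package cannot be found in this Python environment,
    an empty list if it has no `jars` directory, and otherwise the sorted
    jar file names (for example the raydp-agent jar named in the error).
    """
    spec = importlib.util.find_spec(package_name)
    if spec is None or not spec.submodule_search_locations:
        return None
    # The `jars` subdirectory is assumed from the path shown in the error log.
    jars_dir = Path(list(spec.submodule_search_locations)[0]) / "jars"
    if not jars_dir.is_dir():
        return []
    return sorted(p.name for p in jars_dir.glob("*.jar"))


if __name__ == "__main__":
    print(find_raydp_jars())
```

If this prints `None` or a list without the agent jar when run on a cluster node, the jar path from the error message does not exist in that environment, which would be consistent with the suggestion above.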