oap-project / raydp

RayDP provides simple APIs for running Spark on Ray and integrating Spark with AI libraries.
Apache License 2.0

Getting Java gateway process exited before sending its port number during init_spark() #383

Open swapkh91 opened 11 months ago

swapkh91 commented 11 months ago

I'm trying a test using RayDP. I have set up a Ray cluster on GKE using the Dockerfile below:

FROM rayproject/ray:2.7.0-py310-cpu

ARG HTTP_PROXY
ARG HTTPS_PROXY

# set http_proxy & https_proxy
ENV http_proxy=${HTTP_PROXY}
ENV https_proxy=${HTTPS_PROXY}

RUN pip install -U pandas 'dask[complete]'

# install java, create workdir and install raydp
# You could change the raydp to raydp-nightly if you want to try the master branch code
RUN sudo http_proxy=${HTTP_PROXY} https_proxy=${HTTPS_PROXY} apt-get update -y \
    && sudo apt-get install -y gcc \
#     && sudo http_proxy=${HTTP_PROXY} https_proxy=${HTTPS_PROXY} apt-get install -y openjdk-8-jdk \
    && sudo http_proxy=${HTTP_PROXY} https_proxy=${HTTPS_PROXY} apt-get install -y openjdk-11-jdk \
    && sudo mkdir /raydp \
    && sudo chown -R ray /raydp \
    && $HOME/anaconda3/bin/pip --no-cache-dir install raydp

WORKDIR /raydp

# unset http_proxy & https_proxy
ENV http_proxy=
ENV https_proxy=

I have port-forwarded the GKE pod and I'm able to connect to it using ray.init(address="ray://localhost:10001")

When I try to initialize RayDP through

spark = raydp.init_spark(app_name='RayDP Example2',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='4GB')

I get the following error: Exception: Java gateway process exited before sending its port number

Full stack trace:

(RayDPSparkMaster pid=3236) Error occurred during initialization of VM
(RayDPSparkMaster pid=3236) agent library failed to init: instrument
(RayDPSparkMaster pid=3236) Error opening zip file or JAR manifest missing : /home/swapnesh/.local/lib/python3.10/site-packages/raydp/jars/raydp-agent-1.6.0.jar

Python version: 3.10.8
Ray version:    2.7.0
Dashboard:  http://10.0.0.26:8265
(RayDPSparkMaster pid=3236) Error occurred during initialization of VM
(RayDPSparkMaster pid=3236) agent library failed to init: instrument
(RayDPSparkMaster pid=3236) Error opening zip file or JAR manifest missing : /home/swapnesh/.local/lib/python3.10/site-packages/raydp/jars/raydp-agent-1.6.0.jar
---------------------------------------------------------------------------
RayTaskError                              Traceback (most recent call last)
/home/swapnesh/ray_example/raydp_test.ipynb Cell 6 line 1
----> 1 spark = raydp.init_spark(app_name='RayDP Example2',
      2                          num_executors=2,
      3                          executor_cores=2,
      4                          executor_memory='4GB')

File ~/.local/lib/python3.10/site-packages/raydp/context.py:215, in init_spark(app_name, num_executors, executor_cores, executor_memory, enable_hive, fault_tolerant_mode, placement_group_strategy, placement_group, placement_group_bundle_indexes, configs)
    207 try:
    208     _global_spark_context = _SparkContext(
    209         app_name, num_executors, executor_cores, executor_memory, enable_hive,
    210         fault_tolerant_mode,
   (...)
    213         placement_group_bundle_indexes,
    214         configs)
--> 215     return _global_spark_context.get_or_create_session()
    216 except:
    217     if _global_spark_context is not None:

File ~/.local/lib/python3.10/site-packages/raydp/context.py:121, in _SparkContext.get_or_create_session(self)
    119     return self._spark_session
    120 self._prepare_placement_group()
--> 121 spark_cluster = self._get_or_create_spark_cluster()
    122 self._spark_session = spark_cluster.get_spark_session()
    123 if self._fault_tolerant_mode:

File ~/.local/lib/python3.10/site-packages/raydp/context.py:86, in _SparkContext._get_or_create_spark_cluster(self)
     84 if self._spark_cluster is not None:
     85     return self._spark_cluster
---> 86 self._spark_cluster = SparkCluster(self._app_name,
     87                                    self._num_executors,
     88                                    self._executor_cores,
     89                                    self._executor_memory,
     90                                    self._enable_hive,
     91                                    self._configs)
     92 return self._spark_cluster

File ~/.local/lib/python3.10/site-packages/raydp/spark/ray_cluster.py:52, in SparkCluster.__init__(self, app_name, num_executors, executor_cores, executor_memory, enable_hive, configs)
     50 self._configs = configs
     51 self._prepare_spark_configs()
---> 52 self._set_up_master(resources=self._get_master_resources(self._configs), kwargs=None)
     53 self._spark_session: SparkSession = None

File ~/.local/lib/python3.10/site-packages/raydp/spark/ray_cluster.py:72, in SparkCluster._set_up_master(self, resources, kwargs)
     68 else:
     69     self._spark_master_handle = RayDPSparkMaster.options(name=spark_master_name) \
     70         .remote(self._configs)
---> 72 ray.get(self._spark_master_handle.start_up.remote())

File ~/.local/lib/python3.10/site-packages/ray/_private/auto_init_hook.py:24, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
     21 @wraps(fn)
     22 def auto_init_wrapper(*args, **kwargs):
     23     auto_init_ray()
---> 24     return fn(*args, **kwargs)

File ~/.local/lib/python3.10/site-packages/ray/_private/client_mode_hook.py:102, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
     98 if client_mode_should_convert():
     99     # Legacy code
    100     # we only convert init function if RAY_CLIENT_MODE=1
    101     if func.__name__ != "init" or is_client_mode_enabled_by_default:
--> 102         return getattr(ray, func.__name__)(*args, **kwargs)
    103 return func(*args, **kwargs)

File ~/.local/lib/python3.10/site-packages/ray/util/client/api.py:42, in _ClientAPI.get(self, vals, timeout)
     35 def get(self, vals, *, timeout=None):
     36     """get is the hook stub passed on to replace `ray.get`
     37 
     38     Args:
     39         vals: [Client]ObjectRef or list of these refs to retrieve.
     40         timeout: Optional timeout in milliseconds
     41     """
---> 42     return self.worker.get(vals, timeout=timeout)

File ~/.local/lib/python3.10/site-packages/ray/util/client/worker.py:434, in Worker.get(self, vals, timeout)
    432     op_timeout = max_blocking_operation_time
    433 try:
--> 434     res = self._get(to_get, op_timeout)
    435     break
    436 except GetTimeoutError:

File ~/.local/lib/python3.10/site-packages/ray/util/client/worker.py:462, in Worker._get(self, ref, timeout)
    460         logger.exception("Failed to deserialize {}".format(chunk.error))
    461         raise
--> 462     raise err
    463 if chunk.total_size > OBJECT_TRANSFER_WARNING_SIZE and log_once(
    464     "client_object_transfer_size_warning"
    465 ):
    466     size_gb = chunk.total_size / 2**30

RayTaskError: ray::RayDPSparkMaster.start_up() (pid=3236, ip=10.0.0.26, actor_id=df8587726cf63ef0ef08dc091a000000, repr=<raydp.spark.ray_cluster_master.RayDPSparkMaster object at 0x7829cbe16b00>)
  File "/home/swapnesh/.local/lib/python3.10/site-packages/raydp/spark/ray_cluster_master.py", line 68, in start_up
  File "/home/swapnesh/.local/lib/python3.10/site-packages/raydp/spark/ray_cluster_master.py", line 158, in _launch_gateway
Exception: Java gateway process exited before sending its port number

Libraries on my laptop:

ray version - 2.7.0
raydp version - 1.6.0
java version - openjdk version "11.0.20.1" 2023-08-24
kira-lin commented 11 months ago

Hi @swapkh91, sorry for the late reply. Ray 2.7.0 was released recently and might not be compatible with RayDP 1.6.0. Can you try Ray 2.6?

swapkh91 commented 11 months ago

Hi @swapkh91, sorry for the late reply. Ray 2.7.0 was released recently and might not be compatible with RayDP 1.6.0. Can you try Ray 2.6?

@kira-lin thanks, I'll try and get back. Any limitation on the Java version?

kira-lin commented 11 months ago

Sorry for the inconvenience. We have only tested Java 8, but Java 11 should be fine.

swapkh91 commented 11 months ago

@kira-lin I tested it with Ray 2.6.2 and I'm getting the same error. I'll explain how I'm trying to connect; maybe there is some issue in the process.

The Ray cluster is on GKE. I have port-forwarded it to my laptop through kubectl port-forward --address 0.0.0.0 service/raycluster-autoscaler-head-svc 10001:10001

I then connect using

ray.init(address="ray://localhost:10001")
spark = raydp.init_spark(app_name='RayDP Example2',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='4GB')

Now this init_spark command gives the above error.

I checked the logs through the dashboard:

:job_id:03000000
:actor_name:RayDPSparkMaster
Error opening zip file or JAR manifest missing : /home/swapnesh/.local/lib/python3.10/site-packages/raydp/jars/raydp-agent-1.6.0.jar

Why is it showing the jar file path from my laptop? The jar is present there though, I checked.

kira-lin commented 11 months ago

Why is it showing the jar file path from my laptop? The jar is present there though, I checked.

Oops, this seems to be a bug. We'll try to fix it. For now, you can wrap this init_spark and the things you want to do with Spark in a remote actor; that should be fine. Thanks for identifying this bug.
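
A minimal sketch of that pattern (the actor and method names here are illustrative, and it assumes the same Ray client connection as above): init_spark then runs inside the actor on a cluster node, so RayDP resolves its jar path there rather than on the laptop.

import ray
import raydp

@ray.remote
class SparkDriver:
    def __init__(self):
        # init_spark now executes in the actor's process on the cluster
        self.spark = raydp.init_spark(app_name='RayDP Example2',
                                      num_executors=2,
                                      executor_cores=2,
                                      executor_memory='4GB')

    def count(self):
        return self.spark.range(100).count()

# after ray.init(address="ray://localhost:10001"):
# driver = SparkDriver.remote()
# print(ray.get(driver.count.remote()))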

swapkh91 commented 11 months ago

Why is it showing the jar file path from my laptop? The jar is present there though, I checked.

Oops, this seems to be a bug. We'll try to fix it. For now, you can wrap this init_spark and the things you want to do with Spark in a remote actor; that should be fine. Thanks for identifying this bug.

@kira-lin got it, I'll try that. Also, I noticed that raydp has the dependency ray >= 2.1.0, as here. So pip install raydp installs ray 2.7.1, and I then have to manually run pip install --force-reinstall ray==2.6.2 to downgrade.

psr-ai commented 10 months ago

Hey @swapkh91, I am also getting the same error. Did you find a solution for this?

kira-lin commented 9 months ago

Hi @raiprabh,

For now, you can wrap this init_spark and the things you want to do with Spark in a remote actor; that should be fine. Thanks for identifying this bug.

You can try this solution. We don't have enough bandwidth to work on this project right now, so you are welcome to submit a PR to fix this if you have a solution, @swapkh91. We just need to use the jar path on the remote machines.
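
As a hedged diagnostic sketch (the helper functions below are illustrative, not RayDP APIs), the mismatch can be seen by comparing the jar directory raydp resolves on the client with the one it resolves on a cluster node:

import os
import ray
import raydp

def client_jar_dir():
    # where the local raydp installation keeps its bundled jars
    return os.path.join(os.path.dirname(raydp.__file__), "jars")

@ray.remote
def cluster_jar_dir():
    # the same lookup, executed on a cluster node
    import os
    import raydp
    return os.path.join(os.path.dirname(raydp.__file__), "jars")

ray.init(address="ray://localhost:10001")
print("client :", client_jar_dir())
print("cluster:", ray.get(cluster_jar_dir.remote()))

The error above shows the client-side path being handed to the JVM on the cluster, which is why the jar cannot be found there.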

Mayurji commented 5 months ago

@kira-lin, Is there any update on this issue?

Blarc commented 3 months ago

I also get this error when running the following code:

if __name__ == "__main__":
    import ray
    import raydp

    ray.init(
        address="ray://localhost:10001"
    )

    spark = ray.remote(
        raydp.init_spark("NYCTAXI data processing",
                         num_executors=2,
                         executor_cores=1,
                         executor_memory="500M",
                         configs={"spark.shuffle.service.enabled": "true"})
    )

    data = ray.remote(
        spark.read.format("csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .load(NYC_TRAIN_CSV)
    )

It seems that wrapping the calls in ray.remote doesn't help?

Blarc commented 3 months ago

The following worked for me:

import time

import ray
import raydp
import pandas as pd

@ray.remote
class PySparkDriver:
    def __init__(self):
        self.spark = raydp.init_spark("RayDP Example",
                                      num_executors=2,
                                      executor_cores=1,
                                      executor_memory="1GB")

    def foo(self):
        return self.spark.range(1000).repartition(10).count()

if __name__ == "__main__":
    ray.init(
        address="ray://localhost:10001"
    )

    driver = PySparkDriver.remote()
    print(ray.get(driver.foo.remote()))