mars-project / mars

Mars is a tensor-based unified framework for large-scale data computation that scales NumPy, pandas, scikit-learn, and Python functions.
https://mars-project.readthedocs.io
Apache License 2.0
2.68k stars 325 forks source link

[BUG] `test_ray_client` fails #3345

Closed zhongchun closed 1 year ago

zhongchun commented 1 year ago

Describe the bug

The test `mars/deploy/oscar/tests/test_ray_client.py::test_ray_client` fails for both the `mars` and `ray` backends.

To Reproduce: To help us reproduce this bug, please provide the information below:

  1. Your Python version: python 3.8.16
  2. The version of Mars you use: master
  3. Versions of crucial packages, such as numpy, scipy and pandas: numpy 1.24.3, pandas 1.5.3, ray 2.4.0
  4. Full stack trace of the error:
____________________________ test_ray_client[mars] _____________________________

backend = 'mars'

    @require_ray
    @pytest.mark.parametrize(
        "backend",
        [
            "mars",
            "ray",
        ],
    )
    def test_ray_client(backend):
        server_code = """import time
    import ray.util.client.server.server as ray_client_server

    server = ray_client_server.init_and_serve("{address}", num_cpus=20)
    print("OK", flush=True)
    while True:
        time.sleep(1)
    """

        address = "127.0.0.1:50051"

        with tempfile.NamedTemporaryFile(mode="w", suffix=".py") as f:
            f.write(server_code.format(address=address))
            f.flush()

            proc = subprocess.Popen([sys.executable, "-u", f.name], stdout=subprocess.PIPE)

            try:

                def _check_ready(expect_exit=False):
                    while True:
                        line = proc.stdout.readline()
                        if proc.returncode is not None:
                            if expect_exit:
                                break
                            raise Exception(
                                f"Failed to start ray server at {address}, "
                                f"the return code is {proc.returncode}."
                            )
                        if b"OK" in line:
                            break

                # Avoid ray.init timeout.
                _check_ready()

                # Avoid blocking the subprocess when the stdout pipe is full.
                t = threading.Thread(target=_check_ready, args=(True,), daemon=True)
                t.start()
                try:
                    import ray

                    ray.client(address).connect()  # Ray 1.4
                except Exception:
                    try:
                        from ray.util.client import ray

                        ray.connect(address)  # Ray 1.2
                    except Exception:
                        import ray

                        ray.init(f"ray://{address}")  # Ray latest
                ray._inside_client_test = True
                try:
>                   new_ray_session_test(backend=backend)

mars/deploy/oscar/tests/test_ray_client.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
mars/deploy/oscar/tests/test_ray_cluster_standalone.py:76: in new_ray_session_test
    ds.filter(lambda row: row["a"] > 0.5).show(5)
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/dataset.py:2295: in show
    for row in self.take(limit):
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/dataset.py:2251: in take
    for row in self.iter_rows():
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/dataset_iterator.py:232: in iter_rows
    for batch in self.iter_batches(**iter_batch_args):
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/dataset_iterator.py:185: in iter_batches
    yield from iter_batches(
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/iter_batches.py:179: in iter_batches
    next_batch = next(async_batch_iter)
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/util.py:268: in make_async_gen
    raise next_item
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/util.py:249: in execute_computation
    for item in fn(thread_safe_generator):
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/iter_batches.py:170: in _async_iter_batches
    yield from extract_data_from_batch(batch_iter)
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/util.py:193: in extract_data_from_batch
    for batch in batch_iter:
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/iter_batches.py:307: in restore_original_order
    for batch in batch_iter:
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/iter_batches.py:221: in threadpool_computations
    yield from formatted_batch_iter
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/util.py:167: in format_batches
    for batch in block_iter:
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/util.py:126: in blocks_to_batches
    for block in block_iter:
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/util.py:64: in resolve_block_refs
    current_hit, current_miss, current_unknown = _calculate_ref_hits([block_ref])
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/util.py:41: in _calculate_ref_hits
    locs = ray.experimental.get_object_locations(refs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

obj_refs = [ClientObjectRef(eab74631758179e9ffffffffffffffffffffffff0100000003000000)]
timeout_ms = -1

    def get_object_locations(
        obj_refs: List[ObjectRef], timeout_ms: int = -1
    ) -> Dict[ObjectRef, Dict[str, Any]]:
        """Lookup the locations for a list of objects.

        It returns a dict maps from an object to its location. The dict excludes
        those objects whose location lookup failed.

        Args:
            object_refs (List[ObjectRef]): List of object refs.
            timeout_ms: The maximum amount of time in micro seconds to wait
                before returning. Wait infinitely if it's negative.

        Returns:
            A dict maps from an object to its location. The dict excludes those
            objects whose location lookup failed.

            The location is stored as a dict with following attributes:

            - node_ids (List[str]): The hex IDs of the nodes that have a
              copy of this object.

            - object_size (int): The size of data + metadata in bytes.

        Raises:
            RuntimeError: if the processes were not started by ray.init().
            ray.exceptions.GetTimeoutError: if it couldn't finish the
                request in time.
        """
        if not ray.is_initialized():
            raise RuntimeError("Ray hasn't been initialized.")
>       return ray._private.worker.global_worker.core_worker.get_object_locations(
            obj_refs, timeout_ms
        )
E       AttributeError: 'Worker' object has no attribute 'core_worker'

../../../miniconda/envs/test/lib/python3.8/site-packages/ray/experimental/locations.py:38: AttributeError
_____________________________ test_ray_client[ray] _____________________________

backend = 'ray'

    @require_ray
    @pytest.mark.parametrize(
        "backend",
        [
            "mars",
            "ray",
        ],
    )
    def test_ray_client(backend):
        server_code = """import time
    import ray.util.client.server.server as ray_client_server

    server = ray_client_server.init_and_serve("{address}", num_cpus=20)
    print("OK", flush=True)
    while True:
        time.sleep(1)
    """

        address = "127.0.0.1:50051"

        with tempfile.NamedTemporaryFile(mode="w", suffix=".py") as f:
            f.write(server_code.format(address=address))
            f.flush()

            proc = subprocess.Popen([sys.executable, "-u", f.name], stdout=subprocess.PIPE)

            try:

                def _check_ready(expect_exit=False):
                    while True:
                        line = proc.stdout.readline()
                        if proc.returncode is not None:
                            if expect_exit:
                                break
                            raise Exception(
                                f"Failed to start ray server at {address}, "
                                f"the return code is {proc.returncode}."
                            )
                        if b"OK" in line:
                            break

                # Avoid ray.init timeout.
                _check_ready()

                # Avoid blocking the subprocess when the stdout pipe is full.
                t = threading.Thread(target=_check_ready, args=(True,), daemon=True)
                t.start()
                try:
                    import ray

                    ray.client(address).connect()  # Ray 1.4
                except Exception:
                    try:
                        from ray.util.client import ray

                        ray.connect(address)  # Ray 1.2
                    except Exception:
                        import ray

                        ray.init(f"ray://{address}")  # Ray latest
                ray._inside_client_test = True
                try:
>                   new_ray_session_test(backend=backend)

mars/deploy/oscar/tests/test_ray_client.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
mars/deploy/oscar/tests/test_ray_cluster_standalone.py:76: in new_ray_session_test
    ds.filter(lambda row: row["a"] > 0.5).show(5)
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/dataset.py:2295: in show
    for row in self.take(limit):
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/dataset.py:2251: in take
    for row in self.iter_rows():
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/dataset_iterator.py:232: in iter_rows
    for batch in self.iter_batches(**iter_batch_args):
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/dataset_iterator.py:185: in iter_batches
    yield from iter_batches(
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/iter_batches.py:179: in iter_batches
    next_batch = next(async_batch_iter)
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/util.py:268: in make_async_gen
    raise next_item
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/util.py:249: in execute_computation
    for item in fn(thread_safe_generator):
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/iter_batches.py:170: in _async_iter_batches
    yield from extract_data_from_batch(batch_iter)
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/util.py:193: in extract_data_from_batch
    for batch in batch_iter:
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/iter_batches.py:307: in restore_original_order
    for batch in batch_iter:
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/iter_batches.py:221: in threadpool_computations
    yield from formatted_batch_iter
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/util.py:167: in format_batches
    for batch in block_iter:
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/util.py:126: in blocks_to_batches
    for block in block_iter:
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/util.py:64: in resolve_block_refs
    current_hit, current_miss, current_unknown = _calculate_ref_hits([block_ref])
../../../miniconda/envs/test/lib/python3.8/site-packages/ray/data/_internal/block_batching/util.py:41: in _calculate_ref_hits
    locs = ray.experimental.get_object_locations(refs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

obj_refs = [ClientObjectRef(8c1a916673689fefffffffffffffffffffffffff0100000003000000)]
timeout_ms = -1

    def get_object_locations(
        obj_refs: List[ObjectRef], timeout_ms: int = -1
    ) -> Dict[ObjectRef, Dict[str, Any]]:
        """Lookup the locations for a list of objects.

        It returns a dict maps from an object to its location. The dict excludes
        those objects whose location lookup failed.

        Args:
            object_refs (List[ObjectRef]): List of object refs.
            timeout_ms: The maximum amount of time in micro seconds to wait
                before returning. Wait infinitely if it's negative.

        Returns:
            A dict maps from an object to its location. The dict excludes those
            objects whose location lookup failed.

            The location is stored as a dict with following attributes:

            - node_ids (List[str]): The hex IDs of the nodes that have a
              copy of this object.

            - object_size (int): The size of data + metadata in bytes.

        Raises:
            RuntimeError: if the processes were not started by ray.init().
            ray.exceptions.GetTimeoutError: if it couldn't finish the
                request in time.
        """
        if not ray.is_initialized():
            raise RuntimeError("Ray hasn't been initialized.")
>       return ray._private.worker.global_worker.core_worker.get_object_locations(
            obj_refs, timeout_ms
        )
E       AttributeError: 'Worker' object has no attribute 'core_worker'

../../../miniconda/envs/test/lib/python3.8/site-packages/ray/experimental/locations.py:38: AttributeError
=============================== warnings summary ===============================
../../../miniconda/envs/test/lib/python3.8/site-packages/coverage/inorout.py:460
  /home/runner/miniconda/envs/test/lib/python3.8/site-packages/coverage/inorout.py:460: CoverageWarning: --include is ignored because --source is set (include-ignored)
    self.warn("--include is ignored because --source is set", slug="include-ignored")

../../../miniconda/envs/test/lib/python3.8/site-packages/pkg_resources/__init__.py:121
  /home/runner/miniconda/envs/test/lib/python3.8/site-packages/pkg_resources/__init__.py:121: DeprecationWarning: pkg_resources is deprecated as an API
    warnings.warn("pkg_resources is deprecated as an API", DeprecationWarning)

../../../miniconda/envs/test/lib/python3.8/site-packages/pkg_resources/__init__.py:2870
  /home/runner/miniconda/envs/test/lib/python3.8/site-packages/pkg_resources/__init__.py:2870: DeprecationWarning: Deprecated call to `pkg_resources.declare_namespace('google')`.
  Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
    declare_namespace(pkg)

mars/deploy/oscar/tests/test_ray_client.py::test_ray_client[mars]
mars/deploy/oscar/tests/test_ray_client.py::test_ray_client[ray]
  /home/runner/work/mars/mars/mars/deploy/oscar/tests/test_ray_client.py:79: DeprecationWarning: Starting a connection through `ray.client` will be deprecated in future ray versions in favor of `ray.init`. See the docs for more details: https://docs.ray.io/en/latest/cluster/running-applications/job-submission/ray-client.html. You can replace your call to `ray.client().connect()` with the following:
        ray.init("ray://127.0.0.1:50051")

    ray.client(address).connect()  # Ray 1.4

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
============================== slowest durations ===============================
34.81s call     mars/deploy/oscar/tests/test_ray_client.py::test_ray_client[mars]
16.90s call     mars/deploy/oscar/tests/test_ray_client.py::test_ray_client[ray]

(4 durations < 0.005s hidden.  Use -vv to show these durations.)
=========================== short test summary info ============================
FAILED mars/deploy/oscar/tests/test_ray_client.py::test_ray_client[mars] - AttributeError: 'Worker' object has no attribute 'core_worker'
FAILED mars/deploy/oscar/tests/test_ray_client.py::test_ray_client[ray] - AttributeError: 'Worker' object has no attribute 'core_worker'
  5. Minimized code to reproduce the error: just run `pytest -s -v mars/deploy/oscar/tests/test_ray_client.py::test_ray_client`.

Expected behavior: `test_ray_client` should pass for both the `mars` and `ray` backends.

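Additional context: Both failures originate in Ray Data (`ds.filter(...).show(5)` in `new_ray_session_test`). Ray Data's `_calculate_ref_hits` calls `ray.experimental.get_object_locations`, which accesses `ray._private.worker.global_worker.core_worker`. When connected through Ray Client (the refs are `ClientObjectRef`s), the local `Worker` has no `core_worker` attribute, so the call raises `AttributeError: 'Worker' object has no attribute 'core_worker'`.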