flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0
5.4k stars, 577 forks

[BUG] Failed to run a ray task in the sandbox #5583

Open pingsutw opened 1 month ago

pingsutw commented 1 month ago

Describe the bug

Running a Ray task in the sandbox fails. The pod logs the following error:

Traceback (most recent call last):
  File "/usr/local/bin/ray", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.10/site-packages/ray/scripts/scripts.py", line 2615, in main
    return cli()
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.10/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/ray/dashboard/modules/job/cli_utils.py", line 54, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/ray/autoscaler/_private/cli_logger.py", line 856, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/ray/dashboard/modules/job/cli.py", line 273, in submit
    job_id = client.submit_job(
  File "/usr/local/lib/python3.10/site-packages/ray/dashboard/modules/job/sdk.py", line 254, in submit_job
    self._raise_error(r)
  File "/usr/local/lib/python3.10/site-packages/ray/dashboard/modules/dashboard_sdk.py", line 283, in _raise_error
    raise RuntimeError(
RuntimeError: Request failed with status code 500: Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/ray/dashboard/modules/job/job_head.py", line 298, in submit_job
    resp = await job_agent_client.submit_job_internal(submit_request)
  File "/usr/local/lib/python3.10/site-packages/ray/dashboard/modules/job/job_head.py", line 75, in submit_job_internal
    async with self._session.post(
  File "/usr/local/lib/python3.10/site-packages/aiohttp/client.py", line 1197, in __aenter__
    self._resp = await self._coro
  File "/usr/local/lib/python3.10/site-packages/aiohttp/client.py", line 608, in _request
    await resp.start(conn)
  File "/usr/local/lib/python3.10/site-packages/aiohttp/client_reqrep.py", line 976, in start
    message, payload = await protocol.read()  # type: ignore[union-attr]
  File "/usr/local/lib/python3.10/site-packages/aiohttp/streams.py", line 640, in read
    await self._waiter
aiohttp.client_exceptions.ServerDisconnectedError: Server disconnected
.

Expected behavior

The Ray task should run successfully.

Additional context to reproduce

import typing

import ray
from flytekit import ImageSpec, Resources, task, workflow
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig

# Image built by ImageSpec with the Ray plugin and a pinned Ray version.
custom_image = ImageSpec(
    registry="ghcr.io/flyteorg",
    packages=["flytekitplugins-ray", "ray[default]==2.30.0"],
    # kuberay operator needs wget for readiness probe.
    apt_packages=["wget"],
    # platform="linux/arm64"
)

@ray.remote
def f(x):
    return x * x

ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=1)],
    runtime_env={"pip": ["numpy", "pandas"]},
)

@task(
    task_config=ray_config,
    requests=Resources(mem="2Gi", cpu="2"),
    container_image=custom_image,
)
def ray_task(n: int) -> typing.List[int]:
    futures = [f.remote(i) for i in range(n)]
    return ray.get(futures)

@workflow
def ray_workflow(n: int) -> typing.List[int]:
    return ray_task(n=n)

if __name__ == "__main__":
    print(ray_workflow(n=10))

runllm[bot] commented 1 month ago

The error appears to come from the Ray plugin integration with Flyte when a Ray task runs in the sandbox environment. Similar failures have often been linked to the stability and configuration of the KubeRay operator. Three things are worth checking:

  1. KubeRay Version: Ensure you are using a stable version of KubeRay. Versions 0.3 to 0.6 have known issues. Upgrading to KubeRay 1.0 has resolved similar issues for other users (source).

  2. ClusterRole Permissions: Ensure that the necessary permissions are set for the Ray operator. Missing permissions can cause failures in creating Ray jobs (source).

  3. Configuration and Deployment: Verify that your Flyte deployment is correctly configured to use the Ray plugin. This includes ensuring that the flytekitplugins-ray package is installed and that the Ray cluster is properly set up (source).
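
The first and third checks can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the helper names (`parse_version`, `kuberay_version_is_suspect`, `module_is_importable`) are hypothetical and not part of Flyte, Ray, or KubeRay, and the KubeRay version string has to be supplied by you (for example, read off the operator image tag). The "suspect" range is taken directly from the 0.3 to 0.6 range mentioned in point 1.

```python
import importlib.util
import re


def parse_version(version: str) -> tuple:
    """Turn a string such as 'v1.0.0' or '0.5' into a tuple of ints."""
    # Drop any pre-release suffix ('-rc.1') before extracting the numbers.
    numbers = re.findall(r"\d+", version.split("-")[0])
    if not numbers:
        raise ValueError(f"no version number found in {version!r}")
    return tuple(int(n) for n in numbers)


def kuberay_version_is_suspect(version: str) -> bool:
    """Return True for the 0.3 to 0.6 range flagged in point 1 above.

    Assumption: only major.minor matters, so every 0.6.x counts as suspect.
    """
    major_minor = (parse_version(version) + (0,))[:2]
    return (0, 3) <= major_minor <= (0, 6)


def module_is_importable(name: str) -> bool:
    """Return True if `name` can be located in the current environment."""
    try:
        return importlib.util.find_spec(name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package (e.g. 'flytekitplugins') is missing.
        return False


if __name__ == "__main__":
    # '0.5.0' is a placeholder; substitute the version you actually deployed.
    print("KubeRay version suspect:", kuberay_version_is_suspect("0.5.0"))
    print("Ray plugin importable:", module_is_importable("flytekitplugins.ray"))
```

The import check only tells you whether the plugin is installed in the environment where the script runs, for example inside the task image. It says nothing about the backend plugin configuration or the ClusterRole permissions, which still need to be checked on the cluster.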

Would you like more detailed steps on any of these points?
