ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Data/Core] Dataset workloads hang although there's sufficient resources + raylet RPC Error #34255

Open v4if opened 1 year ago

v4if commented 1 year ago

What happened + What you expected to happen

With Datasets streaming execution, I sometimes get an RPC error from the raylet, even though the program ends normally.

run log:

(raylet, ip=172.16.0.26) [2023-04-11 09:55:45,543 E 53 53] (raylet) worker.cc:190: Failed to send wait complete: GrpcUnavailable: RPC Error message: Connection reset by peer; RPC Error details:
(raylet, ip=172.16.0.26) [2023-04-11 09:55:45,597 E 53 53] (raylet) worker.cc:190: Failed to send wait complete: GrpcUnavailable: RPC Error message: failed to connect to all addresses; RPC Error details:

cluster info:

Node status
---------------------------------------------------------------
Healthy:
 1 wg
 1 head-group
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Usage:
 0.0/32.0 CPU
 0.0/1.0 GPU
 0B/89.41GiB memory
 0B/26.72GiB object_store_memory

Demands:
 (no resource demands)
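For reference, the reservations made by the reproduction script further down can be tallied against the totals above. This is only a rough sketch: it counts the resources the script requests explicitly (`num_cpus`, `num_gpus`, and the placement group bundles) and assumes nothing else is charged against a bundle, which may not match how Ray Data actually accounts for its actors. It also assumes each `task` invocation is charged to its own bundle.

```python
from fractions import Fraction

# Cluster totals copied from the status output above.
cluster = {"CPU": Fraction(32), "GPU": Fraction(1)}

# Values copied from the reproduction script.
NUM_TASKS = 5
bundle = {"CPU": Fraction(3), "GPU": Fraction("0.2")}

# Explicit requests per pipeline (assumption: nothing else is charged).
requests = {
    # one `task` invocation plus one Process1 actor
    "CPU": Fraction("0.001") + 1 * Fraction("0.5"),
    # two Process2 actors
    "GPU": 2 * Fraction("0.045"),
}

# Total reserved by all placement groups together.
reserved = {name: NUM_TASKS * amount for name, amount in bundle.items()}

fits_cluster = all(reserved[name] <= cluster[name] for name in cluster)
fits_bundle = all(requests[name] <= bundle[name] for name in bundle)

for name in cluster:
    print(
        f"{name}: reserved {float(reserved[name])} of {float(cluster[name])}, "
        f"explicit requests per bundle {float(requests[name])} of {float(bundle[name])}"
    )
print("placement groups fit the cluster:", fits_cluster)
print("explicit requests fit one bundle:", fits_bundle)
```

Under these assumptions the five bundles reserve 15 of 32 CPUs and the whole of the single GPU, so the GPU has no headroom left outside the placement groups.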

How can I troubleshoot this problem?

Versions / Dependencies

ray, version 3.0.0.dev0 nightly build from 20230410

Reproduction script

import ray
import time
from ray.util.placement_group import (
    placement_group,
    placement_group_table,
    remove_placement_group,
)
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

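# Compare numeric components; a plain string comparison would rank "2.10.0" below "2.3.0".
assert tuple(int(part) for part in ray.__version__.split(".")[:2]) >= (2, 3), (
    f"Ray 2.3.0 or later is required; the current version is {ray.__version__}"
)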

ray.init()

class Process1:
    def __call__(self, df):
        time.sleep(0.2)
        return df

class Process2:
    def __call__(self, df):
        time.sleep(0.1)
        return df

@ray.remote
def task(idx, pg):
    ctx = ray.data.context.DatasetContext.get_current()
    ctx.execution_options.resource_limits.cpu = 3
    ctx.execution_options.resource_limits.gpu = 0.2
    ctx.execution_options.resource_limits.object_store_memory = 1 * 1024 * 1024 * 1024
    ctx.scheduling_strategy = None

    ds = ray.data.range_tensor(1000, shape=(3, 1024, 1024), parallelism=1000)
    pipe = ds.map_batches(
        Process1,
        batch_size=1,
        num_cpus=0.5,
        compute=ray.data.ActorPoolStrategy(1, 1),
        placement_group=pg,
        placement_group_capture_child_tasks=True,
    ).map_batches(
        Process2,
        batch_size=1,
        num_gpus=0.045,
        compute=ray.data.ActorPoolStrategy(2, 2),
        placement_group=pg,
        placement_group_capture_child_tasks=True,
    )

    for batch in pipe.iter_batches(batch_size=1):
        ...

NUM_TASKS = 5

groups = []
for i in range(NUM_TASKS):
    pg = placement_group([{"CPU": 3, "GPU": 0.2}], strategy="STRICT_PACK")
    ray.get(pg.ready())
    groups.append(pg)

results = [
    task.options(
        num_cpus=0.001,
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_bundle_index=-1,
            placement_group_capture_child_tasks=True,
        ),
    ).remote(i, groups[i])
    for i in range(NUM_TASKS)
]
ray.get(results)
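One detail of the script that may be worth checking: the list comprehension passes `placement_group=pg` to `PlacementGroupSchedulingStrategy`, and `pg` at that point is the variable left over from the preceding `for` loop, so it always names the last placement group created, while `groups[i]` (the argument forwarded to `task`) changes per iteration. Whether this is intended is not stated in the report. The sketch below reproduces only the Python name-binding behavior, using plain strings as hypothetical stand-ins for placement group handles so that it runs without Ray.

```python
NUM_TASKS = 5

groups = []
for i in range(NUM_TASKS):
    # Hypothetical stand-in; the script calls placement_group(...) here.
    pg = f"pg-{i}"
    groups.append(pg)

# Same shape as the script's comprehension: `pg` is the leftover loop
# variable, while groups[i] changes on every iteration.
pairs = [(pg, groups[i]) for i in range(NUM_TASKS)]

for scheduled_in, passed_to_task in pairs:
    print(f"task scheduled in {scheduled_in}, pipeline given {passed_to_task}")
```

If the intent was to run each task inside its own placement group, using `groups[i]` in both places would express that; this is a suggestion to verify, not a confirmed cause of the hang.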

Issue Severity

High: It blocks me from completing my task.

rkooo567 commented 1 year ago

@c21 when I tried this on my local machine, it hangs. Can you guys triage and fix it, or let me know the root cause first, before we try reproducing the RPC error itself?

c21 commented 1 year ago

Thanks @rkooo567, we will triage this in the Data team.

anyscalesam commented 1 year ago

@c21 can you retry the repro with the latest Ray?