ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
33.97k stars 5.77k forks source link

Error In loading data in ray.remote function using external cluster #43853

Open Injokerash opened 8 months ago

Injokerash commented 8 months ago

What happened + What you expected to happen

I’m trying to test ray to see what it offers, so if it suits our case, deploy a ray cluster on Kubernetes. currently, I’m using a docker-compose to test it, before deploying kube-ray.

The problem is that I get “ray.exceptions.OwnerDiedError: Failed to retrieve object” sometimes. and it has some kind of decussate pattern in it, First time it runs pretty fine and the next time I run the same code throws this error and this pattern repeats.

Ray examples in the document are not working.

Versions / Dependencies

python 3.10 ray[all]==2.9.3

Reproduction script

Code

import torch
import ray
from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.torch import TorchTrainer

ray.init("ray://ray-head:10001")

use_gpu = False

train_dataset = ray.data.from_items([{"x": [x], "y": [2 * x]} for x in range(200)])

def increment(batch):
    batch["y"] = batch["y"] + 1
    return batch

train_dataset = train_dataset.map_batches(increment)

def train_func(config):
    batch_size = 16

    train_data_shard = train.get_dataset_shard("train")
    train_dataloader = train_data_shard.iter_torch_batches(
        batch_size=batch_size, dtypes=torch.float32
    )

    for epoch_idx in range(1):
        for batch in train_dataloader:
            inputs, labels = batch["x"], batch["y"]
            assert type(inputs) == torch.Tensor
            assert type(labels) == torch.Tensor
            assert inputs.shape[0] == batch_size
            assert labels.shape[0] == batch_size
            break # Only check one batch. Last batch can be partial.

trainer = TorchTrainer(
    train_func,
    datasets={"train": train_dataset},
    scaling_config=ScalingConfig(num_workers=1, use_gpu=use_gpu)
)
result = trainer.fit()
import ray
import pandas as pd
ray.init("ray://ray-head:10001")

@ray.remote(max_retries=3)
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10, 'b': [5,6] * 10})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

print(ray.get(ray_data_task.remote()))

Dockerfile

FROM scratch as files

COPY --from=res ./README.md ./pyproject.toml /code/
COPY --from=res ./src /code/src

FROM python:3.10-slim as base

RUN --mount=type=cache,target=/var/lib/apt/lists \
    --mount=type=cache,target=/var/cache/apt \
    rm /etc/apt/apt.conf.d/docker-clean \
    && sed -i 's/deb.debian.org/mirror.arvancloud.ir/g' /etc/apt/sources.list.d/debian.sources \
    && apt-get update \
    && apt-get install python3-venv tini curl apt-transport-https python-is-python3 -y \
    && useradd -m -U --shell /bin/bash user

FROM base as venv

RUN python3 -m venv /opt/venv \
    && chown -R user:user /opt/venv

COPY --from=files --link /code /home/user/code

COPY --from=res ./requirements.txt /home/user/code/

ENV VIRTUAL_ENV="/opt/venv" \
    PATH="/opt/venv/bin:$PATH" \
    PIP_CACHE_DIR=/var/cache/pip

RUN --mount=type=cache,target=/var/cache/pip \
    pip install -r /home/user/code/requirements.txt \
    && pip install -e /home/user/code --no-deps

FROM base

COPY --link --from=files --chown=user:user --chmod=755 /code /home/user/code
COPY --link --from=venv --chown=user:user --chmod=755 /opt/venv /opt/venv
COPY --chmod="0555" --chown=root:root ./entrypoint.sh /usr/bin/entrypoint.sh

USER user

ENV VIRTUAL_ENV="/opt/venv" \
    PATH="/opt/venv/bin:$PATH" 

ENTRYPOINT [ "/usr/bin/tini", "--", "entrypoint.sh" ]

Compose file

name: ray-distributed

x-ray-local: &x-ray-local
  image: ray-local
  build:
    context: ./
    dockerfile: Dockerfile
    additional_contexts:
      res: ../
  restart: unless-stopped
  environment:
    RAY_record_ref_creation_sites: 1
    RAY_BACKEND_LOG_LEVEL: debug
    RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING: 1 
  volumes:
    - /dev/shm:/dev/shm
  develop:
    watch:
      - action: sync+restart
        path: ../src
        target: /home/user/code/src
        ignore:
          - "**/__pycache__"
  networks:
    - ray

x-ray-worker: &x-ray-worker
  <<: *x-ray-local
  command: ray start --address=ray-head:6379 --block
  mem_limit: 2048m
  cpu_count: 2
  # shm_size: 2048m
  depends_on:
    ray-head:
      condition: service_healthy

services:
  # bunch of other containers
  ray-head:
    <<: *x-ray-local
    container_name: ray-head
    ports:
      - 8265:8265
    command: |
      bash -c 'ray start --head \
                --port=6379 \
                --node-ip-address=0.0.0.0 \
                --ray-client-server-port=10001 \
                --dashboard-host=0.0.0.0 \
                --dashboard-port=8265 \
                --block'
    healthcheck:
      test: curl -f http://localhost:8265 || exit 1
      interval: 15s
      retries: 3
      start_period: 5s
      timeout: 5s

  ray-worker-1:
    <<: *x-ray-worker
    container_name: ray-worker-1

  ray-worker-2:
    <<: *x-ray-worker
    container_name: ray-worker-2

  ray-client:
    <<: *x-ray-local
    container_name: ray-client
    tty: true
    stdin_open: true
    entrypoint: sleep infinity
    working_dir: /tmp
    volumes:
      - ../src:/src
      - ./test_client.sh:/tmp/test_client.sh

networks:
  ray:

Error

(ray_data_task pid=3206) Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Repartition]
(ray_data_task pid=3206) Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
(ray_data_task pid=3206) Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
(ray_data_task pid=3206) [dataset]: Run `pip install tqdm` to enable progress reporting.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/src/playground/test.py", line 17, in <module>
    print(ray.get(ray_data_task.remote()))
  File "/opt/venv/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/opt/venv/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 102, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/opt/venv/lib/python3.10/site-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
  File "/opt/venv/lib/python3.10/site-packages/ray/util/client/worker.py", line 434, in get
    res = self._get(to_get, op_timeout)
  File "/opt/venv/lib/python3.10/site-packages/ray/util/client/worker.py", line 462, in _get
    raise err
types.RayTaskError(OwnerDiedError): ray::ray_data_task() (pid=3206, ip=0.0.0.0)
  File "/src/playground/test.py", line 15, in ray_data_task
  File "/opt/venv/lib/python3.10/site-packages/ray/data/dataset.py", line 4436, in to_pandas
    count = self.count()
  File "/opt/venv/lib/python3.10/site-packages/ray/data/dataset.py", line 2606, in count
    [get_num_rows.remote(block) for block in self.get_internal_block_refs()]
  File "/opt/venv/lib/python3.10/site-packages/ray/data/dataset.py", line 4779, in get_internal_block_refs
    blocks = self._plan.execute().get_blocks()
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/plan.py", line 628, in execute
    blocks = execute_to_legacy_block_list(
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 126, in execute_to_legacy_block_list
    block_list = _bundles_to_block_list(bundles)
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 411, in _bundles_to_block_list
    for ref_bundle in bundles:
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 145, in get_next
    item = self._outer._output_node.get_output_blocking(
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 320, in get_output_blocking
    raise self._exception
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 212, in run
    while self._scheduling_loop_step(self._topology) and not self._shutdown:
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 299, in _scheduling_loop_step
    update_operator_states(topology)
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 528, in update_operator_states
    op.all_inputs_done()
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/execution/operators/base_physical_operator.py", line 93, in all_inputs_done
    self._output_buffer, self._stats = self._bulk_fn(self._input_buffer, ctx)
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/planner/repartition.py", line 70, in split_repartition_fn
    return scheduler.execute(refs, num_outputs, ctx)
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py", line 68, in execute
    split_return = _split_at_indices(
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/split.py", line 283, in _split_at_indices
    ] = _split_all_blocks(
  File "/opt/venv/lib/python3.10/site-packages/ray/data/_internal/split.py", line 196, in _split_all_blocks
    per_block_split_metadata = ray.get(per_block_split_metadata_futures)
ray.exceptions.RayTaskError(OwnerDiedError): ray::_split_single_block() (pid=483, ip=172.22.0.4)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.OwnerDiedError: Failed to retrieve object 002e96ffd8909c19ffffffffffffffffffffffff0c00000002e1f505. To see information about where this ObjectRef was created in Python, set the environment variable RAY_record_ref_creation_sites=1 during `ray start` and `ray.init()`.

The object's owner has exited. This is the Python worker that first created the ObjectRef via `.remote()` or `ray.put()`. Check cluster logs (`/tmp/ray/session_latest/logs/*51d71de8d044b3224b430ed92f10eed2b627e83bb35cbb2ae95c5a75*` at IP address 0.0.0.0) for more information about the Python worker failure.

Issue Severity

High: It blocks me from completing my task.

scottjlee commented 4 months ago

For the second example:

import ray
import pandas as pd
ray.init("ray://ray-head:10001")

@ray.remote(max_retries=3)
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10, 'b': [5,6] * 10})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

print(ray.get(ray_data_task.remote()))

You only need to do one of the following:

So if you keep the decorator, you can directly call ray.get(ray_data_task). See the docs for more usage - https://docs.ray.io/en/latest/ray-core/api/doc/ray.remote.html