ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Client] Dataset write_csv AttributeError: 'Worker' object has no attribute 'core_worker' #35537

Open China-JasonW opened 1 year ago

China-JasonW commented 1 year ago

What happened + What you expected to happen

When I use Ray Dataset to write to CSV with Ray 2.4.0, it throws the error "AttributeError: 'Worker' object has no attribute 'core_worker'". As shown in the picture, it claims Ray is not initialized, but it actually has been initialized. When I rolled back to version 2.3.1, the program executed successfully.

Versions / Dependencies

I use Ray Client mode: the driver code runs on my Mac, and the workers run on Linux (CentOS 7.5). The Ray version is 2.4.0. I run the program in debug mode in the IDEA IDE and submit the remote tasks to the remote Ray cluster.

Reproduction script

import ray


class AnalysisService(IMLService):

    def __init__(self, context: dict):
        self.context = context

    def init(self):
        ray_add = "ray://192.168.101.21:10001"
        ray.init(address=ray_add)

    def start(self):
        # Part of the code is omitted; info_result is a Ray Dataset
        info_result = ray.get(transform.remote(self.context, fuc_infer, ds_test))
        logger.info('The task has finished! {}', type(info_result))

        # The exception is thrown here
        info_result.write_csv("/tmp/ddata")
        logger.info("The result has {} records.", info_result.count())

Issue Severity

High: It blocks me from completing my task.

tekumara commented 1 year ago

Given test.py:

import ray.data

ds1 = ray.data.range(100).groupby(lambda x: x % 3).count().show()

When using ray==2.4.0 on the client and the rayproject/ray:2.4.0-py39-cpu image on the cluster:

❯ python -m test
2023-05-20 05:19:49,091 INFO worker.py:1314 -- Using address ray://127.0.0.1:10001 set in the environment variable RAY_ADDRESS
2023-05-20 05:19:54,129 INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> AllToAllOperator[Aggregate]
2023-05-20 05:19:54,130 INFO streaming_executor.py:84 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
Running: 0.0/6.0 CPU, 0.0/0.0 GPU, 0.0 MiB/536.34 MiB object_store_memory:   8%|████▏     | 1/13 [00:06<01:22,  6.90s/it]
Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/tmp/tekumara/code/ray-demo/rayexample/data/groupby.py", line 3, in <module>
    ds1 = ray.data.range(100).groupby(lambda x: x % 3).count().show()
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/dataset.py", line 2295, in show
    for row in self.take(limit):
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/dataset.py", line 2251, in take
    for row in self.iter_rows():
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/dataset_iterator.py", line 232, in iter_rows
    for batch in self.iter_batches(**iter_batch_args):
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/dataset_iterator.py", line 185, in iter_batches
    yield from iter_batches(
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 179, in iter_batches
    next_batch = next(async_batch_iter)
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 268, in make_async_gen
    raise next_item
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 249, in execute_computation
    for item in fn(thread_safe_generator):
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 170, in _async_iter_batches
    yield from extract_data_from_batch(batch_iter)
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 193, in extract_data_from_batch
    for batch in batch_iter:
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 307, in restore_original_order
    for batch in batch_iter:
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 221, in threadpool_computations
    yield from formatted_batch_iter
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 167, in format_batches
    for batch in block_iter:
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 126, in blocks_to_batches
    for block in block_iter:
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 64, in resolve_block_refs
    current_hit, current_miss, current_unknown = _calculate_ref_hits([block_ref])
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 41, in _calculate_ref_hits
    locs = ray.experimental.get_object_locations(refs)
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/experimental/locations.py", line 38, in get_object_locations
    return ray._private.worker.global_worker.core_worker.get_object_locations(
AttributeError: 'Worker' object has no attribute 'core_worker'
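
The traceback bottoms out in ray.experimental.get_object_locations, which reads ray._private.worker.global_worker.core_worker; when the driver is connected through Ray Client, that Worker object evidently has no core_worker attribute, hence the error. A minimal sketch that isolates just that call (the client address is a placeholder):

import ray
from ray.experimental import get_object_locations

# Connect via Ray Client; with a plain local ray.init() the same call succeeds.
ray.init(address="ray://127.0.0.1:10001")

ref = ray.put("hello")
# This is the call the Dataset iterator makes internally; over Ray Client on
# 2.4.0 it fails with: AttributeError: 'Worker' object has no attribute 'core_worker'
locs = get_object_locations([ref])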

Interestingly though, if I start a local ray 2.4.0 instance it works:

❯ python -m test
2023-05-20 05:23:14,011 INFO worker.py:1616 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8266 
2023-05-20 05:23:16,011 INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> AllToAllOperator[Aggregate]
2023-05-20 05:23:16,012 INFO streaming_executor.py:84 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
amogkam commented 1 year ago

Unfortunately, these cross-platform client setups are not very robust. I would use Linux for both the client and the server, or just not use Ray Client and instead run directly on the cluster.

tekumara commented 1 year ago

I'm using Linux for both the client and the server, on the same machine.

amogkam commented 1 year ago

Ah sorry, you said in the original message that the driver is on Mac and the worker is on Linux:

I use the ray client mode, the driver code ran on my mac system, the worker ran on linux system(centos 7.5).

China-JasonW commented 1 year ago

Recently I tested it on a Linux cluster, with both the driver and worker on Linux, and it produced the same error message.

China-JasonW commented 1 year ago

I'm using Linux for both the client and the server, on the same machine.

I would like to know if there is a way to avoid this error, or if there is a plan to fix it in the near future.

amogkam commented 1 year ago

I would recommend not using Ray Client, and instead either running directly on the head node of the cluster or submitting the work as Ray Jobs in production.
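
For reference, the Ray Jobs route might look roughly like the following with the Python SDK (the dashboard address, working directory, and script name are placeholders):

from ray.job_submission import JobSubmissionClient

# The Jobs API is served by the dashboard on the head node (default port 8265).
client = JobSubmissionClient("http://192.168.101.21:8265")

job_id = client.submit_job(
    # The entrypoint runs on the cluster, so ray.init() inside the script
    # connects locally instead of going through Ray Client.
    entrypoint="python analysis_job.py",
    runtime_env={"working_dir": "./"},
)
print(job_id)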

Amitg1 commented 1 year ago

Thanks, @amogkam. Going forward, is it something that should be stable?

ericl commented 1 year ago

We do not recommend using Ray client, especially for general workloads. As a workaround for this use case, you can probably try wrapping the code in a task, such as:

import ray
import ray.data

@ray.remote
def run_remotely():
    result = ray.data.range(100).groupby(lambda x: x % 3).count()
    return result

ray.get(run_remotely.remote())

This is much more optimized for Ray client usage, as it avoids most of the network traffic and API calls over the client connection.
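
Applied to the original write_csv use case, the same pattern might look like the sketch below; build_and_write is an illustrative name and the output path is a placeholder. The Dataset is both produced and written inside the task, so only a small scalar result crosses the client connection.

import ray
import ray.data

@ray.remote
def build_and_write():
    # Everything in here runs on the cluster, not over the Ray Client connection.
    ds = ray.data.range(100).groupby(lambda x: x % 3).count()
    ds.write_csv("/tmp/ddata")
    return ds.count()

num_rows = ray.get(build_and_write.remote())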