ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Data] DataIterator destructor triggered in ray actor remote call #41417

Open Zandew opened 11 months ago

Zandew commented 11 months ago

What happened + What you expected to happen

Copied from #41299:

Adding a lock to _get_or_create_stats_actor() resulted in a deadlock.

It looks like the DataIterator.__del__ of an old iterator belonging to a previous test is triggered somewhere while calling _StatsActor.remote(). I don't know why it wasn't gc'd before this. This is the stack trace of the main thread, with the two entrances labelled:

File "/Users/andrewxue/fork/ray/python/ray/data/tests/test_map.py", line 422, in test_map_batches_extra_args
    ds = ray.data.range(5)
File "/Users/andrewxue/fork/ray/python/ray/data/read_api.py", line 226, in range
    return read_datasource(datasource, parallelism=parallelism)
File "/Users/andrewxue/fork/ray/python/ray/_private/auto_init_hook.py", line 25, in auto_init_wrapper
    return fn(*args, **kwargs)
File "/Users/andrewxue/fork/ray/python/ray/data/read_api.py", line 371, in read_datasource
    block_list = LazyBlockList(
File "/Users/andrewxue/fork/ray/python/ray/data/_internal/lazy_block_list.py", line 102, in __init__
     FIRST ENTRANCE self._stats_actor = _get_or_create_stats_actor()                          
File "/Users/andrewxue/fork/ray/python/ray/data/_internal/stats.py", line 363, in _get_or_create_stats_actor
    return _StatsActor.options(
File "/Users/andrewxue/fork/ray/python/ray/actor.py", line 687, in remote
    return actor_cls._remote(args=args, kwargs=kwargs, **updated_options)
File "/Users/andrewxue/fork/ray/python/ray/_private/auto_init_hook.py", line 25, in auto_init_wrapper
    return fn(*args, **kwargs)
File "/Users/andrewxue/fork/ray/python/ray/util/tracing/tracing_helper.py", line 388, in _invocation_actor_class_remote_span
    return method(self, args, kwargs, *_args, **_kwargs)
File "/Users/andrewxue/fork/ray/python/ray/actor.py", line 787, in _remote
    return self._remote(args, kwargs, **updated_options)
File "/Users/andrewxue/fork/ray/python/ray/_private/auto_init_hook.py", line 25, in auto_init_wrapper
    return fn(*args, **kwargs)
File "/Users/andrewxue/fork/ray/python/ray/util/tracing/tracing_helper.py", line 388, in _invocation_actor_class_remote_span
    return method(self, args, kwargs, *_args, **_kwargs)
File "/Users/andrewxue/fork/ray/python/ray/actor.py", line 849, in _remote
    ray.get_actor(name, namespace=namespace)
File "/Users/andrewxue/fork/ray/python/ray/_private/auto_init_hook.py", line 25, in auto_init_wrapper
    return fn(*args, **kwargs)
File "/Users/andrewxue/fork/ray/python/ray/data/iterator.py", line 855, in __del__
    StatsManager.clear_iteration_metrics(self._get_dataset_tag())
File "/Users/andrewxue/fork/ray/python/ray/data/_internal/stats.py", line 517, in clear_iteration_metrics
    self._stats_actor().clear_iteration_metrics.remote(dataset_tag)
File "/Users/andrewxue/fork/ray/python/ray/data/_internal/stats.py", line 416, in _stats_actor
    SECOND ENTRANCE self._stats_actor_handle = _get_or_create_stats_actor()                    

I'm not sure how it jumps from fn(*args, **kwargs) to StatsManager.clear_iteration_metrics(self._get_dataset_tag()). This part is flaky too: sometimes __del__ is instead called from somewhere inside pickle.dumps while serializing the _StatsActor.
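
One possible explanation for the jump, offered as a guess rather than a confirmed diagnosis: if the old iterator is only reachable through a reference cycle, CPython frees it when the cyclic garbage collector runs, and that can happen at almost any allocation, including inside a function that currently holds the lock. The sketch below reproduces that shape with hypothetical stand-ins (`Iterator`, `get_or_create`) and no Ray code; a timeout is used so the demo reports the would-be deadlock instead of hanging.

```python
import gc
import threading

_lock = threading.Lock()  # non-reentrant, like the lock that was first tried
events = []


def get_or_create(allocate=None):
    """Hypothetical stand-in for _get_or_create_stats_actor()."""
    if not _lock.acquire(timeout=0.2):
        # With a plain blocking acquire this thread would wait on itself forever.
        events.append("would deadlock")
        return
    try:
        events.append("acquired")
        if allocate is not None:
            allocate()  # any allocation here may trigger a GC pass
    finally:
        _lock.release()


class Iterator:
    """Hypothetical stand-in for DataIterator."""

    def __init__(self):
        self.cycle = self  # reference cycle: only the cyclic GC can free it

    def __del__(self):
        get_or_create()  # second entrance, from inside the finalizer


def run():
    events.clear()
    gc.disable()  # keep the timing deterministic for the demo
    try:
        Iterator()  # unreachable immediately, but kept alive by its cycle
        get_or_create(allocate=gc.collect)  # GC runs while the lock is held
    finally:
        gc.enable()
    return list(events)
```

Calling `run()` records the first entrance acquiring the lock and the second entrance, made from `__del__` on the same thread, failing to acquire it.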

This is sometimes reproducible by running python -m pytest test_map.py::test_map_batches_basic test_map.py::test_map_batches_extra_args. The DataIterator that gets deleted is the last iterator created in test_map_batches_basic:

    # Test the wrong return value raises an exception.
    ds = ray.data.read_parquet(str(tmp_path))
    with pytest.raises(ValueError):
        ds_list = ds.map_batches(
            lambda df: 1, batch_size=2, batch_format="pyarrow"
        ).take()

This error does not happen if we remove this part of the test or make it not raise an error. It also doesn't happen if we gc.collect() after this.
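
Both observations would be consistent with the raised exception keeping the iterator alive through a reference cycle, although that is an assumption and not something verified against the Ray code. A stored exception references its traceback, the traceback references the frame, and the frame references its locals, so anything local to the failing call survives until the cyclic collector runs. A minimal sketch with a hypothetical `Iterator` class:

```python
import gc
import weakref


class Iterator:
    """Hypothetical stand-in for the iterator created inside the failing call."""


def failing_call():
    it = Iterator()
    ref = weakref.ref(it)
    try:
        raise ValueError("wrong return value")
    except ValueError as exc:
        # frame -> saved -> traceback -> frame: a cycle that also pins `it`
        saved = exc
    return ref


def run():
    gc.disable()  # keep the timing deterministic for the demo
    try:
        ref = failing_call()
        alive_before = ref() is not None  # still alive after the call returned
        gc.collect()                      # explicit collection breaks the cycle
        alive_after = ref() is not None
    finally:
        gc.enable()
    return alive_before, alive_after
```

Here `run()` reports that the object outlives the call that created it and is only freed by the explicit `gc.collect()`, which mirrors why collecting right after the failing block makes the problem disappear.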

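This was temporarily worked around by using a reentrant lock (RLock), which lets the same thread acquire the lock again when __del__ re-enters _get_or_create_stats_actor().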
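
The difference between the two lock types can be seen in isolation. This is a generic sketch using only the standard library; `reenter` is a hypothetical helper, not part of Ray.

```python
import threading


def reenter(lock, timeout=0.2):
    """Hold `lock`, then try to acquire it again from the same thread."""
    with lock:
        got_it = lock.acquire(timeout=timeout)
        if got_it:
            lock.release()
        return got_it


plain = reenter(threading.Lock())       # second acquire times out
reentrant = reenter(threading.RLock())  # same thread may acquire again
```

A reentrant lock removes the self-deadlock, but the nested call still runs in the middle of the outer critical section, so any state the outer call has only partially updated is visible to the finalizer.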

Versions / Dependencies

https://github.com/ray-project/ray/commit/ebc7a39808878050ff94af6d7106d47e93a2e1e3

Reproduction script

above

Issue Severity

Low: It annoys or frustrates me.

anyscalesam commented 10 months ago

@c21 @Zandew can you follow up on target ray release priority and size?

Zandew commented 10 months ago

I think we were going to revisit this once LazyBlockList is deprecated, which is scheduled for 2.10. Although I think this error can potentially happen in other places as well.

anyscalesam commented 10 months ago

priority @Zandew and sizing of work here?