ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Datasets] Support pipelining on data blocks within a single file (fully streaming reads) #24264

Closed GuillaumeDesforges closed 11 months ago

GuillaumeDesforges commented 2 years ago

Description

The file-based datasource should read block by block, rather than blocking until a whole list of files has been read.

My understanding is that there are only as many ReadTasks as parallelism: https://github.com/ray-project/ray/blob/6560a2bf5df02ba7a975c249698f90bd850c2390/python/ray/data/datasource/file_based_datasource.py#L331-L333

and a ReadTask is run from start to end in one go.

Use case

I have many (>1000) image files of big images (>1GB) from which I can generate ~500 "patch" images (using openslide for instance), which I use in a machine learning pipeline.

I want to start training my model as data is loading, so the snippet

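from typing import Iterator

import pyarrow
import ray.data
from ray.data.datasource.file_based_datasource import FileBasedDatasource
from ray.data.extensions import TensorArray

class SomeDatasource(FileBasedDatasource):
    def _read_stream(
        self,
        native_file: pyarrow.NativeFile,
        native_file_path: str,
        **reader_args,
    ) -> Iterator[ray.data.block.Block]:
        # gen_patches is my own helper that yields groups of patches from one file
        for patches in gen_patches(native_file):
            # Emit one block per group of patches instead of one block per file
            yield pyarrow.Table.from_pydict({"value": TensorArray(patches)})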

ds = ray.data.read_datasource(
    SomeDatasource(),
    parallelism=1,
    paths=paths,
    filesystem=filesystem,
    ray_remote_args={"num_gpus": 1},
)

some_batch = next(ds.iter_batches(batch_size=1))

should not load all the data before yielding one batch.

GuillaumeDesforges commented 2 years ago

From conversations on Slack with @clarkzinzow :

Clark Zinzow (Ray team) :
Datasets always kicks off reading of the first block, so if you’re giving parallelism=1, that means that it will read all provided paths into one block, which will always be eagerly read. We do this eager read so users can rapidly introspect schema and other metadata, along with the first few rows of the data. If you were to provide parallelism=len(paths), this should read at most one file.

Guillaume Desforges :
I don't have access to len(paths), as the paths are computed inside FileBasedDatasource.prepare_read
https://github.com/ray-project/ray/blob/6560a2bf5df02ba7a975c249698f90bd850c2390/python/ray/data/datasource/file_based_datasource.py#L270
Users will pass a single path which is the top directory of the data directory
So the actual len(paths) is not available at the time of calling read_datasource. I could ask users to expand the path themselves and set parallelism accordingly, but I believe Ray's UX would benefit from creating one block per expanded file path instead of relying on parallelism.
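
A minimal sketch of the workaround mentioned above, expanding the top-level directory on the caller side so the file count is known before calling read_datasource. The helper name `expand_paths` and the example directory are hypothetical, and the sketch only handles the local filesystem through pathlib; a remote filesystem would need its own listing call. The read_datasource call is left as a comment and reuses the arguments from the snippet in the issue description.

```python
from pathlib import Path
from typing import List


def expand_paths(root: str) -> List[str]:
    """Return every regular file under `root`, sorted so the order is stable."""
    return sorted(str(p) for p in Path(root).rglob("*") if p.is_file())


# Hypothetical usage (not executed here):
#
#   paths = expand_paths("/data/slides")
#   ds = ray.data.read_datasource(
#       SomeDatasource(),
#       parallelism=len(paths),  # per the comment above, at most one file is read eagerly
#       paths=paths,
#   )
```

This keeps the expansion outside of Ray, which is exactly the extra step the comment argues users should not have to do.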

GuillaumeDesforges commented 2 years ago

Using a very high value for parallelism crashes unexpectedly: #24296

GuillaumeDesforges commented 2 years ago

From my investigation, a datasource cannot generate data lazily.

A datasource creates a list of ReadTask objects, which are then consumed by ray.data.read_datasource.
https://github.com/ray-project/ray/blob/026849cd272013003212159c1a12de163449b761/python/ray/data/read_api.py#L227
https://github.com/ray-project/ray/blob/026849cd272013003212159c1a12de163449b761/python/ray/data/read_api.py#L259
https://github.com/ray-project/ray/blob/026849cd272013003212159c1a12de163449b761/python/ray/data/read_api.py#L263-L264

In ReadTask's __call__, all blocks are evaluated until exhaustion from the read_fn.
https://github.com/ray-project/ray/blob/026849cd272013003212159c1a12de163449b761/python/ray/data/datasource/datasource.py#L156-L159

So, when calling dataset.iter_batches, even if the passed read_fn is designed to yield results one at a time, as in FileBasedDatasource, Ray computes all the blocks until read_fn is exhausted.
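
The difference can be illustrated without Ray. The sketch below is plain Python, not Ray's code: `read_fn`, `eager_task`, and `lazy_task` are hypothetical stand-ins, with `eager_task` mimicking a task that drains its generator before returning and `lazy_task` handing blocks over one at a time.

```python
from typing import Callable, Iterator, List

produced: List[int] = []  # records which blocks the reader has generated so far


def read_fn() -> Iterator[int]:
    """Stand-in for a streaming reader that yields one 'block' at a time."""
    for block_id in range(5):
        produced.append(block_id)
        yield block_id


def eager_task(fn: Callable[[], Iterator[int]]) -> List[int]:
    """Drain the generator completely before returning anything."""
    return list(fn())


def lazy_task(fn: Callable[[], Iterator[int]]) -> Iterator[int]:
    """Pass blocks to the caller as soon as each one is produced."""
    yield from fn()


first_eager = eager_task(read_fn)[0]
eager_count = len(produced)  # every block was generated to obtain the first one

produced.clear()
first_lazy = next(lazy_task(read_fn))
lazy_count = len(produced)  # only the first block was generated
```

Both calls return the same first block, but the eager version generates all five blocks first, which is the behavior described above.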

Also, since read_datasource relies on LazyBlockList,
https://github.com/ray-project/ray/blob/026849cd272013003212159c1a12de163449b761/python/ray/data/read_api.py#L259
in order for laziness to be possible, one would need LazyBlockList.iter_blocks_with_metadata not to expect 1 task to produce 1 block
https://github.com/ray-project/ray/blob/026849cd272013003212159c1a12de163449b761/python/ray/data/impl/lazy_block_list.py#L403-L406

So some core changes would be needed to make a Datasource lazy at the task level.

clarkzinzow commented 2 years ago

@GuillaumeDesforges What you're looking for here is for each ReadTask to be able to yield one or more blocks and for downstream operations to be able to start execution on those individual blocks; i.e., you're wanting to be able to pipeline execution within a single ReadTask's blocks.

"in order for laziness to be possible, one would need LazyBlockList.iter_blocks_with_metadata not to expect 1 task to produce 1 block"

This aspect is already supported via dynamic block splitting (disabled by default due to some remaining fault-tolerance and performance work), where a ReadTask is able to dynamically (at read task execution time) produce more than one block. See the code path here, where we treat the ReadTask output as a partition containing one or more blocks.
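
As a rough illustration of the idea only (this is not Ray's implementation), a single task can emit several blocks by closing the current block whenever a size budget would be exceeded. The function name `split_into_blocks` and the parameter `target_max_bytes` are hypothetical.

```python
from typing import Iterable, Iterator, List


def split_into_blocks(
    rows: Iterable[bytes], target_max_bytes: int
) -> Iterator[List[bytes]]:
    """Group rows into blocks, closing a block when the next row would exceed the budget."""
    block: List[bytes] = []
    size = 0
    for row in rows:
        # Emit the current block if adding this row would go over the budget.
        if block and size + len(row) > target_max_bytes:
            yield block
            block, size = [], 0
        block.append(row)
        size += len(row)
    if block:
        yield block  # emit the final, possibly smaller, block


rows = [b"x" * 40 for _ in range(5)]  # five 40-byte rows
blocks = list(split_into_blocks(rows, target_max_bytes=100))
block_sizes = [len(b) for b in blocks]  # [2, 2, 1]
```

One task here produces three blocks instead of one. A row larger than the budget still gets a block of its own in this sketch.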

What's missing is allowing the read stage to yield control to downstream operations and pipeline execution on these blocks.

GuillaumeDesforges commented 1 year ago

Hi! In the Ray 2.4 changelog I can see the following: https://github.com/ray-project/ray/releases/tag/ray-2.4.0

"We've implemented asynchronous batch prefetching of Dataset.iter_batches, improving performance by fetching data in parallel while the main thread continues processing, thus reducing waiting time."

Does it address this issue?
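
For context, here is a generic sketch of asynchronous prefetching with a background thread and a bounded queue. It is plain Python and not Ray's implementation; the names `prefetch` and `depth` are hypothetical, and errors raised in the worker thread are not propagated in this simplified version.

```python
import queue
import threading
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel marking the end of the stream


def prefetch(batches: Iterable[T], depth: int = 2) -> Iterator[T]:
    """Yield items from `batches` in order, buffering up to `depth` of them ahead."""
    buf: "queue.Queue[object]" = queue.Queue(maxsize=depth)

    def worker() -> None:
        for batch in batches:
            buf.put(batch)  # blocks while the buffer is full
        buf.put(_DONE)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = buf.get()  # blocks until the worker has produced something
        if item is _DONE:
            return
        yield item


batches = list(prefetch(range(5), depth=2))  # [0, 1, 2, 3, 4]
```

A prefetcher like this overlaps fetching with the consumer's work, but it can only hand over what the upstream iterable has already produced, so on its own it does not make a read task emit blocks before it finishes.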

GuillaumeDesforges commented 11 months ago

Hey @anyscalesam, I'm excited to see this completed!

Out of curiosity, would you mind linking the commit that implements it? I'm curious to know how you've handled this :eyes: