ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[data] support streaming_shuffle #37593

Open raulchen opened 1 year ago

raulchen commented 1 year ago

Currently, Ray Data supports two types of shuffling:
1) Global shuffle, ds.random_shuffle(): this produces the best randomness but is expensive.
2) Local shuffle, ds.iter_batches(local_shuffle_buffer_size=): this sometimes doesn't produce enough randomness, because the shuffle buffer size is bounded by the memory of a single node. Also, the shuffling happens on the node that calls iter_batches, which is usually a GPU node.
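To make the local-shuffle limitation concrete, here is a minimal sketch of how a shuffle buffer behaves in principle. It is not Ray's implementation, and `local_shuffle` is a hypothetical name. Each output row is drawn uniformly from the rows currently buffered, so a row can never be emitted more than `buffer_size - 1` positions earlier than it arrived, and the whole buffer has to fit in the memory of the node doing the iteration.

```python
import random
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def local_shuffle(rows: Iterable[T], buffer_size: int, seed: int = 0) -> Iterator[T]:
    """Hypothetical sketch of a fixed-size shuffle buffer (not Ray's code).

    Each emitted row is drawn uniformly from the buffer, so randomness is
    limited by buffer_size, which is bounded by a single node's memory.
    """
    rng = random.Random(seed)
    buffer: List[T] = []
    for row in rows:
        buffer.append(row)
        if len(buffer) >= buffer_size:
            # Swap a random element to the end and pop it (O(1) removal).
            i = rng.randrange(len(buffer))
            buffer[i], buffer[-1] = buffer[-1], buffer[i]
            yield buffer.pop()
    # Input exhausted: shuffle and drain the remaining buffered rows.
    rng.shuffle(buffer)
    yield from buffer
```

With rows 0..999 and a 16-row buffer, every output at position p satisfies row <= p + 15, i.e. rows only move a short distance earlier.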

To overcome these drawbacks, we can implement a distributed streaming_shuffle operation. The op should let users shuffle data within a specific window, defined either by a number of rows or by data size. Example:

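```python
from ray.air import session

# Proposed API (does not exist yet): shuffle within a window of N rows.
train_ds = train_ds.streaming_shuffle(num_rows=N)

def train_loop_per_worker():
    it = session.get_dataset_shard("train")

    for _ in range(num_epochs):
        # No local shuffling needed here; the dataset is already shuffled upstream.
        for batch in it.iter_batches(batch_size=N):
            pass

# `Trainer` stands for any Ray Train trainer, e.g. TorchTrainer.
trainer = Trainer(
    train_loop_per_worker,
    datasets={"train": train_ds},
)
```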

Implementation-wise, we can start with a simple approach: first shuffle the order of the blocks in the window, then shuffle the rows within each block. This doesn't shuffle data across blocks, but it is easier to implement as a first step.
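A minimal sketch of that first-step approach, assuming blocks arrive as a stream of Python lists and the window is closed once it holds at least `num_rows` rows (the size-based window variant is not shown). `windowed_block_shuffle` and `_emit` are hypothetical names, not Ray APIs.

```python
import random
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def _emit(window: List[List[T]], rng: random.Random) -> Iterator[List[T]]:
    rng.shuffle(window)  # 1) shuffle the block order within the window
    for block in window:
        rows = list(block)
        rng.shuffle(rows)  # 2) shuffle the rows within each block
        yield rows


def windowed_block_shuffle(
    blocks: Iterable[List[T]], num_rows: int, seed: int = 0
) -> Iterator[List[T]]:
    """Hypothetical sketch: buffer blocks until the window holds >= num_rows
    rows, then emit them shuffled. Rows never move between blocks."""
    rng = random.Random(seed)
    window: List[List[T]] = []
    rows_in_window = 0
    for block in blocks:
        window.append(block)
        rows_in_window += len(block)
        if rows_in_window >= num_rows:
            yield from _emit(window, rng)
            window, rows_in_window = [], 0
    if window:  # flush the final, possibly partial, window
        yield from _emit(window, rng)
```

Only one window of blocks is held at a time, which keeps the op streaming. Rows from the same input block always come out contiguously, which is exactly the cross-block limitation noted above.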

ericl commented 1 year ago

Regarding the shuffle semantics, can we make sure we provide an understandable randomness guarantee, such as "at least as random as reservoir sampling(N)"?

It's unclear what kind of randomness is provided by shuffling only within blocks.
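One rough way to make this concern measurable (an illustrative metric, not one proposed in the thread) is to count how often two consecutive output rows come from the same input block. The sketch compares a block-only shuffle with a shuffle buffer in which each output is drawn uniformly from the buffered rows, used here as a stand-in for a reservoir-style guarantee. All function names are hypothetical.

```python
import random
from typing import List


def block_shuffle(blocks: List[List[int]], rng: random.Random) -> List[int]:
    """Shuffle block order, then rows within each block (no cross-block mixing)."""
    order = list(blocks)
    rng.shuffle(order)
    out: List[int] = []
    for block in order:
        rows = list(block)
        rng.shuffle(rows)
        out.extend(rows)
    return out


def buffer_shuffle(rows: List[int], buffer_size: int, rng: random.Random) -> List[int]:
    """Streaming shuffle buffer: each output is drawn uniformly from the buffer."""
    buf: List[int] = []
    out: List[int] = []
    for row in rows:
        buf.append(row)
        if len(buf) >= buffer_size:
            i = rng.randrange(len(buf))
            buf[i], buf[-1] = buf[-1], buf[i]
            out.append(buf.pop())
    rng.shuffle(buf)
    out.extend(buf)
    return out


def same_block_adjacency(order: List[int], block_size: int) -> float:
    """Fraction of consecutive output pairs that came from the same input block."""
    pairs = list(zip(order, order[1:]))
    same = sum(a // block_size == b // block_size for a, b in pairs)
    return same / len(pairs)


rng = random.Random(0)
block_size, num_blocks = 100, 50
blocks = [list(range(i * block_size, (i + 1) * block_size)) for i in range(num_blocks)]
rows = [r for b in blocks for r in b]

print(same_block_adjacency(block_shuffle(blocks, rng), block_size))
print(same_block_adjacency(buffer_shuffle(rows, block_size * 10, rng), block_size))
```

With 50 blocks of 100 rows, the block-only shuffle always scores 4950 / 4999 (about 0.990), since every block stays contiguous regardless of the seed; a 1000-row buffer drives the fraction much lower.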

raulchen commented 1 year ago

good point