Eventual-Inc / Daft

Distributed DataFrame for Python designed for the cloud, powered by Rust
https://getdaft.io
Apache License 2.0

Add global shuffle #2612

Open sagiahrac opened 1 month ago

sagiahrac commented 1 month ago

Is your feature request related to a problem? Please describe.

Daft does not provide a native operation for a global shuffle. The .sample method does not change the row order, and .repartition preserves, within each partition, the relative order the rows had in the original dataframe.

import daft

df_a = daft.from_pydict({"char": [f"a{i}" for i in range(10)]})
df_b = daft.from_pydict({"char": [f"b{i}" for i in range(10)]})

df = df_a.concat(df_b)
df.repartition(1).to_pydict()
{'char': ['a0',
  'a1',
  'a2',
  'a3',
  'a4',
  'a5',
  'a6',
  'a7',
  'a8',
  'a9',
  'b0',
  'b1',
  'b2',
  'b3',
  'b4',
  'b5',
  'b6',
  'b7',
  'b8',
  'b9']}

Describe the solution you'd like

Assign each row to a random partition, then shuffle the rows within each partition. Add an optional parameter to control the seed.
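To make the request concrete, here is a minimal pure-Python sketch of that two-stage idea. It does not use Daft at all: partitions are modeled as plain lists, and the function name `global_shuffle` and its `num_partitions` and `seed` parameters are hypothetical, chosen only for illustration.

```python
import random
from typing import Any, List, Optional


def global_shuffle(
    partitions: List[List[Any]],
    num_partitions: int,
    seed: Optional[int] = None,
) -> List[List[Any]]:
    """Hypothetical sketch: scatter rows to random partitions, then shuffle each."""
    rng = random.Random(seed)  # seeded generator makes the result reproducible

    # Stage 1: send every row to a randomly chosen output partition.
    out: List[List[Any]] = [[] for _ in range(num_partitions)]
    for partition in partitions:
        for row in partition:
            out[rng.randrange(num_partitions)].append(row)

    # Stage 2: shuffle the rows inside each output partition.
    for partition in out:
        rng.shuffle(partition)
    return out


# Same data as the example in this issue, modeled as two input partitions.
part_a = [f"a{i}" for i in range(10)]
part_b = [f"b{i}" for i in range(10)]
shuffled = global_shuffle([part_a, part_b], num_partitions=2, seed=0)
```

Stage 1 alone is not enough: rows that land in the same output partition would still appear in their original relative order, which is why the per-partition shuffle in stage 2 is part of the request.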

Describe alternatives you've considered

  1. Add a column of random integers and sort the dataframe by it (this requires computing the size of the dataframe first).
  2. Materialize the dataframe, since a shuffle is a global operation anyway, and shuffle it with polars (using .sample).
  3. Iterating over partitions and shuffling each partition:
import daft
import numpy as np
import torch


class MyDataset(torch.utils.data.IterableDataset):
    df: daft.DataFrame
    ...

    def __iter__(self):
        df = self.df.repartition(None)
        for partition in df.iter_partitions():
            samples = partition.to_pylist()
            # Shuffles rows within this partition only; partition order is unchanged.
            np.random.shuffle(samples)
            yield from samples
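For comparison, alternative 1 (sorting by a random key) can be sketched in plain Python. This is only an illustration of the technique on an in-memory list, not Daft code, and the helper name `shuffle_by_random_key` is hypothetical. Random floats stand in for the random integers to keep the sketch short.

```python
import random
from typing import Any, List, Optional


def shuffle_by_random_key(rows: List[Any], seed: Optional[int] = None) -> List[Any]:
    """Hypothetical sketch: attach a random key to each row and sort by it."""
    rng = random.Random(seed)
    # One key per row, so the number of rows must be known before sorting.
    keys = [rng.random() for _ in range(len(rows))]
    order = sorted(range(len(rows)), key=keys.__getitem__)
    return [rows[i] for i in order]


rows = [f"a{i}" for i in range(10)] + [f"b{i}" for i in range(10)]
shuffled_rows = shuffle_by_random_key(rows, seed=0)
```

The sort touches every row globally, whereas alternative 3 only reorders rows inside each partition and leaves the partition order itself untouched.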

ccmao1130 commented 2 weeks ago

Slack thread on Daft for more context: https://dist-data.slack.com/archives/C041NA2RBFD/p1720707378027859