ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
31.89k stars 5.43k forks

[Data] Allow split by column value in Dataset #45634

Open terraflops1048576 opened 1 month ago

terraflops1048576 commented 1 month ago

Description

Allow a ray.data.Dataset to be grouped by a column and then split into separate Datasets, one per column value. In particular, ray.data.Dataset should have a split_by_key function that splits the Dataset into a dict or list of separate Datasets based on the values of a particular column. This is basically what groupby does in Pandas.
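To make the requested semantics concrete, here is a minimal sketch that works on plain Python rows rather than on a real ray.data.Dataset. The `split_by_key` function below is a hypothetical stand-in written for illustration. It is not an existing Ray API, and a real implementation would have to operate on distributed blocks instead of an in-memory list.

```python
from collections import defaultdict
from typing import Any, Dict, Hashable, Iterable, List

Row = Dict[str, Any]


def split_by_key(rows: Iterable[Row], key: str) -> Dict[Hashable, List[Row]]:
    """Hypothetical illustration of the requested split: one list of rows per key value."""
    groups: Dict[Hashable, List[Row]] = defaultdict(list)
    for row in rows:
        # Rows that share the same value in `key` end up in the same group,
        # in their original relative order.
        groups[row[key]].append(row)
    return dict(groups)


rows = [
    {"user": "a", "amount": 10},
    {"user": "b", "amount": 5},
    {"user": "a", "amount": 7},
]
by_user = split_by_key(rows, "user")
```

In this sketch `by_user["a"]` holds the two rows for user "a" and `by_user["b"]` holds the single row for user "b"; the feature request asks for the same shape, but with each value being a separate Dataset.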

Use case

The use case right now is taking a Ray Dataset and splitting it into shards by some column value, so that each shard can be written to a separate file using a Ray Datasink. This is not possible today, because the groupby operation only returns a GroupedData, from which you have to use map_groups. The current workaround is to write custom file-writing logic inside map_groups and call materialize() on the resulting Dataset. That is not a good API, and it prevents other use cases, such as passing different Datasets to different workers.

wingkitlee0 commented 4 weeks ago

Is writing a partitioned dataset the primary use case? If so, this may be related to #42228 and #42288.

If I understand your example correctly, writing a dataset by group (or partition) would look like:

for ds_by_group in ds.split_by_key("group_key"):
  ds_by_group.write_parquet("target")

AFAIK, the Ray Data write_* APIs are blocking in the current design, so a loop like this would write the groups one after another.

In #42288, I proposed a solution that aligns the blocks with the keys. In that case,

ds.repartition_by_key("group_key").write_parquet("target")

would write one file per group (Ngroup files in total).

terraflops1048576 commented 3 weeks ago

Well, I would also like the ability to treat each group as a separate Dataset, for example applying map_batches to each, but yes, ultimately the use case I'm targeting right now is writing.

pinduzera commented 2 weeks ago

To add to this, I am looking for a similar feature for general data processing, not necessarily for model training: streaming the data to each node by group (single or multiple keys). For example, imagine I have groups g1, g2, g3, g4, g5 (each holding all the bank data of a given user). I want to be able to process each user independently (or even send some groups together efficiently), and send those groups to the nodes in one (or both) of the following ways:

Assuming I have one node for each group, I could just do something like shards = ds.streaming_split(key="KeyColumn", n=5), and each node would receive an iterable with a single group.

If you don't have enough nodes for each group (say n=3), maybe send (g1,g2) to shard 1, (g3) to shard 2, and (g4,g5) to shard 3, with the groups split roughly according to the number of rows.
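One possible way to decide which groups go to which shard is a greedy "largest group first" assignment by row count. The sketch below is only an illustration of that idea: `assign_groups_to_shards` and the row counts are hypothetical, nothing here is part of Ray, and the resulting grouping can differ from the (g1,g2), (g3), (g4,g5) example above because it balances rows rather than keeping groups in order.

```python
import heapq
from typing import Dict, Hashable, List


def assign_groups_to_shards(
    group_rows: Dict[Hashable, int], n: int
) -> List[List[Hashable]]:
    """Hypothetical helper: assign whole groups to n shards, balancing row counts greedily."""
    if n <= 0:
        raise ValueError("n must be a positive number of shards")
    shards: List[List[Hashable]] = [[] for _ in range(n)]
    # Min-heap of (rows assigned so far, shard index).
    heap = [(0, i) for i in range(n)]
    heapq.heapify(heap)
    # Place the largest groups first, always onto the currently lightest shard.
    for group, rows in sorted(group_rows.items(), key=lambda kv: kv[1], reverse=True):
        load, idx = heapq.heappop(heap)
        shards[idx].append(group)
        heapq.heappush(heap, (load + rows, idx))
    return shards


# Made-up row counts per group, for illustration only.
group_rows = {"g1": 40, "g2": 35, "g3": 80, "g4": 30, "g5": 45}
shards = assign_groups_to_shards(group_rows, n=3)
one_per_shard = assign_groups_to_shards(group_rows, n=5)
```

With n equal to the number of groups, each shard receives exactly one group, which matches the first option; with fewer shards, groups are combined so that the row totals stay close to each other. A group is never split across shards in this sketch.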