Eventual-Inc / Daft

Distributed data engine for Python/SQL designed for the cloud, powered by Rust
https://getdaft.io
Apache License 2.0

Introduce a map_partition #3187

Open MisterKloudy opened 1 week ago

MisterKloudy commented 1 week ago

Is your feature request related to a problem?

Certain operations that require shuffles are often very expensive, but in many cases these global shuffles can be avoided. Sometimes the data has already been strategically partitioned so that local sorts/aggregations produce the same results as a global shuffle with far fewer resources. For example, if I have 26 partitions partitioned on the first letter of a text column, sorting within each partition gives the same result as sorting globally.
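A quick plain-Python sanity check of that equivalence, just to make the claim concrete (no Daft involved; the word list is made up):

# If rows are already bucketed by their first letter, sorting each bucket
# locally and reading the buckets in alphabetical order is identical to one
# global sort.
words = ["banana", "apple", "cherry", "avocado", "blueberry"]

buckets = {}
for w in words:
    buckets.setdefault(w[0], []).append(w)

local_then_concat = [w for letter in sorted(buckets) for w in sorted(buckets[letter])]
assert local_then_concat == sorted(words)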

Describe the solution you'd like

Expose a map_partitions API that passes each partition in as a separate Daft DataFrame, so that any logic applicable to the unpartitioned DF can be reused on each partition separately.

For example, if I have the following processes for a DF:

df = df.with_column("x", udf(col("z")))
df = df.sort(col("x") + col("y"))

then it would be equivalent, but much more efficient, if I could instead run

def process_df(df):
    df = df.with_column("x", udf(col("z")))
    df = df.sort(col("x") + col("y"))
    return df

df = df.map_partitions(process_df)

The risk is that users would need to understand that this can produce different results if the right partitioning was not done beforehand, but it would be a big time-saver for cases where global shuffles are far too expensive.

Describe alternatives you've considered

groupby + map_groups is also possible, but it still involves a global shuffle. It is also a lot less convenient to work with grouped DFs than with plain DFs, due to the limited APIs available on grouped DFs.
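For reference, this is roughly the shape of the groupby + map_groups route; the UDF/return_dtype details here are my reading of the current docs and may differ slightly between versions, but the groupby itself still implies a global shuffle before the per-group work runs:

import daft
from daft import col

@daft.udf(return_dtype=daft.DataType.int64())
def sort_within_group(x):
    # `x` arrives as one group's values; sort them locally.
    return sorted(x.to_pylist())

df = daft.from_pydict({"key": ["a", "b", "a", "b"], "x": [3, 4, 1, 2]})
df = df.groupby("key").map_groups(sort_within_group(col("x")))
df.show()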

iter_partitions is difficult to use because the Table/ObjectRef it yields is not as user-friendly as a DF; it is probably more useful for cases where we do not need a DF back after the transformation, like PySpark's foreachPartition.
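The workaround today looks roughly like the sketch below. The helper is my own, not a Daft API, and it assumes the local runner, where iter_partitions() yields in-memory partitions that expose a to_arrow() conversion; on the Ray runner the yielded ObjectRefs would need a ray.get() first:

import daft

def map_partitions_workaround(df, fn):
    # Apply `fn` to each partition as its own DataFrame, then concatenate.
    # Everything is materialized through Arrow, which is exactly what makes
    # this clunkier than a native map_partitions would be.
    pieces = []
    for part in df.iter_partitions():
        part_df = daft.from_arrow(part.to_arrow())  # partition back to a DataFrame
        pieces.append(fn(part_df))
    out = pieces[0]
    for piece in pieces[1:]:
        out = out.concat(piece)
    return out

df = map_partitions_workaround(df, process_df)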

Additional Context

Map transformations over each partition and return a transformed DF, as in https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.mapPartitions.html
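For comparison, the PySpark pattern being referenced looks roughly like this (local SparkSession purely for illustration):

from pyspark.sql import SparkSession

# The function receives each partition as an iterator and returns an
# iterator; no shuffle is involved.
spark = SparkSession.builder.master("local[2]").getOrCreate()
rdd = spark.sparkContext.parallelize(range(10), numSlices=2)

def per_partition(rows):
    yield sum(rows)  # one output row per partition

print(rdd.mapPartitions(per_partition).collect())  # e.g. [10, 35]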

Would you like to implement a fix?

No

jaychia commented 1 week ago

I'm sensing that the underlying problem is perhaps that Daft cannot effectively understand the user's logical partitioning of the data in storage?

In the above example, if Daft were able to understand when reading the data that it is partitioned by the first letter of a text column, then it should understand that it can elide the shuffle for a sort.

Would love to explore some potential APIs for users to provide hints here.
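Purely as a strawman for discussion (not an existing Daft API), a hint could look something like:

import daft
from daft import col

# Strawman only: `hint_partitioned_by` does NOT exist in Daft today. It
# illustrates the kind of metadata a user could declare about how the data
# is already laid out, so the optimizer knows a partition-local sort is safe.
df = daft.read_parquet("s3://bucket/table/*.parquet")  # example path
df = df.hint_partitioned_by("first_letter")            # hypothetical hint
df = df.sort([col("first_letter"), col("text")])       # could then skip the global shuffle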

Note that Daft gives no behavior guarantees about how it reads the data (it could split files or coalesce files into partitions depending on various settings) so it is relatively dangerous for users to depend on partitioning behavior.

MisterKloudy commented 1 week ago

Yes, hints would also help, but being able to do this explicitly would probably be the most straightforward option, since I could then easily visualize what is happening and make use of these optimizations when writing my transformations. Hints would actually be harder to read, and I think less flexible than being able to map over partitions and treat each partition as a new DF. For me this is really syntactic sugar for reading each partition independently and performing the required transformations in parallel.

The difficulty of understanding the various settings that affect planning is actually a bigger barrier for me. It would be great if the parameters for these settings (such as the morsel size, the default number of shuffle partitions, etc.) depended on variables that represent the state of the available resources or the pending workload, such as the size of the Parquet data, the number of rows, or the cluster size, instead of fixed values that are hard to tune.
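To make that concrete, here is the kind of heuristic I have in mind, sketched in plain Python with illustrative numbers only (not tied to any actual Daft config parameter):

# Illustrative heuristic only: derive the shuffle partition count from
# workload and cluster state instead of a fixed constant.
def suggested_shuffle_partitions(total_input_bytes, cluster_cores, target_partition_bytes=128 * 1024**2):
    by_size = -(-total_input_bytes // target_partition_bytes)  # ceiling division
    return max(cluster_cores, by_size)

# 50 GiB of input on a 64-core cluster -> 400 partitions of ~128 MiB each
print(suggested_shuffle_partitions(50 * 1024**3, cluster_cores=64))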