rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[FEA] Introduce distributed computing stages, then combine groupby and reduction aggregations #10920

Open ttnghia opened 2 years ago

ttnghia commented 2 years ago

Background. Distributed computing aggregations are typically performed in 3 stages:

  1. Update: Computes intermediate results at each node.
  2. Merge: Merges the intermediate results produced by the update stage on different nodes.
  3. Evaluate: Computes the final result of the aggregation.

Only the result of the last stage is what users ultimately want. The intermediate results are typically used internally by the library and do not need to be exposed to users.
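
To make the three stages concrete, here is a minimal host-side sketch (plain C++, independent of libcudf; the struct and function names are illustrative only) of computing a standard deviation this way, using the well-known (count, mean, M2) intermediate state:

#include <cmath>
#include <vector>

// Intermediate state for variance/standard deviation (Chan et al. parallel formula).
struct var_state {
  double count = 0.0;
  double mean  = 0.0;
  double m2    = 0.0;  // sum of squared deviations from the running mean
};

// Update: fold one partition's values into an intermediate state (Welford's algorithm).
var_state update(std::vector<double> const& partition) {
  var_state s;
  for (double x : partition) {
    s.count += 1.0;
    double const delta = x - s.mean;
    s.mean += delta / s.count;
    s.m2 += delta * (x - s.mean);
  }
  return s;
}

// Merge: combine two intermediate states coming from different nodes.
var_state merge(var_state const& a, var_state const& b) {
  var_state out;
  out.count = a.count + b.count;
  double const delta = b.mean - a.mean;
  out.mean = a.mean + delta * b.count / out.count;
  out.m2   = a.m2 + b.m2 + delta * delta * a.count * b.count / out.count;
  return out;
}

// Evaluate: produce the final standard deviation from the merged state.
double evaluate(var_state const& s, double ddof = 1.0) {
  return std::sqrt(s.m2 / (s.count - ddof));
}

Only the value returned by evaluate is interesting to the end user; count, mean, and m2 exist solely so that partial results can be combined correctly.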

However, libcudf currently provides separate public aggregations for each of these stages for several operations, for example the M2 and MERGE_M2 aggregations used for variance/standard deviation.

These aggregations generate only intermediate results that must be combined to produce the final result. Thus, it makes more sense to unify them so that the intermediate results produced by one aggregation class can be processed by the same class in the next stage.

Describe the solution. We should provide just one public aggregation, with a clear and meaningful name, for each kind of operation: for example, a single STANDARD_DEVIATION aggregation that can perform the Update, Merge, and Evaluate stages. When constructing an instance of the aggregation, we pass in a parameter specifying which stage the aggregation should perform. Such a parameter could look like this:

enum class distributed_computing_stage {
  UPDATE,
  MERGE,
  EVALUATE,
  ALL_IN_ONE  // Generate the final result directly in just one pass (no distributed computing)
};

So we will construct the aggregation like this:

template <typename Base = aggregation>
std::unique_ptr<Base> make_std_aggregation(
  distributed_computing_stage stage = distributed_computing_stage::ALL_IN_ONE, size_type ddof = 1);
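
For illustration, driving this proposed API for a distributed standard deviation might look roughly like the sketch below. This is hypothetical: neither distributed_computing_stage nor this form of make_std_aggregation exists in libcudf today.

// Hypothetical usage, per the proposal above (not today's libcudf API).
// Stage 1 (update): each node computes intermediate results over its local partition.
auto update_agg = cudf::make_std_aggregation<cudf::groupby_aggregation>(
  cudf::distributed_computing_stage::UPDATE, /*ddof=*/1);

// Stage 2 (merge): intermediate results gathered from all nodes are merged.
auto merge_agg = cudf::make_std_aggregation<cudf::groupby_aggregation>(
  cudf::distributed_computing_stage::MERGE, /*ddof=*/1);

// Stage 3 (evaluate): the final standard deviation is computed from the merged state.
auto eval_agg = cudf::make_std_aggregation<cudf::groupby_aggregation>(
  cudf::distributed_computing_stage::EVALUATE, /*ddof=*/1);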

Benefits. The architecture I propose here makes the aggregations more meaningful. For example, we would have a STANDARD_DEVIATION aggregation that produces its own intermediate results, merges those intermediate results, and computes the final result, all through the same aggregation class. That makes much more sense than computing the intermediate results by calling the M2 aggregation, then calling the MERGE_M2 aggregation, and then evaluating the final result separately.
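
For contrast, here is a rough sketch of the current flow using the existing public API. The helper functions are hypothetical, and gathering the partial results across nodes and packing the (count, mean, M2) values into the structs column expected by MERGE_M2 are elided.

#include <cudf/aggregation.hpp>
#include <cudf/groupby.hpp>
#include <vector>

// Stage 1 (update), run on each node: compute per-group count, mean, and M2 over the
// local partition (these partial results are later packed into a structs column).
auto local_update(cudf::table_view const& local_keys, cudf::column_view const& local_values) {
  std::vector<cudf::groupby::aggregation_request> requests(1);
  requests[0].values = local_values;
  requests[0].aggregations.push_back(cudf::make_count_aggregation<cudf::groupby_aggregation>());
  requests[0].aggregations.push_back(cudf::make_mean_aggregation<cudf::groupby_aggregation>());
  requests[0].aggregations.push_back(cudf::make_m2_aggregation<cudf::groupby_aggregation>());
  cudf::groupby::groupby gb(local_keys);
  return gb.aggregate(requests);
}

// Stage 2 (merge), run after the per-node partial results have been gathered and packed
// into a structs column of (count, mean, M2): merge them with MERGE_M2.
auto merge_partials(cudf::table_view const& gathered_keys,
                    cudf::column_view const& partial_structs) {
  std::vector<cudf::groupby::aggregation_request> requests(1);
  requests[0].values = partial_structs;
  requests[0].aggregations.push_back(cudf::make_merge_m2_aggregation<cudf::groupby_aggregation>());
  cudf::groupby::groupby gb(gathered_keys);
  return gb.aggregate(requests);
}

// Stage 3 (evaluate): the caller still has to derive std = sqrt(M2 / (count - ddof))
// from the merged results itself; no single STANDARD_DEVIATION aggregation covers this.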

It can also simplify the implementation of aggregations considerably: it reduces the number of aggregation classes, the number of factory functions (make_xxx_aggregation), and the number of related methods (such as std::vector<std::unique_ptr<aggregation>> simple_aggregations_collector::visit and void aggregation_finalizer::visit).

jrhemstad commented 2 years ago

Note that this discussion isn't unique to distributed computing. It applies to any form of "partitioned" input, e.g., even if you're doing "out of core" computing on a single GPU.

I've also disliked the need for the separate MERGE_* aggregations. While I'm not convinced on the proposed spelling, I do think that something like parameterizing the aggregation is a good direction to explore.

The other thing I'd note is that our primary consumers of aggregation APIs do their own logic for implementing partitioned aggregations.

If we tried to hide too many of the details of the partitioned-aggregation internals from them, it would become unusable. For instance, neither Dask nor Spark calls libcudf's mean_aggregation; they compute the sum/count themselves and do the elementwise division. Obviously we would never eliminate sum/count as independent aggregations, but it does illustrate that a consumer of libcudf may have a preferred way of deriving its final result that differs from the proposal here.
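
As a minimal sketch of that sum/count-then-divide pattern using the existing public groupby and binaryop APIs (the helper name grouped_mean and the TRUE_DIV/FLOAT64 choices are illustrative, not what Dask or Spark literally do):

#include <cudf/aggregation.hpp>
#include <cudf/binaryop.hpp>
#include <cudf/column/column.hpp>
#include <cudf/groupby.hpp>
#include <memory>
#include <vector>

// Compute a per-group mean "by hand": ask libcudf for SUM and COUNT, then divide elementwise.
std::unique_ptr<cudf::column> grouped_mean(cudf::table_view const& keys,
                                           cudf::column_view const& values) {
  std::vector<cudf::groupby::aggregation_request> requests(1);
  requests[0].values = values;
  requests[0].aggregations.push_back(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
  requests[0].aggregations.push_back(cudf::make_count_aggregation<cudf::groupby_aggregation>());

  cudf::groupby::groupby gb(keys);
  auto [group_keys, results] = gb.aggregate(requests);

  auto sums   = results[0].results[0]->view();  // per-group sums
  auto counts = results[0].results[1]->view();  // per-group counts

  // Elementwise division of the two result columns, producing a FLOAT64 mean column.
  return cudf::binary_operation(
    sums, counts, cudf::binary_operator::TRUE_DIV, cudf::data_type{cudf::type_id::FLOAT64});
}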

I'll have to think on this some more and come back with a more complete response.

github-actions[bot] commented 2 years ago

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.

github-actions[bot] commented 2 years ago

This issue has been labeled inactive-90d due to no recent activity in the past 90 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed.