rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[FEA] Support segmented reductions (MIN/MAX/COUNT DISTINCT) in cuDF `list` accessor #12301

Open infzo opened 1 year ago

infzo commented 1 year ago

Is your feature request related to a problem? Please describe.

Currently, we use cuDF to implement distributed group-by aggregation: GROUP BY + DISTINCT runs on each local node, and GROUP BY + MIN/MAX/COUNT DISTINCT runs on the merge node. The intermediate results are transferred between nodes as LIST columns, but the merge-node aggregations cannot be applied to list elements.
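For context, here is a minimal Python sketch of the pattern (toy data and hypothetical column names; the real tables are far larger):

import cudf

# Local node: deduplicate per group; the result is shipped to the merge
# node with column 'b' as a LIST column.
df = cudf.DataFrame({'a': [1, 1, 2], 'b': [1, 3, 3]})
local = df.groupby('a', as_index=False).unique()

# Merge node: this is the missing piece. Aggregations such as max or
# nunique do not accept LIST input columns today, so this raises:
# local.groupby('a').agg({'b': 'max'})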

Describe the solution you'd like

Support groupby + aggregation where the input values are LIST-format columns.

[screenshot]

Describe alternatives you've considered

Alternatively, support aggregation operators directly on the list accessor. The list accessor currently supports only len; per-list operators such as count, min, max, and avg are expected to be added (a hypothetical sketch follows below).

[screenshot]
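For illustration only, the requested accessor methods might look like the following (a hypothetical sketch; the list accessor currently supports only len, so the other calls are shown commented out):

import cudf

s = cudf.Series([[1, 1, 3], [2], [5, None, 4]])
print(s.list.len())   # supported today -> 3, 1, 3

# Requested per-list reductions (hypothetical, not available today):
# s.list.min()      -> 1, 2, 4
# s.list.max()      -> 3, 2, 5
# s.list.nunique()  -> 2, 1, 2  (count distinct, ignoring nulls)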

ttnghia commented 1 year ago

What do you mean by groupby + aggregation? If I've guessed correctly, we already have the MERGE_LISTS aggregation that does the merging for lists.

ttnghia commented 1 year ago

If you want to do groupby min/max etc. on lists as the input values, then we now have enough tools to support them; it's just a matter of time to enable them.

infzo commented 1 year ago

> If you want to do groupby min/max etc. on lists as the input values, then we now have enough tools to support them; it's just a matter of time to enable them.

@ttnghia Yes, something like this. A few questions:

  1. When will this feature land in a release version?
  2. Our requirement is urgent. Is there a faster way to implement this in the current version?

SQL case:

[screenshot]

mjshare commented 1 year ago

@ttnghia

How can we perform operations such as count and sum on the list elements after deduplication?

ttnghia commented 1 year ago

Oh sorry, what you've shown in the picture is per-list aggregation on the list elements (I thought you wanted aggregation at the level of whole lists). That would require implementing new aggregations from scratch. I believe @GregoryKimball can answer further questions about prioritizing this FEA.

ttnghia commented 1 year ago

After thinking about it a bit more, I realized that this doesn't require any new cudf aggregations. You can achieve it by:

  1. Merging your lists that correspond to the same key using the MERGE_LISTS aggregation, then
  2. Calling segmented min/max/sum on the list elements using the list offsets (see the sketch below).

However, I'm not sure how you can leverage these existing (internal) segmented APIs to do that.
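To show what step 2 computes, here is a small CPU-side sketch of a segmented max using NumPy's reduceat over a flat values array plus segment start offsets (made-up data; this illustrates the semantics only, not the libcudf implementation):

import numpy as np

values = np.array([1, 3, 2, 5, 4, 6])  # flattened list elements
starts = np.array([0, 2, 3])           # segments: [1, 3], [2], [5, 4, 6]
print(np.maximum.reduceat(values, starts))  # -> [3 2 6], one max per list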

shwina commented 1 year ago

@infzo perhaps I'm misunderstanding, but can you not just do this?

In [67]: df1 = cudf.DataFrame({'a': [1, 1, 2], 'b': [1, cudf.NA, 3], 'c': ['x', 'x', 'z']})

In [68]: df2 = cudf.DataFrame({'a': [1, 3, 3], 'b': [1, cudf.NA, 5], 'c': ['x', 'z', cudf.NA]})

In [69]: cudf.concat([df1, df2]).groupby('a').agg({'b': 'max', 'c': 'nunique'})
Out[69]: 
   b  c
a      
1  1  1
2  3  1
3  5  1

mjshare commented 1 year ago

> After thinking about it a bit more, I realized that this doesn't require any new cudf aggregations. [...]

@ttnghia The problem is that groupby does not support count, max, and sum on list columns. Does the community have a plan to support this? We have scenarios that need it.

infzo commented 1 year ago

> After thinking about it a bit more, I realized that this doesn't require any new cudf aggregations. [...]

@ttnghia Yes, this is exactly our current plan. Segmented min/max/sum on the list elements is what we need (see "Describe alternatives you've considered" above), but the current cuDF version does not expose these operators.

infzo commented 1 year ago

> @infzo perhaps I'm misunderstanding, but can you not just do this? [...]

@shwina A use case is given above. In real scenarios, a query can touch more than a billion rows in the table, which causes the following problems:

  1. A single machine's GPU cannot hold that much data, and a cuDF DataFrame has an upper limit on the number of rows.
  2. Transferring that much data over the network takes a long time, which reduces processing efficiency.

shwina commented 1 year ago

Thanks for the context, @infzo. I think I understand. How about doing it this way then?

The following happens on the local nodes:

In [79]: df1 = cudf.DataFrame({'a': [1, 1, 2], 'b': [1, cudf.NA, 3], 'c': ['x', 'x', 'z']})

In [80]: df2 = cudf.DataFrame({'a': [1, 3, 3], 'b': [1, cudf.NA, 5], 'c': ['x', 'z', cudf.NA]})

In [81]: df1
Out[81]: 
   a     b  c
0  1     1  x
1  1  <NA>  x
2  2     3  z

In [82]: df2
Out[82]: 
   a     b     c
0  1     1     x
1  3  <NA>     z
2  3     5  <NA>

In [83]: df1_unique = df1.groupby('a', as_index=False).unique()

In [84]: df2_unique = df2.groupby('a', as_index=False).unique()

The following happens on the merge node:

In [85]: df_merged = cudf.concat([df1_unique, df2_unique], ignore_index=True)

In [86]: b_max = df_merged[['a', 'b']].explode('b').groupby('a').max()

In [87]: c_nunique = df_merged[['a', 'c']].explode('c').groupby('a').nunique()

In [88]: result = cudf.concat([b_max, c_nunique], axis=1)

In [89]: result
Out[89]: 
   b  c
a      
1  1  1
2  3  1
3  5  1

mjshare commented 1 year ago

> Thanks for the context, @infzo. I think I understand. How about doing it this way then? [...]

@shwina This approach has a risk: with multi-column aggregation and multi-column groupby, the explode expansion may exhaust GPU memory. Is there a solution to this problem?

shwina commented 1 year ago

> With multi-column aggregation and multi-column groupby, the explode expansion may exhaust GPU memory

You're right that the call to explode() does incur an additional memory cost, although that cost should be linear in the total number of elements in the list column ('b' or 'c' in this case).

What I'm proposing here is a workaround that is compatible with the existing pandas/cuDF API. Segmented reductions would be the appropriate solution to the problem, but they are not currently available via the Python API.
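In the meantime, one way to bound the peak memory of the explode-based workaround might be to explode and reduce in row chunks and then merge the partial results. Here is a sketch with a hypothetical helper; it only applies to reductions such as min/max/sum that can be re-reduced across chunks (nunique would instead require keeping each chunk's unique values):

import cudf

def chunked_groupby_max(df, key, col, chunk_rows=1_000_000):
    # Explode one slice of rows at a time so that only that slice's
    # list elements are materialized on the GPU at once.
    partials = []
    for start in range(0, len(df), chunk_rows):
        chunk = df[[key, col]].iloc[start:start + chunk_rows]
        partials.append(chunk.explode(col).groupby(key).max())
    # max is mergeable: reducing the per-chunk maxima again gives the
    # same result as a single full-size explode.
    return cudf.concat(partials).reset_index().groupby(key).max()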