databricks / koalas

Koalas: pandas API on Apache Spark
Apache License 2.0

groupby api .agg behavior changes depending upon the way the dataframe is created #2208

Closed nitinmnsn closed 2 years ago

nitinmnsn commented 2 years ago

The groupby aggregate API behaves differently depending on whether the DataFrame is a pyspark.sql.dataframe.DataFrame or a pyspark.pandas.frame.DataFrame. Is this intended behaviour? Also, how do I run groupby .agg with these aggregate functions if the DataFrame is a pyspark.pandas.frame.DataFrame? It seems that registering them with pandas_udf is necessary to run them.

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession
from pyspark import pandas as ps
import pandas as pd
spark = SparkSession.builder.getOrCreate()
df = ps.DataFrame({'A': 'a a b'.split(),
                   'B': [1, 2, 3],
                   'C': [4, 6, 5]}, columns=['A', 'B', 'C'])

@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def agg_a(x):
    return (x**2).mean()

@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def agg_b(x):
    return x.mean()

# Fails when df is a pyspark.pandas DataFrame (see the error below)
df.groupby('A').agg(agg_a('B'), agg_b('C')).show()

Output: ValueError: aggs must be a dict mapping from column name to aggregate functions (string or list of strings).

However, if I create the DataFrame with spark.createDataFrame instead of pyspark.pandas, the exact same code works without any errors:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession
from pyspark import pandas as ps
import pandas as pd
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(pd.DataFrame(
    {
        'A': 'a a b'.split(),
        'B': [10, 20, 30],
        'C': [4, 6, 5]
    },
    columns=['A', 'B', 'C']
))

@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def agg_a(x):
    return (x**2).mean()

@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def agg_b(x):
    return x.mean()

df.groupby('A').agg(agg_a('B'), agg_b('C')).show()

Output:

+---+--------+--------+
|  A|agg_a(B)|agg_b(C)|
+---+--------+--------+
|  a|   250.0|     5.0|
|  b|   900.0|     5.0|
+---+--------+--------+

itholic commented 2 years ago

Let me close this since it is a duplicate of https://github.com/databricks/koalas/issues/2201.

And this is technically a bug in the pandas API on Spark (pyspark.pandas), not in Koalas, so it should be filed in Apache Spark JIRA.

Of course, Koalas and pandas on Spark have almost the same behavior, but they should be treated as different projects as pandas on Spark is much more actively updated now.
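
On the second question in the original post (how to run these aggregations when the DataFrame is a pyspark.pandas.frame.DataFrame), one possible workaround is sketched below. It has not been confirmed by the maintainers in this thread. It assumes Spark 3.2 or later, converts the pandas-on-Spark DataFrame to a Spark DataFrame with to_spark(), and applies the pandas UDFs there. The helper names mean_of_squares, plain_mean, grouped_udf_agg, and demo are hypothetical and exist only for illustration. The UDFs use Python type hints instead of PandasUDFType, which is deprecated in Spark 3.

```python
import pandas as pd


def mean_of_squares(x: pd.Series) -> float:
    # Same computation as agg_a in the original post.
    return float((x ** 2).mean())


def plain_mean(x: pd.Series) -> float:
    # Same computation as agg_b in the original post.
    return float(x.mean())


def grouped_udf_agg(psdf, key="A"):
    """Hypothetical helper: apply the two UDFs to a pandas-on-Spark DataFrame.

    Assumes psdf has columns B and C, as in the original post.
    """
    # Imported lazily so the plain functions above can be used without Spark.
    from pyspark.sql.functions import pandas_udf

    # The Series -> float type hints let Spark treat these as grouped
    # aggregate pandas UDFs.
    agg_a = pandas_udf(mean_of_squares, "double")
    agg_b = pandas_udf(plain_mean, "double")

    # to_spark() returns a pyspark.sql DataFrame; the index is dropped
    # unless index_col is passed.
    sdf = psdf.to_spark()
    return sdf.groupby(key).agg(agg_a("B"), agg_b("C"))


def demo():
    # Requires a working Spark installation.
    from pyspark import pandas as ps

    psdf = ps.DataFrame({"A": ["a", "a", "b"],
                         "B": [1, 2, 3],
                         "C": [4, 6, 5]})

    # Custom aggregations through the Spark DataFrame API.
    grouped_udf_agg(psdf).show()

    # Built-in aggregations can stay in pandas-on-Spark by using the dict
    # form that the error message asks for.
    print(psdf.groupby("A").agg({"B": "mean", "C": "mean"}))
```

Calling demo() in an environment with Spark available runs both variants. The dict form only accepts built-in aggregation names (strings or lists of strings), which is why the custom functions go through the Spark DataFrame instead.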