ibis-project / ibis

the portable Python dataframe library
https://ibis-project.org
Apache License 2.0

feat(pyspark): support udaf #9173

Status: Open. ted0928 opened this issue 1 month ago.


Is your feature request related to a problem?

No response

What is the motivation behind your request?

I was trying to apply a user-defined aggregate function to a grouped table, but `@ibis.udf.agg` only supports builtin functions.

So I tried the deprecated `reduction` decorator instead, and surprisingly it works:

```python
import ibis
from ibis.legacy.udf.vectorized import reduction
from pyspark.sql import SparkSession


# Legacy vectorized reduction: applied once per group, returns one scalar.
@reduction(output_type=ibis.dtype("float"), input_type=[ibis.dtype("int32")])
def avg(x) -> float:
    return x.mean()


ibis.options.interactive = True
ibis.options.verbose = True  # print the generated SQL

spark = SparkSession.builder.getOrCreate()
connection = ibis.pyspark.connect(spark)

df = connection.create_view(
    "source",
    ibis.memtable(dict(id1=[1, 2, 3, 1, 2, 1], id2=[4, 5, 6, 2, 3, 4])),
)
df = df.group_by(df.id1).aggregate(avg_id2=avg(df.id2))
print(df)
```
```text
SELECT `t0`.`id1`, IBIS_UDF_AVG_12861BCE(`t0`.`id2`) AS `avg_id2` FROM `source` AS `t0` GROUP BY 1 LIMIT 11
┏━━━━━━━┳━━━━━━━━━━┓
┃ id1   ┃ avg_id2  ┃
┡━━━━━━━╇━━━━━━━━━━┩
│ int64 │ float64  │
├───────┼──────────┤
│     1 │ 3.333333 │
│     2 │ 4.000000 │
│     3 │ 6.000000 │
└───────┴──────────┘
```
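For reference, the expected result can be checked without Spark or ibis. The plain-Python sketch below groups `id2` by `id1` and applies a mean reducer to each group, mirroring what the `avg` reduction in the repro is asked to do: take the values of one group and return one scalar. The helper name `grouped_reduce` is hypothetical and only for illustration; it is not an ibis API.

```python
from collections import defaultdict

# Same data as the memtable in the repro.
id1 = [1, 2, 3, 1, 2, 1]
id2 = [4, 5, 6, 2, 3, 4]


def grouped_reduce(keys, values, reducer):
    """Hypothetical helper: GROUP BY `keys`, then apply `reducer` per group.

    The reducer sees all values of a single group at once and returns
    a single scalar, like an aggregate UDF.
    """
    groups = defaultdict(list)
    for key, value in zip(keys, values):
        groups[key].append(value)
    # Sort by key so the output order matches the table in the issue.
    return {key: reducer(vals) for key, vals in sorted(groups.items())}


def avg(xs):
    # Always return a float, like `x.mean()` on a numeric column.
    return sum(xs) / len(xs)


result = grouped_reduce(id1, id2, avg)
for key, value in result.items():
    print(key, f"{value:.6f}")
```

Running it prints `1 3.333333`, `2 4.000000` and `3 6.000000`, which matches the `avg_id2` column returned by the PySpark backend.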

Describe the solution you'd like

Is there any plan to migrate this decorator to the new `@ibis.udf.agg` API?

What version of ibis are you running?

main

What backend(s) are you using, if any?

pyspark
