ibis-project / ibis

the portable Python dataframe library
https://ibis-project.org
Apache License 2.0

bug: UDFs not present on pyspark workers #10273

Open NickCrews opened 1 month ago

NickCrews commented 1 month ago

What happened?

Discovered in https://github.com/NickCrews/mismo/issues/64. CC @jstammers. Here is a more minimal reproducer.

Run with uv run script.py to get uv to install the deps automatically, or install them manually and then run python script.py.

# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "ibis-framework[duckdb]",
#     "pyspark",
# ]
# ///
from __future__ import annotations

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import ibis

spark = SparkSession.builder.master("local").appName("My App").getOrCreate()

@ibis.udf.scalar.python
def ibis_udf_global(x: int) -> int:
    return x + 1

@F.pandas_udf(returnType="int")
def spark_udf(inp: pd.Series) -> pd.Series:
    @ibis.udf.scalar.python
    def ibis_udf_local(x: int) -> int:
        return x + 1

    t = ibis.memtable({"inp": inp})
    t = t.mutate(out=ibis_udf_global(t.inp))
    return t.out.to_pandas()

df = spark.createDataFrame(pd.DataFrame({"inp": (1, 2, 3)}))
print(df.withColumn("prediction", spark_udf(F.col("inp"))).toPandas())

This raises the AttributeError shown below. If I replace the call to ibis_udf_global with a call to ibis_udf_local, the script works.

Traceback

```python-traceback
  File "/Users/nc/code/ibis/.scratch/bug_udf_pyspark.py", line 36, in <module>
    print(df.withColumn("prediction", spark_udf(F.col("inp"))).toPandas())
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/pyspark/sql/pandas/conversion.py", line 202, in toPandas
    rows = self.collect()
           ^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/pyspark/sql/dataframe.py", line 1263, in collect
    sock_info = self._jdf.collectToPython()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted from None
pyspark.errors.exceptions.captured.PythonException:
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/Users/nc/code/ibis/.scratch/bug_udf_pyspark.py", line 31, in spark_udf
    t = t.mutate(out=ibis_udf_global(t.inp))
                     ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/ibis/common/deferred.py", line 613, in inner
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/ibis/expr/operations/udf.py", line 165, in construct
    return node(*args, **kwargs).to_expr()
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/code/ibis/ibis/common/bases.py", line 72, in __call__
    return cls.__create__(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/code/ibis/ibis/common/grounds.py", line 120, in __create__
    return super().__create__(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/code/ibis/ibis/common/grounds.py", line 199, in __init__
    object.__setattr__(self, name, value)
AttributeError: 'ibis_udf_global_0' object has no attribute 'x'
```
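For anyone trying to read the last frame: in plain Python, `object.__setattr__` raises this kind of AttributeError when the instance's class has neither a slot for the attribute nor a `__dict__`. The sketch below reproduces only the shape of that error, using the standard library. It is an illustration, not a confirmed diagnosis of what happens on the Spark worker. The helper names (`make_node_class`, `try_set`) and the class names are hypothetical and are not ibis internals.

```python
from __future__ import annotations


def make_node_class(name: str, fields: tuple[str, ...]) -> type:
    """Build a slots-only class dynamically, with one slot per field.

    Hypothetical helper for illustration; ibis builds its UDF classes
    through its own machinery, which is not reproduced here.
    """
    return type(name, (), {"__slots__": fields})


def try_set(cls: type, field: str, value: object) -> str:
    """Set a field via object.__setattr__ and report what happened."""
    instance = cls()
    try:
        object.__setattr__(instance, field, value)
    except AttributeError as exc:
        return f"AttributeError: {exc}"
    return "ok"


# A class that declares a slot for "x" accepts the assignment.
with_slot = make_node_class("udf_with_slot", ("x",))
print(try_set(with_slot, "x", 1))

# A class with no slot for "x" and no __dict__ rejects the assignment
# with an AttributeError naming the class and the attribute.
without_slot = make_node_class("udf_without_slot", ())
print(try_set(without_slot, "x", 1))
```

If the class that reaches the worker had lost its slot for `x`, the failure would look like the one above, but that is an assumption that would need to be checked against how the UDF class is serialized.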

What version of ibis are you using?

main

What backend(s) are you using, if any?

pyspark

Relevant log output

No response

cpcloud commented 2 weeks ago

This is definitely important, if not to implement then to understand why UDFs aren't present.