delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, along with language APIs
https://delta.io
Apache License 2.0

Fail to merge with PythonUDF #653

Open YannByron opened 3 years ago

YannByron commented 3 years ago

When executing a merge with a UDF via PySpark, the following exception is raised:

```
Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: <lambda>(input[0, string, true])
    at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:304)
    at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:303)
    at org.apache.spark.sql.catalyst.expressions.PythonUDF.eval(PythonUDF.scala:52)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:90)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand$JoinedRowProcessor.processRow$1(MergeIntoCommand.scala:703)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand$JoinedRowProcessor.$anonfun$processPartition$6(MergeIntoCommand.scala:713)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:512)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:277)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
```

The test code is shown below:


```python
from pyspark.sql.types import *
from pyspark.sql.functions import *
from delta.tables import DeltaTable

prefix_udf = udf(lambda s: "prefix_" + s, StringType())

table_path = "...."

spark.createDataFrame([{"id": "1", "body": "test_body"}]).write.format("delta").save(table_path)

target = DeltaTable.forPath(spark, table_path)
source = spark.createDataFrame([{"id": "1", "body": "new_body"}])

(target.alias("target")
    .merge(source.alias("source"), "source.id = target.id")
    .whenMatchedUpdate(set={"body": prefix_udf(col("source.body"))})
    .execute())
```
tdas commented 3 years ago

This seems like a real issue. The way we execute merge internally does not seem to support Python UDFs. Without fully rewriting the merge execution, this is pretty hard to fix.

Is it possible for you to define the UDF in Scala/Java, register it by name, and then use it from Python just by name?
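
For what it's worth, that workaround might look like the sketch below, reusing `table_path` and `source` from the repro above. This is only an illustration: `com.example.PrefixUdf` is a hypothetical compiled class (e.g. implementing `org.apache.spark.sql.api.java.UDF1`) that would have to be on the classpath; `spark.udf.registerJavaFunction` then exposes it to PySpark by name.

```python
# Sketch only: assumes a compiled Scala/Java UDF class (hypothetical name below)
# is on the classpath. Register it under a SQL name, then reference it with
# expr() so no Python lambda ends up inside the merge plan.
from pyspark.sql.functions import expr
from pyspark.sql.types import StringType
from delta.tables import DeltaTable

spark.udf.registerJavaFunction("prefix_udf", "com.example.PrefixUdf", StringType())

target = DeltaTable.forPath(spark, table_path)
(target.alias("target")
    .merge(source.alias("source"), "source.id = target.id")
    .whenMatchedUpdate(set={"body": expr("prefix_udf(source.body)")})
    .execute())
```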

YannByron commented 3 years ago

OK, I'll try your suggestion.

The current merge execution seems to skip all of the Optimizer's rules. If we want to support this, do you have any practical ideas?

scottsand-db commented 3 years ago

Thanks for bringing this to our attention. We will take a look.

harry19023 commented 2 years ago

Has there been any update here?

zsxwing commented 2 years ago

@harry19023 we don't have an ideal solution for this bug right now. Could you try to use Scala/Java UDFs instead?

harry19023 commented 2 years ago

I was actually just using this library to set up a local development environment for Databricks proper. Since this seems to be supported on the Databricks runtime, I'll have to change my development workflow to only use Databricks runtimes. Thanks for the update though!

jordanyakerstuzo commented 2 years ago

I'm having the same problem and can't use Scala/Java UDFs without changing our entire development workflow. Is any effort being made to fix this?

tdas commented 2 years ago

Yeah, this needs a lot of effort to fix: rewriting the merge implementation completely requires someone who really understands how Spark handles Python UDFs. I really hope someone in the community can spend that time digging into this.

michaelromero commented 1 year ago

I'm just going to bump this. This is a pretty important feature to be able to debug reliably.

jmpalumbo commented 1 year ago

Not the same error, but I just received the following when trying to merge using a temp view created from a PySpark df that has a column generated with a udf() wrapping the monotonically_increasing_id() function:

```
py4j.protocol.Py4JJavaError: An error occurred while calling o97.sql.
: java.lang.UnsupportedOperationException: Cannot generate code for expression: <lambda>(monotonically_increasing_id())
```

The code takes a list of lists and turns each inner list into a string so it can be added as a column to the df:

```python
from pyspark.sql.functions import udf, monotonically_increasing_id

# Join each inner list of table_names into a comma-separated string, keyed by row id.
df = df.repartition(1).withColumn(
    "temp",
    udf(lambda name: ",".join(table_names[name]) if type(table_names[name]) is list else None)(
        monotonically_increasing_id()
    ),
)
```

I am not looking to rewrite the logic for this script. Any updates to the original poster's error? Anything I am missing here on my end? Not much info in terms of debugging.

zsxwing commented 1 year ago

This is a complicated issue and we haven't figured out a solution. One workaround would be persisting the data after evaluating UDFs.
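
A minimal sketch of that workaround, reusing the names from the original repro: run the Python UDF over the source DataFrame before the merge and force materialization, so the merge clauses themselves contain no Python UDF.

```python
# Sketch of the workaround: evaluate the Python UDF ahead of the merge.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
from delta.tables import DeltaTable

prefix_udf = udf(lambda s: "prefix_" + s, StringType())

source = spark.createDataFrame([{"id": "1", "body": "new_body"}])
evaluated = source.withColumn("body", prefix_udf(col("body"))).persist()
evaluated.count()  # persist() is lazy; an action forces the UDF to run now

# Alternatively, write `evaluated` out (e.g. to a temporary Delta/Parquet path)
# and read it back, which removes the UDF from the plan entirely.

target = DeltaTable.forPath(spark, table_path)
(target.alias("target")
    .merge(evaluated.alias("source"), "source.id = target.id")
    .whenMatchedUpdate(set={"body": "source.body"})  # plain column, no UDF
    .execute())
```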