delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, along with language APIs.
https://delta.io
Apache License 2.0

[BUG] Delta zorder operation fails with Cannot evaluate expression: rangepartitionid exception #1311

Open mtNachiket opened 1 year ago

mtNachiket commented 1 year ago

Bug

The zorder command fails to run with a Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: rangepartitionid(input[28, bigint, true], 1000) exception. This is observed with Delta OSS 2.0.0 and Spark 3.2.1.

Describe the problem

I have a Delta table in S3. When I tried to apply Z-ordering on that table using the latest Delta OSS 2.0.0 and Spark 3.2.1 (PySpark), the operation failed with a Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: rangepartitionid(input[28, bigint, true], 1000) exception. I tried different columns, but the error persisted. I referred to the Delta documentation for Z-Ordering.

Steps to reproduce

Use the source CSV attached to this bug and run the commands below in a pyspark shell. source_data.csv

  1. Start a pyspark shell against Spark 3.2.1:

     pyspark --packages io.delta:delta-core_2.12:2.0.0

  2. Run the following in the shell:

     source_csv_path = "<file path>"
     df = spark.read.format("csv").load(source_csv_path)
     df.write.format("delta").mode("overwrite").partitionBy("invitedOnPartition").save("s3a://mybucket/OSSDelta/")
     from delta.tables import *
     deltaTable = DeltaTable.forPath(spark, "s3a://mybucket/OSSDelta/")
     deltaTable.optimize().executeZOrderBy("companyId")

Observed results

The following error is thrown:

22/08/04 14:12:33 WARN ObjectStore: Failed to get database delta, returning NoSuchObjectException
22/08/04 14:12:34 ERROR Executor: Exception in task 1.0 in stage 54.0 (TID 504)
java.lang.UnsupportedOperationException: Cannot evaluate expression: rangepartitionid(input[0, bigint, true], 1000)
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:79)
    at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:309)
    at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:308)
    at org.apache.spark.sql.delta.expressions.RangePartitionId.eval(RangePartitionId.scala:36)
    at org.apache.spark.sql.delta.expressions.InterleaveBits.eval(InterleaveBits.scala:64)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_6$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:169)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
22/08/04 14:12:34 WARN TaskSetManager: Lost task 1.0 in stage 54.0 (TID 504) (ip-10-0-3-146.ap-southeast-1.compute.internal executor driver): java.lang.UnsupportedOperationException: Cannot evaluate expression: rangepartitionid(input[0, bigint, true], 1000)
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:79)
    at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:309)
    at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:308)
    at org.apache.spark.sql.delta.expressions.RangePartitionId.eval(RangePartitionId.scala:36)
    at org.apache.spark.sql.delta.expressions.InterleaveBits.eval(InterleaveBits.scala:64)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_6$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:169)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Further details

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

tdas commented 1 year ago

Taking a look.

tdas commented 1 year ago

Did you set up the Delta session extensions when starting pyspark?

pyspark --packages io.delta:delta-core_2.12:2.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

https://docs.delta.io/latest/quick-start.html#pyspark-shell
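
For reference, the equivalent programmatic setup when constructing the SparkSession yourself looks roughly like the sketch below (based on the quick-start guide linked above; the app name is illustrative):

    from pyspark.sql import SparkSession

    # Configure the Delta SQL extension and catalog, mirroring the --conf
    # flags passed on the pyspark command line above.
    spark = (
        SparkSession.builder.appName("delta-zorder-example")  # illustrative name
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()
    )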

mtNachiket commented 1 year ago

Hi @tdas,

No, I did not set up the Delta session extensions when starting pyspark. After using the command above, it worked. Can you share the reason behind this error? Also, I understand that using the extensions is mentioned in the quick-start documentation, but is there any way to make these extensions available by default?

Thanks for the quick help.
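
(For reference, one common way to make these settings the default for every shell, assuming a standard Spark installation, is to put them in conf/spark-defaults.conf; the package version shown is the one used in this thread:

    spark.sql.extensions               io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog    org.apache.spark.sql.delta.catalog.DeltaCatalog
    spark.jars.packages                io.delta:delta-core_2.12:2.0.0

With these in place, a plain pyspark invocation should pick up Delta without the --packages and --conf flags.)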

scottsand-db commented 1 year ago

Hi @mtNachiket - it seems you have run into a very common issue (the Delta session extensions not being included when launching pyspark). Would you be interested in helping us fix this?

This is described in the following issue: https://github.com/delta-io/delta/issues/1144. Let me know, and thanks!