delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, as well as APIs for Scala, Java, and Python
https://delta.io
Apache License 2.0

[BUG] Zordering results in SparkUnsupportedOperationException #1726

Status: Open
nityathacker opened this issue 1 year ago

nityathacker commented 1 year ago

Bug

Z-ordering on String columns does not seem to work. The error does not help much either.

Describe the problem

In my dataset, I have 2 columns that I use as the predicate for querying data and as join columns for merging. The performance of the merge operation from an EMR cluster to S3 is really poor. As a step towards improving performance, I tried to Z-order on the 2 join columns, which both happen to be of String type. But Z-ordering gives me the following error:

org.apache.spark.SparkUnsupportedOperationException: Cannot evaluate expression: rangepartitionid(input[14, string, true], 1000)

I tried this from EMR using the main dataset as well as locally with a small sample.

Steps to reproduce

I was easily able to replicate this error locally with one super small test DataFrame. I am not sure if the problem with my prod dataset is the same as with the sample code below, but the steps and error match.

This was run in spark-shell:


import io.delta.tables._
import org.apache.spark.sql.SparkSession

// build a tiny test DataFrame (all columns are strings)
val values1 = List(List("Tom", "10", "12"), List("Tim", "10", "13"), List("Harry", "11", "14")).map(x => (x(0), x(1), x(2)))
val df1 = values1.toDF("name", "age", "marks")

// configure the Delta extension and catalog on the session, then write a Delta table
val spark = SparkSession.builder()
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
df1.write.format("delta").save("/home/some/path")

val dt_test = DeltaTable.forPath(spark, "/home/some/path")
dt_test.optimize().executeZOrderBy("age") // this is where the error happens

Environment information

Delta Lake version: 2.2.0
Spark version: 3.3.0
Scala version: 2.12

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

allisonport-db commented 1 year ago

If you're using spark-shell, you need to set the Delta-related configs when starting the shell. Can you try this following the directions in https://docs.delta.io/latest/quick-start.html#spark-scala-shell?
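
For reference, the quickstart passes these as --conf options when launching the shell, along these lines (the package coordinate below is an assumption matching the Delta 2.2.0 / Scala 2.12 setup reported in this issue):

bin/spark-shell --packages io.delta:delta-core_2.12:2.2.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"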

tdas commented 1 year ago

@nityathacker did you try what Allison suggested? Did that not work?

nityathacker commented 1 year ago

I believe those settings are the same as the SparkSession.builder line in the code snippet in the description (copied below) - val spark = SparkSession.builder().config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension").config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").getOrCreate()

vkorukanti commented 1 year ago

@nityathacker I couldn't repro the issue with the test you provided, on either Spark 3.3.0 or Spark 3.3.1 with Delta 2.2.0. I used open-source Apache Spark. Is there anything missing from the repro, either in the environment or the test code? Does it repro only on EMR?

nityathacker commented 1 year ago

@vkorukanti so that code worked for you and Z-ordered successfully? I just tried again; this is the stack trace:

23/05/23 19:21:27 ERROR Executor: Exception in task 2.0 in stage 33.0 (TID 319)
org.apache.spark.SparkUnsupportedOperationException: Cannot evaluate expression: rangepartitionid(input[1, string, true], 1000)
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:73)
    at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:344)
    at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:343)
    at org.apache.spark.sql.delta.expressions.RangePartitionId.eval(RangePartitionId.scala:36)
    at org.apache.spark.sql.delta.expressions.InterleaveBits.eval(InterleaveBits.scala:64)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:169)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
23/05/23 19:21:27 ERROR Executor: Exception in task 0.0 in stage 33.0 (TID 317)
org.apache.spark.SparkUnsupportedOperationException: Cannot evaluate expression: rangepartitionid(input[1, string, true], 1000)
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:73)
    at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:344)
    at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:343)
    at org.apache.spark.sql.delta.expressions.RangePartitionId.eval(RangePartitionId.scala:36)
    at org.apache.spark.sql.delta.expressions.InterleaveBits.eval(InterleaveBits.scala:64)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:169)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

allisonport-db commented 1 year ago

I believe those settings are the same as the SparkSession.builder line in the code snippet in the description (copied below) - val spark = SparkSession.builder().config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension").config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").getOrCreate()

It's not the same, since those are static configs: calling SparkSession.builder().getOrCreate() inside an already-running spark-shell returns the existing session, so an extension configured that way never takes effect. Did you try following the instructions in the quickstart documentation? Please confirm whether you still see the same error when the configs are set at shell start.
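
One way to confirm (a minimal sketch; run inside the same spark-shell session) is to print the relevant configs and check whether the Delta entries are actually present:

// If the Delta extension was not applied at shell startup,
// spark.sql.extensions will typically show as None here.
println(spark.version)
println(spark.conf.getOption("spark.sql.extensions"))
println(spark.conf.getOption("spark.sql.catalog.spark_catalog"))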

abhijeetnaib commented 1 year ago

Hi,

We have been facing the same issue while running Z-order from spark-shell; we have followed what @allisonport-db suggested. The column is also among the first 32 columns.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 106) (app1.amethyst.c.ia55.net executor driver): org.apache.spark.SparkUnsupportedOperationException: Cannot evaluate expression: rangepartitionid(input[0, timestamp, true], 1000)
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:73)

vkorukanti commented 1 year ago

@nityathacker @abhijeetnaib What version and flavor (EMR or vanilla Apache Spark)? The reason I am asking is that ZOrder uses a Spark expression, rangepartitionid. If this expression has changed in the Spark version you are using, then ZOrder may not work. Providing the Spark environment (EMR, vanilla Apache Spark, etc.) helps in debugging the issue.

cdagraca commented 1 year ago

I get the same error on EMR 6.6.0 (Spark 3.2.0-amzn-0), with io.delta:delta-core_2.12:2.2.0 installed (and configured in the session).

But it works on EMR 6.10.0 (Spark 3.3.1-amzn-0), which comes with Delta 2.2.0 (and I used the same session config for it).

So it does look like something has changed, probably in the Spark version, which is an annoyingly breaking change for a minor version. I'm just using the newer EMR to get around this.

Tomiwa-dev commented 1 year ago

Hi @nityathacker, were you able to resolve this?

nityathacker commented 1 year ago

@nityathacker @abhijeetnaib What version and flavor (EMR or vanilla Apache Spark)? The reason I am asking is that ZOrder uses a Spark expression, rangepartitionid. If this expression has changed in the Spark version you are using, then ZOrder may not work. Providing the Spark environment (EMR, vanilla Apache Spark, etc.) helps in debugging the issue.

The versions I used are in the description:

Delta Lake version: 2.2.0
Spark version: 3.3.0
Scala version: 2.12

@Tomiwa-dev I have not been able to get past this yet.

Tomiwa-dev commented 1 year ago

@nityathacker @abhijeetnaib What version and flavor (EMR or vanilla Apache Spark)? The reason I am asking is that ZOrder uses a Spark expression, rangepartitionid. If this expression has changed in the Spark version you are using, then ZOrder may not work. Providing the Spark environment (EMR, vanilla Apache Spark, etc.) helps in debugging the issue.

EMR version: 6.12.0
PySpark version: 3.4.1
delta-spark version: 2.4.0

pawankukreja01 commented 1 year ago

The error message you received indicates that the rangepartitionid function cannot evaluate the expression for a string column. This is because rangepartitionid is designed to work with numeric data types. One possible workaround is to convert the string columns to numeric data types before applying Z-ordering.
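
A minimal sketch of the workaround this comment suggests, reusing the table path and columns from the repro above and assuming the string values are actually numeric (as "age" and "marks" are in the sample data). Note this rewrites the table, and whether it helps depends on the root cause discussed earlier in the thread:

import io.delta.tables._
import org.apache.spark.sql.functions.col

// rewrite the table with the Z-order columns cast to numeric types
// (overwriteSchema is needed because the column types change)
val df = spark.read.format("delta").load("/home/some/path")
df.withColumn("age", col("age").cast("int"))
  .withColumn("marks", col("marks").cast("int"))
  .write.format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .save("/home/some/path")

// Z-order on the now-numeric columns
DeltaTable.forPath(spark, "/home/some/path").optimize().executeZOrderBy("age", "marks")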