delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, Rust, and Python.
https://delta.io
Apache License 2.0

[BUG] Optimize command fails when using liquid clustering on local Delta Lake & PySpark #3087

Closed · donielix closed 2 weeks ago

donielix commented 2 weeks ago

Bug

Which Delta project/connector is this regarding?

Spark

Describe the problem

When attempting to optimize a Delta Table configured with liquid clustering, an error occurs during the execution of the optimize().executeCompaction() method.

Steps to reproduce

from pyspark.sql import SparkSession
from delta.pip_utils import configure_spark_with_delta_pip
from delta.tables import DeltaTable

# Initializes a SparkSession configured with Delta
builder = (
    SparkSession.builder.config(
        "spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"
    )
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .enableHiveSupport()
)
spark = configure_spark_with_delta_pip(
    spark_session_builder=builder
).getOrCreate()

# Initialize an empty Delta Table with liquid clustering
dt = (
    DeltaTable.createIfNotExists(spark)
    .tableName("testtable")
    .addColumn("id", dataType="bigint", nullable=False)
    .addColumn("date", dataType="date", nullable=False)
    .addColumn("name", dataType="string", nullable=False)
    .addColumn("amount", dataType="double")
    .addColumn("year_month", dataType="string", nullable=False)
    .clusterBy("year_month")
    .execute()
)

# Push some test data into the newly created Delta table
spark.sql(
    """
    INSERT INTO testtable VALUES
    (1, '2024-01-01', 'Jack', 30.5, '2024-01'),
    (2, '2024-02-10', 'Claude', 11.2, '2024-02'),
    (3, '2024-02-25', 'Mick', 10.1, '2024-02')
    """
)

# Optimizes the Delta Table (this triggers the error)
dt.optimize().executeCompaction()

Observed results

When running the above snippet, I get the following error traceback.


Py4JJavaError                             Traceback (most recent call last)
Cell In[9], line 1
----> 1 dt.optimize().executeCompaction()

File ~/projects/ecoempy/.venv/lib/python3.10/site-packages/delta/tables.py:1391, in DeltaOptimizeBuilder.executeCompaction(self)
   1382 @since(2.0)  # type: ignore[arg-type]
   1383 def executeCompaction(self) -> DataFrame:
   1384     """
   1385     Compact the small files in selected partitions.
   1386
   1387     :return: DataFrame containing the OPTIMIZE execution metrics
   1388     :rtype: pyspark.sql.DataFrame
   1389     """
   1390     return DataFrame(
-> 1391         self._jbuilder.executeCompaction(),
   1392         getattr(self._spark, "_wrapped", self._spark)  # type: ignore[attr-defined]
   1393     )

File ~/projects/ecoempy/.venv/lib/python3.10/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File ~/projects/ecoempy/.venv/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
   177 def deco(*a: Any, **kw: Any) -> Any:
   178     try:
--> 179         return f(*a, **kw)
   180     except Py4JJavaError as e:
   181         converted = convert_exception(e.java_exception)

File ~/projects/ecoempy/.venv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
   324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
   325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
   327         "An error occurred while calling {0}{1}{2}.\n".
   328         format(target_id, ".", name), value)
   329 else:
   330     raise Py4JError(
   331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
   332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o123.executeCompaction.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
  at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
  at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
  at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:387)
  at org.apache.spark.sql.delta.commands.OptimizeExecutor.$anonfun$optimize$1(OptimizeTableCommand.scala:276)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
  at org.apache.spark.sql.delta.commands.OptimizeExecutor.recordFrameProfile(OptimizeTableCommand.scala:217)
  at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:136)
  at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
  at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
  at org.apache.spark.sql.delta.commands.OptimizeExecutor.recordOperation(OptimizeTableCommand.scala:217)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:135)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:125)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:115)
  at org.apache.spark.sql.delta.commands.OptimizeExecutor.recordDeltaOperation(OptimizeTableCommand.scala:217)
  at org.apache.spark.sql.delta.commands.OptimizeExecutor.optimize(OptimizeTableCommand.scala:255)
  at org.apache.spark.sql.delta.commands.OptimizeTableCommand.run(OptimizeTableCommand.scala:180)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:92)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
  at org.apache.spark.sql.delta.util.AnalysisHelper.toDataset(AnalysisHelper.scala:92)
  at org.apache.spark.sql.delta.util.AnalysisHelper.toDataset$(AnalysisHelper.scala:91)
  at io.delta.tables.DeltaOptimizeBuilder.toDataset(DeltaOptimizeBuilder.scala:43)
  at io.delta.tables.DeltaOptimizeBuilder.$anonfun$execute$1(DeltaOptimizeBuilder.scala:100)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.delta.DeltaTableUtils$.withActiveSession(DeltaTable.scala:470)
  at io.delta.tables.DeltaOptimizeBuilder.execute(DeltaOptimizeBuilder.scala:85)
  at io.delta.tables.DeltaOptimizeBuilder.executeCompaction(DeltaOptimizeBuilder.scala:67)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:568)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
  at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
  at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.util.concurrent.ExecutionException: Boxed Error
  at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
  at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
  at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
  at scala.concurrent.Promise.complete(Promise.scala:53)
  at scala.concurrent.Promise.complete$(Promise.scala:52)
  at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
  at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
  at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
  at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
  at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
  at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
  at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
  at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: java.lang.AssertionError: assertion failed: Cannot do Hilbert clustering by zero or one column!
  at scala.Predef$.assert(Predef.scala:223)
  at org.apache.spark.sql.delta.skipping.HilbertClustering$.getClusteringExpression(MultiDimClustering.scala:108)
  at org.apache.spark.sql.delta.skipping.SpaceFillingCurveClustering.cluster(MultiDimClustering.scala:78)
  at org.apache.spark.sql.delta.skipping.SpaceFillingCurveClustering.cluster$(MultiDimClustering.scala:68)
  at org.apache.spark.sql.delta.skipping.HilbertClustering$.cluster(MultiDimClustering.scala:106)
  at org.apache.spark.sql.delta.skipping.MultiDimClustering$.cluster(MultiDimClustering.scala:59)
  at org.apache.spark.sql.delta.commands.OptimizeExecutor.runOptimizeBinJob(OptimizeTableCommand.scala:428)
  at org.apache.spark.sql.delta.commands.OptimizeExecutor.$anonfun$optimize$6(OptimizeTableCommand.scala:277)
  at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:384)
  at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
  at scala.util.Success.$anonfun$map$1(Try.scala:255)
  at scala.util.Success.map(Try.scala:213)
  at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
  at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
  ... 8 more
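
The final Caused by pinpoints the failure: the Hilbert clustering path asserts that there are at least two clustering columns. A minimal Python paraphrase of that guard (the real code is Scala, in MultiDimClustering.scala; the function name below is illustrative only):

# Illustrative Python paraphrase of the Scala guard that fires above; the
# actual implementation is HilbertClustering.getClusteringExpression in
# MultiDimClustering.scala.
def hilbert_clustering_expression(cluster_cols: list) -> None:
    # With a table clustered by a single column such as "year_month",
    # len(cluster_cols) == 1 and this assertion fails during OPTIMIZE.
    assert len(cluster_cols) > 1, (
        "Cannot do Hilbert clustering by zero or one column!"
    )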

Expected results

I'd expect the optimize command to run successfully.

Further details
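
One possible workaround until a fix lands (untested; my assumption, based on the assertion message above) is to declare at least two clustering columns, so the Hilbert path has the dimensions it needs:

# Hypothetical workaround, not verified: with two clustering columns the
# "zero or one column" assertion should not trigger.
dt2 = (
    DeltaTable.createIfNotExists(spark)
    .tableName("testtable_two_cols")  # hypothetical second table
    .addColumn("id", dataType="bigint", nullable=False)
    .addColumn("year_month", dataType="string", nullable=False)
    .clusterBy("year_month", "id")    # two clustering columns
    .execute()
)
dt2.optimize().executeCompaction()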

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

vkorukanti commented 2 weeks ago

@donielix Could you paste the full error callstack? cc. @zedtang

donielix commented 2 weeks ago

> @donielix Could you paste the full error callstack? cc. @zedtang

Updated the issue with the traceback.

zedtang commented 2 weeks ago

Thanks for reporting! This is due to Hilbert clustering not supporting clustering on a single column; we should fall back to Z-order in that case.
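
A rough Python sketch of that fallback logic (the actual fix is in Delta's Scala OPTIMIZE path; names here are only an illustration of the intended behavior):

# Illustration only; the real change belongs in Delta's Scala code, not PySpark.
def choose_clustering_curve(cluster_cols: list) -> str:
    """Pick a space-filling curve for OPTIMIZE on a clustered table."""
    if len(cluster_cols) >= 2:
        return "hilbert"  # Hilbert curves need at least two dimensions
    return "zorder"       # single-column case: fall back to Z-order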

I sent out a fix: https://github.com/delta-io/delta/pull/3109