apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Change drop.partitionpath behavior #10878

Closed. VitoMakarevich closed this issue 5 months ago.

VitoMakarevich commented 5 months ago

Describe the problem you faced

Hudi version 0.12.2. Hello, we are currently using Hudi COW with hoodie.datasource.write.drop.partition.columns=true (non-default), and recently we wanted to run clustering for this dataset. The problem we observe is that the bulk_insert used internally by clustering does not honor this setting, so its parquet files are created with the partition columns included. As far as I checked this is not a concern for reading via Spark, but it breaks upserts into the partitions where clustering ran: if an update comes to such a file, in 0.12.x we see failures like Parquet/Avro schema mismatch: Avro field 'partition1' not found. Reproduction here. I went deeper and found that in Hudi 0.13.x and 0.14.x clustering likewise does not honor hoodie.datasource.write.drop.partition.columns=true, but there it does not break upserts - partitions can still be updated. Given this, we probably cannot upgrade right now, but we would like to execute clustering, so I need help with the following questions:

  1. Is it safe to change hoodie.datasource.write.drop.partition.columns from true to false? As far as I observed it looks to work, but I am not certain.
  2. How can it be changed? Changing it in the write options does not help because the value is written into the hoodie.properties file. If I change the contents of that file directly, it behaves like this: a subsequent update appends the partition column value, but only for the changed rows; the old rows keep null. However, on the read path the old rows have this field populated (some magic), and once clustering is run, all rows have the field populated. Important: we would most likely not be able to cluster the whole table at once, so if we run it in batches, is it safe to have some files that include the partition columns and some that do not?

Nice-to-answer questions:

  1. Do you know what might have caused the change of behavior between 0.12.3 and 0.13.0, such that writes work after clustering?

To Reproduce

Reproduction here
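
In lieu of the full linked reproduction, a minimal sketch of the failing sequence (assumptions: Spark 3.x with Hudi 0.12.2 on the classpath; the table name, path, and column names are illustrative placeholders, not taken from the real code):

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder()
      .appName("hudi-drop-partition-columns-repro")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    import spark.implicits._

    val basePath = "/tmp/hudi_drop_partition_repro"
    val writeOpts = Map(
      "hoodie.table.name" -> "repro",
      "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
      "hoodie.datasource.write.recordkey.field" -> "id",
      "hoodie.datasource.write.partitionpath.field" -> "partition1",
      "hoodie.datasource.write.precombine.field" -> "ts",
      // the non-default setting in question: drop partition columns from data files
      "hoodie.datasource.write.drop.partition.columns" -> "true"
    )

    // 1. Initial write: parquet files are created WITHOUT the partition1 column.
    Seq((1, 100L, "p1"), (2, 100L, "p1")).toDF("id", "ts", "partition1")
      .write.format("hudi").options(writeOpts)
      .option("hoodie.datasource.write.operation", "bulk_insert")
      .mode(SaveMode.Overwrite).save(basePath)

    // 2. Write with inline clustering: the bulk_insert used internally by clustering
    //    ignores the flag, so the replacement files are created WITH partition1.
    Seq((3, 100L, "p1")).toDF("id", "ts", "partition1")
      .write.format("hudi").options(writeOpts)
      .option("hoodie.datasource.write.operation", "insert")
      .option("hoodie.clustering.inline", "true")
      .option("hoodie.clustering.inline.max.commits", "1")
      .mode(SaveMode.Append).save(basePath)

    // 3. Upsert into the clustered file group: on 0.12.x the merge reads the clustered
    //    parquet with the table's Avro schema, which lacks partition1, and fails with
    //    "Parquet/Avro schema mismatch: Avro field 'partition1' not found".
    Seq((1, 200L, "p1")).toDF("id", "ts", "partition1")
      .write.format("hudi").options(writeOpts)
      .option("hoodie.datasource.write.operation", "upsert")
      .mode(SaveMode.Append).save(basePath)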

Stacktrace

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 128.0 failed 1 times, most recent failure: Lost task 0.0 in stage 128.0 (TID 531) (192.168.0.143 executor driver): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:378)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:166)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
    ... 28 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161)
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:164)
    ... 31 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155)
    ... 32 more
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file 
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    ... 4 more
Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'partition1' not found
    at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91)
    at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
    at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185)
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
    ... 8 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1274)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:693)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:345)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at com.example.hudi.HudiClusteringPartitionColumn$.$anonfun$write$1(HudiClusteringPartitionColumn.scala:50)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
    at com.example.hudi.HudiClusteringPartitionColumn$.write(HudiClusteringPartitionColumn.scala:40)
    at com.example.hudi.HudiClusteringPartitionColumn$.delayedEndpoint$com$example$hudi$HudiClusteringPartitionColumn$1(HudiClusteringPartitionColumn.scala:57)
    at com.example.hudi.HudiClusteringPartitionColumn$delayedInit$body.apply(HudiClusteringPartitionColumn.scala:6)
    at scala.Function0.apply$mcV$sp(Function0.scala:39)
    at scala.Function0.apply$mcV$sp$(Function0.scala:39)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
    at scala.App.$anonfun$main$1$adapted(App.scala:80)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at scala.App.main(App.scala:80)
    at scala.App.main$(App.scala:78)
    at com.example.hudi.HudiClusteringPartitionColumn$.main(HudiClusteringPartitionColumn.scala:6)
    at com.example.hudi.HudiClusteringPartitionColumn.main(HudiClusteringPartitionColumn.scala)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:378)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:166)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
    ... 28 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161)
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:164)
    ... 31 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155)
    ... 32 more
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file 
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    ... 4 more
Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'partition1' not found
    at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91)
    at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
    at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185)
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
    ... 8 more
danny0405 commented 5 months ago

I went deeper and found that in Hudi 0.13.x and 0.14.x clustering do not honor flag hoodie.datasource.write.drop.partition.columns=true,

Is this a bug?

VitoMakarevich commented 5 months ago

It looks to be, and in this version (0.12.x) it causes an unresolvable exception on upsert.

danny0405 commented 5 months ago

Does a higher release, e.g. 0.14.x, already solve the problem? Did you have a chance to try that?

VitoMakarevich commented 5 months ago

0.13.x solves this - I mean, it still writes files with the partition column, but that no longer causes an exception on the upsert path. Unfortunately we cannot upgrade, so I'm asking for a workaround:

  1. Is it safe to change hoodie.datasource.write.drop.partition.columns from true to false? As far as I observed it looks to work, but I am not certain.
  2. How can it be changed? Changing it in the write options does not help because the value is written into the hoodie.properties file. If I change the contents of that file directly, it behaves like this: a subsequent update appends the partition column value, but only for the changed rows; the old rows keep null. However, on the read path the old rows have this field populated (some magic), and once clustering is run, all rows have the field populated.
danny0405 commented 5 months ago

because it's written into hoodie.properties file - if I change the contents of this file

Yeah, it's a table configuration rather than a write config, so the impact is global for both read and write. cc @nsivabalan any ideas?
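
For reference, the flag ends up persisted in the table's .hoodie/hoodie.properties next to the other table-level configs, along these lines (an illustrative excerpt; table name is a placeholder and unrelated entries are omitted):

    # <base_path>/.hoodie/hoodie.properties
    hoodie.table.name=repro
    hoodie.table.type=COPY_ON_WRITE
    hoodie.datasource.write.drop.partition.columns=true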

VitoMakarevich commented 5 months ago

In the meantime, it looks like I found a workaround: setting "hoodie.datasource.write.row.writer.enable": "false" for the duration of clustering does the trick - once clustering is done, it can likely be switched back (I will check). This setting makes writing a bit slower, but the files are produced without the partitionpath column, as per the initial configuration.
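
A minimal sketch of what that looks like for the clustering write, reusing the illustrative writeOpts and basePath from the reproduction sketch above (df stands in for whatever batch triggers the inline clustering):

    // Disable the row writer only for the clustering run; the non-row-writer path
    // honors hoodie.datasource.write.drop.partition.columns=true.
    df.write.format("hudi").options(writeOpts)
      .option("hoodie.clustering.inline", "true")
      .option("hoodie.clustering.inline.max.commits", "1")
      // the workaround: slower, but the produced files omit the partition column
      .option("hoodie.datasource.write.row.writer.enable", "false")
      .mode(SaveMode.Append).save(basePath)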

VitoMakarevich commented 5 months ago

I also checked on some test tables that manually changing the property in hoodie.properties works, but then you must ensure that clustering touches all files, which may be problematic if you have a lot of partitions: it creates N_partitions * (clustering groups per partition) jobs, so in our case it would be submitting ~5k Spark jobs, which would almost certainly blow up the driver. And there is not yet a way to limit this number if you want to run clustering on all partitions (I see that newer Hudi versions have an executor service which handles this).
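
For a sense of scale (the numbers below are hypothetical stand-ins, not measurements from our table):

    // Each clustering group is executed as its own Spark job, so roughly:
    val numPartitions = 2500      // hypothetical number of table partitions
    val groupsPerPartition = 2    // hypothetical clustering groups per partition
    val jobsSubmitted = numPartitions * groupsPerPartition // ~5000 jobs queued on the driver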