apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.48k stars 2.44k forks source link

[SUPPORT] - Hoodi Delta Streamer - Do not allowing to perform upsert/insert in same partition when there is new column added in source data #7344

Open masthanmca123 opened 2 years ago

masthanmca123 commented 2 years ago

Tips before filing an issue

Describe the problem you faced my source is - ParquetDFSSource Deltastreamer used to consume data from parquet source (GCS bucket) and load into GCS target path with partition . First time it is working as expected . I have inserted new data with one extra column in source directory but same partition , in this case Delta streamer throwing an exception .

A clear and concise description of the problem.

To Reproduce

Steps to reproduce the behavior:

1.Deltastreamer (hudi-utilities-bundle_2.12-0.11.1.jar )

  1. Parquet DFS source
  2. Insert new data with new field(schema change) post first run in same partition
  3. execute the delta stream utility and try to load the data into same partition in hudi table too

Expected behavior Record should be inserted with latest schema(new field)

A clear and concise description of what you expected to happen.

Environment Description

Additional context

Add any other context about the problem here.

Stacktrace

Add the stacktrace of the error.

ERROR org.apache.spark.deploy.yarn.Client: Application diagnostics message: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 32.0 failed 4 times, most recent failure: Lost task 0.3 in stage 32.0 (TID 59) (m0s0d56-test-m.us-central1.us.walmart.net executor 1): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0 at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386) at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384) at org.apache.spark.rdd.RDD.iterator(RDD.scala:335) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322) ... 28 more Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161) at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147) ... 31 more Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155) ... 32 more Caused by: org.apache.hudi.exception.HoodieException: operation has failed at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248) at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226) at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52) at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278) at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135) at java.util.concurrent.FutureTask.run(FutureTask.java:266) ... 3 more Caused by: java.lang.NoSuchMethodError: org.apache.avro.Schema$Field.defaultValue()Lorg/codehaus/jackson/JsonNode; at org.apache.parquet.avro.AvroRecordConverter.(AvroRecordConverter.java:168) at org.apache.parquet.avro.AvroRecordConverter.(AvroRecordConverter.java:95) at org.apache.parquet.avro.AvroRecordMaterializer.(AvroRecordMaterializer.java:33) at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138) at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:186) at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156) at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135) at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48) at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

masthanmca123 commented 2 years ago

spark-submit \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.11.1 --name hudi-test \ --deploy-mode client \ --jars gs:///df-techmod/m0s0d56/hudi/hudi-utilities-bundle_2.12-0.11.1.jar,gs://hnw-artifacts-ba16423s78g6f2/df-techmod/m0s0d56/hudi/hudi-spark3.1-bundle_2.12-0.12.1.jar,gs://hnw-artifacts-ba16423s78g6f2/df-techmod/hudi/spark-avro_2.12-3.1.3.jar \ --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ gs://hnw-artifacts-ba16423s78g6f2/df-techmod/m0s0d56/hudi/hudi-utilities-bundle_2.12-0.11.1.jar \ --table-type COPY_ON_WRITE \ --op UPSERT \ --enable-sync \ --sync-tool-classes org.apache.hudi.hive.HiveSyncTool \ --target-base-path gs:///us_rx_rdm/my_hudi_test_3 \ --target-table testschema.my_hudi_test_3 \ --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \ --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \ --source-ordering-field load_ts \ --hoodie-conf hoodie.datasource.write.recordkey.field=id \ --hoodie-conf hoodie.datasource.write.precombine.field=load_ts \ --hoodie-conf hoodie.datasource.write.partitionpath.field=load_dt \ --hoodie-conf hoodie.datasource.hive_sync.support_timestamp=true \ --hoodie-conf hoodie.datasource.hive_sync.ignore_exceptions=true \ --hoodie-conf hoodie.datasource.hive_sync.mode=hms \ --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \ --hoodie-conf hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled=true \ --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \ --hoodie-conf hoodie.datasource.hive_sync.partition_fields=load_dt \ --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.HiveStylePartitionValueExtractor \ --hoodie-conf hoodie.datasource.hive_sync.database=us_rx_dm \ --hoodie-conf hoodie.datasource.hive_sync.table=my_hudi_test_3 \ --hoodie-conf hoodie.datasource.hive_sync.use_jdbc=false \ --hoodie-conf hoodie.upsert.shuffle.parallelism=2 \ --hoodie-conf hoodie.finalize.write.parallelism=2 \ --hoodie-conf hoodie.insert.shuffle.parallelism=2 \ --hoodie-conf hoodie.deltastreamer.source.dfs.root=gs:///hudi-poc/input-dir/test-data/load_dt=2022-11-29/ \ --hoodie-conf "\"hoodie.deltastreamer.transformer.sql=select *,current_date as load_dt FROM \"" \ --hoodie-conf hoodie.datasource.write.reconcile.schema=true \ --hoodie-conf hoodie.index.type=SIMPLE \ --hoodie-conf hoodie.schema.on.read.enable=true

xushiyan commented 1 year ago

@lokeshj1703 can you try to replicate the issue first? then we can discuss what might be the root cause.

lokeshj1703 commented 1 year ago

spark-submit ... --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.11.1

@masthanmca123 hudi-spark3.1.2-bundle_2.12:0.11.1 is not available in the central repo(https://search.maven.org/). Can you please recheck the artifact id and version?

masthanmca123 commented 1 year ago

As per my understanding, below reported bundle jar is optional in case of delta streamer.

Did you tested delta streamer with schema changes ?

On Tue, 13 Dec, 2022, 16:16 Lokesh Jain, @.***> wrote:

spark-submit ... --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.11.1

@masthanmca123 https://github.com/masthanmca123 hudi-spark3.1.2-bundle_2.12:0.11.1 is not available in the central repo( https://search.maven.org/). Can you please recheck the artifact id and version?

— Reply to this email directly, view it on GitHub https://github.com/apache/hudi/issues/7344#issuecomment-1348170215, or unsubscribe https://github.com/notifications/unsubscribe-auth/A4P4YDP6PVXPULTR4HLAFPDWNBHXLANCNFSM6AAAAAASP3P3UM . You are receiving this because you were mentioned.Message ID: @.***>

lokeshj1703 commented 1 year ago

Can you please try using the hudi-utilities-slim-bundle along with the spark bundle? This approach is mentioned in https://hudi.apache.org/docs/hoodie_deltastreamer/#deltastreamer for avoiding compatibility issues after release 0.11.0.