apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

org.apache.avro.AvroRuntimeException: Not a valid schema field: IncidentId,EntryTime,ModifiedTime #11744

Open prat0082 opened 1 month ago

prat0082 commented 1 month ago


Describe the problem you faced

Hudi upsert fails with AvroRuntimeException: Not a valid schema field.

I completed the full load of the data using Hudi bulk insert with the same set of record key, partition, and precombine fields. Once the full load finished, an incremental load runs to update the data for the past 7 days, but it fails with a schema evolution issue.

I am using AWS Glue 4.0, with the following Hudi write options:

'hoodie.table.name': hudi_table_name,
'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.upsert.shuffle.parallelism': 200,
'hoodie.insert.shuffle.parallelism': 200,
'hoodie.bloom.index.parallelism': 320,
'hoodie.parquet.small.file.limit': '20971520',
'hoodie.datasource.write.recordkey.field': recordkey,
'hoodie.datasource.write.precombine.field': precombinedfield,
'hoodie.datasource.write.partitionpath.field': partitionfield,
'hoodie.datasource.write.hive_style_partitioning': 'true',
'hoodie.datasource.write.table.name': hudi_table_name,
'path': s3_output_path,
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.database': hudi_database_name,
'hoodie.datasource.hive_sync.table': hudi_table_name,
'hoodie.datasource.hive_sync.partition_fields': partitionfield,
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.hive_sync.use_jdbc': 'false',
'hoodie.datasource.hive_sync.mode': 'hms',
'hoodie.meta.sync.client.tool.class': 'org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool',
'hoodie.avro.schema.external.transformation': 'true',
'hoodie.schema.on.read.enable': 'true',
'hoodie.datasource.write.reconcile.schema': 'true',
'hoodie.avro.schema.validate': 'true',
'hoodie.datasource.write.schema.allow.auto.evolution.column.drop': 'true',
'hoodie.datasource.hive_sync.support_timestamp': 'true',
'hoodie.write.lock.provider': 'org.apache.hudi.hive.HiveMetastoreBasedLockProvider',
'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
'hoodie.write.lock.hivemetastore.database': hudi_database_name,
'hoodie.write.lock.hivemetastore.table': hudi_table_name,
'hoodie.index.type': 'GLOBAL_BLOOM',
'hoodie.bloom.index.update.partition.path': 'true',
'hoodie.bloom.index.filter.type': 'DYNAMIC_V0',
'hoodie.cleaner.commits.retained': 1,
'hoodie.clean.automatic': 'true',
'hoodie.cleaner.parallelism': 100
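For reference, a minimal sketch of how these options are typically applied from the Glue/PySpark job; the DataFrame and variable names (input_df, hudi_options) are placeholders rather than the original code:

# Sketch of the assumed write path: the options above are gathered into a dict
# and passed to the Hudi Spark datasource on each incremental run.
hudi_options = {
    'hoodie.table.name': hudi_table_name,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': recordkey,          # e.g. 'IncidentId,EntryTime,ModifiedTime'
    'hoodie.datasource.write.precombine.field': precombinedfield,
    'hoodie.datasource.write.partitionpath.field': partitionfield,
    # ... remaining options exactly as listed above ...
}

(input_df.write
    .format('hudi')
    .options(**hudi_options)
    .mode('append')   # upserts append to the table created by the initial bulk insert
    .save(s3_output_path))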


To Reproduce

Steps to reproduce the behavior:

1. Insert records using bulk insert.
2. Upsert the incremental records (80% updates, 20% inserts).

Expected behavior

Data should be updated and inserted into the Hudi table.

Environment Description

Additional context


Stacktrace

py4j.protocol.Py4JJavaError: An error occurred while calling o146.save. : org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20240808100109898 at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64) at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:45) at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:113) at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:97) at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:158) at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:206) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:331) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:154) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224) at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 25 in stage 6.0 failed 4 times, most recent failure: Lost task 25.3 in stage 6.0 (TID 145) (10.63.111.23 executor 2): org.apache.avro.AvroRuntimeException: Not a valid schema field: IncidentId,EntryTime,ModifiedTime at org.apache.avro.generic.GenericData$Record.get(GenericData.java:268) at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:499) at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$11(HoodieSparkSqlWriter.scala:297) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:138) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1239) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1239) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1239) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3051) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2229) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2250) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294) at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:406) at org.apache.spark.rdd.RDD.collect(RDD.scala:1020) at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:406) at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367) at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314) at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105) at org.apache.hudi.index.bloom.HoodieBloomIndex.lookupIndex(HoodieBloomIndex.java:119) at org.apache.hudi.index.bloom.HoodieBloomIndex.tagLocation(HoodieBloomIndex.java:89) at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:50) at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:33) at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:53) ... 
54 more Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: IncidentId,EntryTime,ModifiedTime at org.apache.avro.generic.GenericData$Record.get(GenericData.java:268) at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:499) at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$11(HoodieSparkSqlWriter.scala:297) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:138) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more

danny0405 commented 1 month ago

You may have a schema mismatch:

public Object get(String key) {
      Field field = schema.getField(key);
      if (field == null) {
        throw new AvroRuntimeException("Not a valid schema field: " + key);
      }
      return values[field.pos()];
    }
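As a quick illustration of what that lookup implies for this job, here is a hedged PySpark check (the function and variable names are illustrative, not part of Hudi or the original script) that verifies every configured record key field exists in the incoming DataFrame before writing:

def missing_key_fields(df, recordkey_csv):
    # ComplexKeyGenerator splits the option value on commas, one field per entry;
    # each resulting name must resolve in the Avro schema derived from the DataFrame.
    wanted = [f.strip() for f in recordkey_csv.split(',')]
    present = set(df.schema.fieldNames())
    return [f for f in wanted if f not in present]

# Usage in the Glue job, before the Hudi write:
# missing = missing_key_fields(incremental_df, 'IncidentId,EntryTime,ModifiedTime')
# if missing:
#     raise ValueError(f"Record key fields not found in DataFrame schema: {missing}")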
prat0082 commented 1 month ago

These are the record key fields: IncidentId, EntryTime, ModifiedTime. Their types are int and timestamp, and I am not doing any transformation.

Do I need to set any specific Hudi options for a timestamp record key?

prat0082 commented 1 month ago

I see the following in the log as the Avro schema for the timestamp fields I use as record keys: type long with logicalType timestamp-micros.

{ "name" : "EntryTime", "type" : [ "null", { "type" : "long", "logicalType" : "timestamp-micros" } ], { "name" : "ModifiedTime", "type" : [ "null", { "type" : "long", "logicalType" : "timestamp-micros" } ], "default" : null

danny0405 commented 1 month ago

But it looks like the three fields are being treated as a single concatenated field name in the Avro schema lookup, which is why the field cannot be found.

prat0082 commented 1 month ago

I need these fields as the record key for indexing and unique identification of the records.

danny0405 commented 1 month ago

Then you just need to define the record keys concatenated with commas:

'hoodie.datasource.write.recordkey.field': 'col_a,col_b,col_c'
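In Python terms, a hedged fragment of the same idea (the field names are taken from this issue, the rest is illustrative): with ComplexKeyGenerator the option value is a single comma-separated string, not a list of column names.

hudi_options = {
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    # One string containing all key fields, separated by commas:
    'hoodie.datasource.write.recordkey.field': 'IncidentId,EntryTime,ModifiedTime',
}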
prat0082 commented 1 month ago

It is defined in the code, separated with commas: recordkey = 'IncidentId,EntryTime,ModifiedTime'

danny0405 commented 1 month ago

I saw this in the options: 'hoodie.datasource.write.recordkey.field': recordkey,

prat0082 commented 1 month ago

I changed the record key field to a timestamp column, and it still throws the same error:

Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: entryrefintime at org.apache.avro.generic.GenericData$Record.get(GenericData.java:268) at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:499) at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$11(HoodieSparkSqlWriter.scala:297) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:138) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more
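One hedged thing to check (the column names below are assumptions, not from the report): the Avro field lookup in this stack trace is case-sensitive, so a casing mismatch between the configured record key and the actual DataFrame column name could fail in exactly this way even though the column exists.

configured_key = 'entryrefintime'           # value from the write options
actual_columns = incremental_df.columns     # e.g. ['IncidentId', 'EntryRefInTime', ...]

exact = configured_key in actual_columns
ignoring_case = configured_key.lower() in (c.lower() for c in actual_columns)
if not exact and ignoring_case:
    print('Record key only matches a column when case is ignored; '
          'align the casing in hoodie.datasource.write.recordkey.field with the column name.')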

ad1happy2go commented 1 month ago

@prat0082 Can you let us know what value you are giving for recordKey?

prat0082 commented 1 month ago

RecordKey: 'incidentid,entrytime,modifiedtime'

Types: incidentid - int, entrytime - timestamp, modifiedtime - timestamp