apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] - trouble using Apache Hudi with S3. #1723

Closed bobgalvao closed 4 years ago

bobgalvao commented 4 years ago

Hi,

I'm having trouble using Apache Hudi with S3.

Steps to reproduce the behavior:

  1. Produce messages to a Kafka topic (about 2,000 records per window on average).
  2. Start the streaming job (sample code below).
  3. Errors start to occur intermittently.
  4. The streaming job has to keep consuming messages from Kafka for some time before an error occurs; there is no consistent pattern.

Environment Description:

AWS EMR: emr-5.29.0
Hudi version: 0.5.0-incubating
Spark version: 2.4.4
Hive version: 2.3.6
Hadoop version: 2.8.5
Storage: S3
Running on Docker?: No

The errors occur intermittently. Error 1 ("Unrecognized token 'Objavro' ...") makes any further write to the dataset impossible. Errors 2 and 3 ("Could not find any data file written for commit..." / "Failed to read schema from data...") normalize on the next execution, but the streaming or batch job still ends with an error.

Because of these problems on S3, I switched to HDFS with the same code and had none of the inconsistency issues I see on S3.

I have already enabled EMRFS consistent view, but the same errors occur. I also enabled "hoodie.consistency.check.enabled", as recommended when using S3 storage. The problem seems to be related to S3 consistency.
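
Concretely, the check is enabled as a writer option; a minimal fragment (mirroring the full sample code further below, with the base path illustrative) looks like this:

import org.apache.spark.sql.{DataFrame, SaveMode}

// Minimal sketch: enable Hudi's S3 consistency check on the writer.
// Table/key options mirror the sample code below; the base path is illustrative.
def writeWithConsistencyCheck(df: DataFrame): Unit = {
  df.write
    .format("org.apache.hudi")
    .option("hoodie.table.name", "AWS_CASE")
    .option("hoodie.datasource.write.recordkey.field", "key")
    .option("hoodie.datasource.write.precombine.field", "normalizedkey")
    .option("hoodie.datasource.write.partitionpath.field", "partition_id")
    .option("hoodie.consistency.check.enabled", "true") // wait/retry until S3 listings reflect newly written files
    .mode(SaveMode.Append)
    .save("s3://bucket01/AWS_CASE")
}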

I often get the errors below:

Error – 1 (once this error occurs, the Hudi dataset can no longer be written to):

20/05/21 17:49:36 ERROR JobScheduler: Error running job streaming job 1590083340000 ms.0
org.apache.hudi.hive.HoodieHiveSyncException: Failed to get dataset schema for AWS_CASE
  at org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:414)
  at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:93)
  at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:71)
  at org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:236)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:169)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
  at br.com.agi.bigdata.awscase.Main$.processRDD(Main.scala:91)
  at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:117)
  at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:114)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: unable to read commit metadata
  at org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:328)
  at org.apache.hudi.hive.HoodieHiveClient.readSchemaFromLastCompaction(HoodieHiveClient.java:428)
  at org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:407)
  ... 48 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Objavro': was expecting ('true', 'false' or 'null')
 at [Source: Objavro.schema�{"type":"record","name":"HoodieCompactionPlan","namespace":"org.apache.hudi.avro.model","fields":[{"name":"operations","type":["null",{"type":"array","items":{"type":"record","name":"HoodieCompactionOperation","fields":[{"name":"baseInstantTime","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"deltaFilePaths","type":["null",{"type":"array","items":{"type":"string","avro.java.string":"String"}}],"default":null},{"name":"dataFilePath","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"fileId","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"partitionPath","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"metrics","type":["null",{"type":"map","values":"double","avro.java.string":"String"}],"default":null}]}}],"default":null},{"name":"extraMetadata","type":["null",{"type":"map","values":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"}],"default":null}]}� _v :���[�gקGC�20200521174801�s3://bucket01/AWS_CASE/0/.cfe57b0e-1ac0-4650-960a-615cdba323f9-0_20200521174801.log.1_1-927-392109�s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_1-895-378588_20200521174801.parquetLcfe57b0e-1ac0-4650-960a-615cdba323f9-00TOTAL_LOG_FILES�?TOTAL_IO_MB TOTAL_IO_READ_MB(TOTAL_LOG_FILES_SIZE�@"TOTAL_IO_WRITE_MB&TOTAL_LOG_FILE_SIZE�@�  _v :���[�gקGC; line: 1, column: 11]
  at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
  at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
  at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2462)
  at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1621)
  at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:689)
  at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3776)
  at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3721)
  at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
  at org.apache.hudi.common.model.HoodieCommitMetadata.fromJsonString(HoodieCommitMetadata.java:129)
  at org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:326)
  ... 50 more

Error – 2:

java.lang.IllegalArgumentException: Could not find any data file written for commit [20200520134502__deltacommit__COMPLETED], could not get schema for dataset s3://bucket01/AWS_CASE
, CommitMetadata :HoodieCommitMetadata{partitionToWriteStats={}, compacted=false, extraMetadataMap={}}
    at org.apache.hudi.hive.HoodieHiveClient.lambda$null$10(HoodieHiveClient.java:393)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at org.apache.hudi.hive.HoodieHiveClient.lambda$getDataSchema$11(HoodieHiveClient.java:391)
    at java.util.Optional.orElseGet(Optional.java:267)
    at org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:387)
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:93)
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:71)
    at org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:236)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:169)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at br.com.agi.bigdata.fastdata.aux.emr.schema1.table1.Main$.processRDD(Main.scala:109)
    at br.com.agi.bigdata.fastdata.aux.emr.schema1.table1.Main$$anonfun$main$1.apply(Main.scala:145)
    at br.com.agi.bigdata.fastdata.aux.emr.schema1.table1.Main$$anonfun$main$1.apply(Main.scala:135)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Error - 3:

20/05/21 17:20:33 ERROR JobScheduler: Error running job streaming job 1590081600000 ms.0
java.lang.IllegalArgumentException: Failed to read schema from data file s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_0-1453-608455_20200521172025.parquet. File does not exist.
  at org.apache.hudi.hive.HoodieHiveClient.readSchemaFromDataFile(HoodieHiveClient.java:456)
  at org.apache.hudi.hive.HoodieHiveClient.readSchemaFromLastCompaction(HoodieHiveClient.java:432)
  at org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:407)
  at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:93)
  at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:71)
  at org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:236)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:169)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
  at br.com.agi.bigdata.awscase.Main$.processRDD(Main.scala:91)
  at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:117)
  at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:114)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
20/05/21 17:20:33 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Failed to read schema from data file s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_0-1453-608455_20200521172025.parquet. File does not exist.
java.lang.IllegalArgumentException: Failed to read schema from data file s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_0-1453-608455_20200521172025.parquet. File does not exist.
  at org.apache.hudi.hive.HoodieHiveClient.readSchemaFromDataFile(HoodieHiveClient.java:456)
  at org.apache.hudi.hive.HoodieHiveClient.readSchemaFromLastCompaction(HoodieHiveClient.java:432)
  at org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:407)
  at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:93)
  at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:71)
  at org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:236)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:169)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
  at br.com.agi.bigdata.awscase.Main$.processRDD(Main.scala:91)
  at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:117)
  at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:114)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Hudi-cli information:

hudi->connect --path s3://bucket01/AWS_CASE
6671 [Spring Shell] INFO  org.apache.hudi.common.table.HoodieTableMetaClient  - Loading HoodieTableMetaClient from s3://bucket01/AWS_CASE
7242 [Spring Shell] WARN  org.apache.hadoop.util.NativeCodeLoader  - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11229 [Spring Shell] INFO  org.apache.hudi.common.util.FSUtils  - Hadoop Configuration: fs.defaultFS: [hdfs://ip-10-xx-x-xx.agi.aws.local:8020], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, emrfs-site.xml], FileSystem: [com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem@6a9763d6]
12778 [Spring Shell] INFO  org.apache.hudi.common.table.HoodieTableConfig  - Loading dataset properties from s3://bucket01/AWS_CASE/.hoodie/hoodie.properties
12795 [Spring Shell] INFO  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem  - Opening 's3://bucket01/AWS_CASE/.hoodie/hoodie.properties' for reading
12835 [Spring Shell] INFO  org.apache.hudi.common.table.HoodieTableMetaClient  - Finished Loading Table of type MERGE_ON_READ from s3://bucket01/AWS_CASE
Metadata for table AWS_CASE loaded

hudi:AWS_CASE->desc
35395 [Spring Shell] INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline  - Loaded instants java.util.stream.ReferencePipeline$Head@1e3450cf
╔═════════════════════════════════╤════════════════════════════════════════════════╗
║ Property                        │ Value                                          ║
╠═════════════════════════════════╪════════════════════════════════════════════════╣
║ basePath                        │ s3://bucket01/AWS_CASE                         ║
╟─────────────────────────────────┼────────────────────────────────────────────────╢
║ metaPath                        │ s3://bucket01/AWS_CASE/.hoodie                 ║
╟─────────────────────────────────┼────────────────────────────────────────────────╢
║ fileSystem                      │ s3                                             ║
╟─────────────────────────────────┼────────────────────────────────────────────────╢
║ hoodie.table.name               │ AWS_CASE                                       ║
╟─────────────────────────────────┼────────────────────────────────────────────────╢
║ hoodie.compaction.payload.class │ org.apache.hudi.common.model.HoodieAvroPayload ║
╟─────────────────────────────────┼────────────────────────────────────────────────╢
║ hoodie.table.type               │ MERGE_ON_READ                                  ║
╟─────────────────────────────────┼────────────────────────────────────────────────╢
║ hoodie.archivelog.folder        │ archived                                       ║
╚═════════════════════════════════╧════════════════════════════════════════════════╝

hudi:AWS_CASE->commits show
69651 [Spring Shell] INFO  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem  - Opening 's3://bucket01/AWS_CASE/.hoodie/20200521174926.commit' for reading
Command failed java.lang.reflect.UndeclaredThrowableException
java.lang.reflect.UndeclaredThrowableException
  at org.springframework.util.ReflectionUtils.rethrowRuntimeException(ReflectionUtils.java:315)
  at org.springframework.util.ReflectionUtils.handleInvocationTargetException(ReflectionUtils.java:295)
  at org.springframework.util.ReflectionUtils.handleReflectionException(ReflectionUtils.java:279)
  at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:219)
  at org.springframework.shell.core.SimpleExecutionStrategy.invoke(SimpleExecutionStrategy.java:68)
  at org.springframework.shell.core.SimpleExecutionStrategy.execute(SimpleExecutionStrategy.java:59)
  at org.springframework.shell.core.AbstractShell.executeCommand(AbstractShell.java:134)
  at org.springframework.shell.core.JLineShell.promptLoop(JLineShell.java:533)
  at org.springframework.shell.core.JLineShell.run(JLineShell.java:179)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: unable to read commit metadata
  at org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:328)
  at org.apache.hudi.cli.commands.CommitsCommand.showCommits(CommitsCommand.java:89)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:216)
  ... 6 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Objavro': was expecting ('true', 'false' or 'null')
 at [Source: Objavro.schema�{"type":"record","name":"HoodieCompactionPlan","namespace":"org.apache.hudi.avro.model","fields":[{"name":"operations","type":["null",{"type":"array","items":{"type":"record","name":"HoodieCompactionOperation","fields":[{"name":"baseInstantTime","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"deltaFilePaths","type":["null",{"type":"array","items":{"type":"string","avro.java.string":"String"}}],"default":null},{"name":"dataFilePath","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"fileId","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"partitionPath","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"metrics","type":["null",{"type":"map","values":"double","avro.java.string":"String"}],"default":null}]}}],"default":null},{"name":"extraMetadata","type":["null",{"type":"map","values":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"}],"default":null}]}� _v :���[�gקGC�20200521174801�s3://bucket01/AWS_CASE/0/.cfe57b0e-1ac0-4650-960a-615cdba323f9-0_20200521174801.log.1_1-927-392109�s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_1-895-378588_20200521174801.parquetLcfe57b0e-1ac0-4650-960a-615cdba323f9-00
                                                               TOTAL_LOG_FILES�?TOTAL_IO_MB TOTAL_IO_READ_MB(TOTAL_LOG_FILES_SIZE�@"TOTAL_IO_WRITE_MB&TOTAL_LOG_FILE_SIZE�@�  _v :���[�gקGC; line: 1, column: 11]
  at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
  at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
  at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2462)
  at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1621)
  at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:689)
  at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3776)
  at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3721)
  at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
  at org.apache.hudi.common.model.HoodieCommitMetadata.fromJsonString(HoodieCommitMetadata.java:129)
  at org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:326)
  ... 12 more
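
To confirm what the failing instant file actually contains, its first bytes can be read directly from S3. A rough diagnostic sketch (run from spark-shell; the instant file name is taken from the log above) is:

import org.apache.hadoop.fs.Path

// Diagnostic sketch: peek at the first bytes of the instant file that fails to parse.
// Valid commit metadata should start with "{"; "Objavro.schema..." means the file holds Avro bytes.
val commitPath = new Path("s3://bucket01/AWS_CASE/.hoodie/20200521174926.commit")
val fs = commitPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
val in = fs.open(commitPath)
val head = new Array[Byte](32)
in.readFully(head)
in.close()
println(new String(head, "UTF-8"))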

Sample code used:

package br.com.agi.bigdata.awscase

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object Main {

  val sparkConf: SparkConf = new SparkConf()
    .set("spark.sql.catalogImplementation", "hive")
    .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
    .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
    .set("spark.yarn.maxAppAttempts", "3")
    .set("spark.locality.wait", "1")
    .set("spark.streaming.backpressure.enabled", "true")
    .set("spark.streaming.backpressure.initialRate", "1")
    .set("spark.streaming.receiver.maxRate", "1")
    .set("spark.streaming.receiver.initialRate", "1")
    .set("spark.sql.hive.convertMetastoreParquet", "false")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  val sc = new SparkContext(sparkConf)
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")

  def processRDD(rddRaw: RDD[ConsumerRecord[String, String]]): Unit = {
    import spark.implicits._

    val hiveSchema = spark.sql(s"select * from datalake.aux.emr.schema1.table1 limit 1").schema

    val fieldsRemove = Array("current_ts", "table", "op_type", "op_ts", "pos", "primary_keys", "tokens", "date_partition")

    val jsonSchema = Seq[StructField](
      StructField("table", StringType),
      StructField("op_type", StringType),
      StructField("op_ts", StringType),
      StructField("current_ts", StringType),
      StructField("pos", StringType),
      StructField("primary_keys", ArrayType(StringType)),
      StructField("normalizedkey", StringType),
      StructField("after", StructType(hiveSchema.fields.filter(it => !fieldsRemove.contains(it.name)).map(it => StructField(it.name.toUpperCase, it.dataType, it.nullable, it.metadata))))
    )

    var df = spark.read
      .schema(StructType(jsonSchema))
      .json(rddRaw.map(_.value()))
      .withColumn("normalizedkey", concat(lit(col("current_ts")), lit(col("pos"))).cast(StringType))
      .select("table", "op_type", "op_ts", "current_ts", "pos", "primary_keys", "normalizedkey", "after.*")

    val hiveTableName = "AWS_CASE"
    val pks = df.select($"primary_keys").first().getSeq[String](0).map(col)

    df.withColumn("key", concat(pks: _*))
      .withColumn("partition_id", (pks(0)/10000000).cast("Integer"))
      .write.format("org.apache.hudi")
      .option(HoodieWriteConfig.TABLE_NAME, hiveTableName)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "key")
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "datalake")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "normalizedkey")
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hiveTableName)
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
      .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL, "false")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition_id")
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "partition_id")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://10.xx.x.xxx:10000")
      .option("hoodie.consistency.check.enabled", "true")
      .option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS")
      .option("hoodie.keep.max.commits", 2)
      .option("hoodie.keep.min.commits", 1)
      .option("hoodie.parquet.compression.codec", "snappy")
      .option("hoodie.cleaner.commits.retained", 0)
      .option("hoodie.parquet.max.file.size", 1073741824)
      .option("hoodie.parquet.small.file.limit", 943718400)
      .mode(SaveMode.Append)
      .save("s3://bucket01/AWS_CASE")

  }

  def main(args: Array[String]): Unit = {

    val ssc = new StreamingContext(sc, Minutes(5))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker01:9092,broker02:9092,broker03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "aws_case",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "auto.offset.reset" -> "earliest"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array("TOPIC-01"), kafkaParams)
    )

    stream.foreachRDD(rddRaw => {
      val offsetRanges = rddRaw.asInstanceOf[HasOffsetRanges].offsetRanges
      if (!rddRaw.isEmpty()) {
        processRDD(rddRaw)
      }
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

Build.sbt:

name := "aws-case-hudi" version := "0.1" scalaVersion := "2.11.12" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-streaming" % "2.4.4" % "provided", "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.4", "org.apache.spark" %% "spark-sql" % "2.4.4", "org.apache.spark" %% "spark-core" % "2.4.4", "org.apache.hudi" % "hudi-spark-bundle" % "0.5.0-incubating", "org.apache.httpcomponents" % "httpclient" % "4.5.12", )

I am available to provide any additional information.

Best regards, bobgalvao

bhasudha commented 4 years ago

Taking a look.

Quick question: why is "hoodie.cleaner.commits.retained" set to zero?

Also, the first error seems to come from JSON parsing. Could you paste the schema in a gist, maybe?

bobgalvao commented 4 years ago

Hello. The configuration parameter "hoodie.cleaner.commits.retained" was only set for a test; please disregard it. I'm sorry I didn't remove it.

The error also occurs in other jobs that do not have this schema-capture step.

hudi-issue.txt

Thank you!

bvaradar commented 4 years ago

@bobgalvao: Can you list your .hoodie/ and .hoodie/.aux folders and attach the output? It looks like you may be using Hudi version 0.5.1; the stack trace matches 0.5.1, not 0.5.0. Can you confirm this? Also, can you recheck that you are not accidentally pulling in different versions of Hudi?

Regarding Problem 3: does this file actually exist? s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_0-1453-608455_20200521172025.parquet
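
For example, a quick existence check from spark-shell could look like the sketch below (path copied from the error message):

import org.apache.hadoop.fs.Path

// Sketch: check whether the parquet file referenced by the error is visible on S3.
val dataFile = new Path("s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_0-1453-608455_20200521172025.parquet")
val fs = dataFile.getFileSystem(spark.sparkContext.hadoopConfiguration)
println(fs.exists(dataFile))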

bobgalvao commented 4 years ago

@bvaradar: I am using AWS EMR 5.29.0, and according to the release information from AWS the Hudi version is 0.5.0-incubating. I am using the default installation provided by AWS. Reference: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/images/emr-releases-5x.png

Due to the problems with S3 I switched to HDFS, so I no longer have those files on S3. I will recreate the environment on S3 and, as soon as the errors occur, I will send the information to you.

Since I cannot reproduce this easily, it may take me a while to get back to you, but as soon as the errors occur I will send them.

Thank you very much for the reply.

bobgalvao commented 4 years ago

@bvaradar I checked the versions on the EMR 5.29.0 cluster and they are all 0.5.0-incubating.

drwxr-xr-x 2 root root     4096 Jun 14 14:00 bin
drwxr-xr-x 4 root root     4096 Jun 14 14:00 cli
-rw-r--r-- 1 root root  2728594 Dec 14  2019 hudi-hadoop-mr-bundle-0.5.0-incubating.jar
lrwxrwxrwx 1 root root       42 Jun 14 14:00 hudi-hadoop-mr-bundle.jar -> hudi-hadoop-mr-bundle-0.5.0-incubating.jar
-rw-r--r-- 1 root root  1134353 Dec 14  2019 hudi-hive-bundle-0.5.0-incubating.jar
lrwxrwxrwx 1 root root       37 Jun 14 14:00 hudi-hive-bundle.jar -> hudi-hive-bundle-0.5.0-incubating.jar
-rw-r--r-- 1 root root 20967361 Dec 14  2019 hudi-spark-bundle-0.5.0-incubating.jar
lrwxrwxrwx 1 root root       38 Jun 14 14:00 hudi-spark-bundle.jar -> hudi-spark-bundle-0.5.0-incubating.jar
-rw-r--r-- 1 root root 23310250 Dec 14  2019 hudi-timeline-server-bundle-0.5.0-incubating.jar
lrwxrwxrwx 1 root root       48 Jun 14 14:00 hudi-timeline-server-bundle.jar -> hudi-timeline-server-bundle-0.5.0-incubating.jar
-rw-r--r-- 1 root root 39051878 Dec 14  2019 hudi-utilities-bundle-0.5.0-incubating.jar
lrwxrwxrwx 1 root root       42 Jun 14 14:00 hudi-utilities-bundle.jar -> hudi-utilities-bundle-0.5.0-incubating.jar
[hadoop@ip-10-xx-x-xxx hudi]$ pwd
/usr/lib/hudi

vinothchandar commented 4 years ago

@bobgalvao error-1 seems to be failing to read a .avro file, which I have not seen before.. this does not seem related to consistency issues actually, since s3 guarantees that partial objects are not exposed ...

error-2 is the delta commit not having any data files to fetch the schema from.. This should not happen in the recent 0.5.2 release, IIUC, since we now always write the schema with the commit metadata and read it from there.. @bvaradar correct me if I am wrong.. but again this does not seem related to s3

bvaradar commented 4 years ago

Correct, they do not seem to be related to consistency issues. I think hudi timeline might give some pointers to what is going on here.

bobgalvao commented 4 years ago

I started processing with S3 again and am waiting for the errors to occur. I am thinking of migrating to EMR 5.30.0, which ships Hudi 0.5.2. Do you suggest we keep evaluating the errors that occurred on 0.5.0, or should I upgrade to 0.5.2? Today my production process runs smoothly on HDFS, but I want to use S3 again. Thanks for your attention so far.

bvaradar commented 4 years ago

@bobgalvao: Can you start with 0.5.2 on S3 and see if you can reproduce the error? We haven't seen this error before, and it is always good to use 0.5.2 since it contains general bug fixes.

bobgalvao commented 4 years ago

@bvaradar: Perfect, I will start migrating to version 0.5.2 and let you know if an error occurs. One question: is it mandatory to use EMRFS when using Hudi with S3? Thank you.

vinothchandar commented 4 years ago

@bobgalvao nope, it's not. If you are using S3, please consider turning on consistency checks: https://hudi.apache.org/docs/configurations.html#withConsistencyCheckEnabled

bobgalvao commented 4 years ago

Hi, I completed the migration to version 0.5.2 and no longer get the errors. Thanks for the support.

vinothchandar commented 4 years ago

@bobgalvao we also have 0.5.3 out, with a bunch of perf fixes.. It might be worth going there directly as we cook 0.6.0 :)

root18039532923 commented 3 years ago

When I use Hudi 0.7.0 and add clustering to my program, the same problem happens at processing time. Before I added clustering, there was no problem. (screenshot attached)
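
For context, a minimal sketch of what enabling inline clustering on the writer can look like in Hudi 0.7.0 is shown below (option values are illustrative, not the actual settings used here; key and precombine fields follow the earlier sample code):

import org.apache.spark.sql.{DataFrame, SaveMode}

// Minimal sketch of an upsert with inline clustering enabled (Hudi 0.7.0).
// Clustering values are illustrative.
def upsertWithInlineClustering(df: DataFrame, basePath: String): Unit = {
  df.write
    .format("org.apache.hudi")
    .option("hoodie.table.name", "AWS_CASE")
    .option("hoodie.datasource.write.recordkey.field", "key")
    .option("hoodie.datasource.write.precombine.field", "normalizedkey")
    .option("hoodie.datasource.write.partitionpath.field", "partition_id")
    .option("hoodie.clustering.inline", "true")          // run clustering as part of the write
    .option("hoodie.clustering.inline.max.commits", "4") // trigger clustering after every 4 commits
    .mode(SaveMode.Append)
    .save(basePath)
}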