apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.2k stars 2.38k forks source link

[SUPPORT] Spark Streaming - Failed to rollback #7285

Open rubenssoto opened 1 year ago

rubenssoto commented 1 year ago

Hello everyone,

We have one spark streaming 24/7 using Hudi COW, we use Spot instances, so, sometimes the machine crash and the streaming is started again. Our source is Kafka.

Hudi: 0.12.1 EMR on EKS: 6.7 Spark: 3.2.1

After two days running the job had these error:

22/11/23 14:07:02 INFO S3NativeFileSystem: Opening 's3://datalake/worldcup/order/.hoodie/20221122225647465.rollback.requested' for reading
22/11/23 14:07:02 ERROR HoodieStreamingSink: Micro batch id=4129 threw following exception: 
org.apache.hudi.exception.HoodieRollbackException: Failed to rollback s3://30-ze-datalake-raw/worldcup/order commits 20221122225538334
    at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:789)
    at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1198)
    at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1181)
    at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1169)
    at org.apache.hudi.client.BaseHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(BaseHoodieWriteClient.java:970)
    at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:151)
    at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:969)
    at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:962)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:328)
    at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:91)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$1(HoodieStreamingSink.scala:90)
    at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:166)
    at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:89)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
Caused by: org.apache.hudi.exception.HoodieRollbackException: Found commits after time :20221122225538334, please rollback greater commits first
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.validateRollbackCommitSequence(BaseRollbackActionExecutor.java:173)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.doRollbackAndGetStats(BaseRollbackActionExecutor.java:212)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:110)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:137)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.rollback(HoodieSparkCopyOnWriteTable.java:281)
    at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:772)
    ... 43 more

Do you have any idea what could cause this?

Thank you

codope commented 1 year ago

Are there other multiple writers? I am wondering commits after time 20221122225538334 went through. If you could share your configs and steps to reproduce, that would be great. I understand it may not be easy to repro, in that case, timeline would help.

rubenssoto commented 1 year ago

@codope no, only one writer for each table, we have 3 streamings and those 3 are suffering from the same problem, I'll get my configs to share with you.

rubenssoto commented 1 year ago
df = (
    spark.readStream.format("kafka")
    .option(
        "kafka.bootstrap.servers",
        "broker1,broker2,broker3",
    )
    .option("subscribe", "topic_name")
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        f'org.apache.kafka.common.security.scram.ScramLoginModule required username="USERNAME" password="{password}";',
    )
    .option("startingOffsets", "latest")
    .load()
    .selectExpr("substring(value, 6) as avro_value")
    .select(from_avro(col("avro_value"), schema).alias("data"))
    .select(col("data.*"))
)

query = (
    df.writeStream.format("hudi")
    .option(
        "checkpointLocation",
        "Checkpoint path",
    )
    .option("path", "Destination Path")
    .outputMode("append")
    .trigger(processingTime="60 seconds")
    .option("hoodie.table.name", "Table Name")
    .option("hoodie.datasource.write.recordkey.field", "id")
    .option("hoodie.datasource.write.table.name", "Table Name")
    .option("hoodie.datasource.hive_sync.enable", True)
    .option("hoodie.datasource.hive_sync.mode", "hms")
    .option("hoodie.datasource.hive_sync.database", "Database Name")
    .option("hoodie.datasource.hive_sync.table", "Table Name")
    .option(
        "hoodie.datasource.hive_sync.partition_extractor_class",
        "org.apache.hudi.hive.NonPartitionedExtractor",
    )
    .option("hoodie.datasource.hive_sync.support_timestamp", "true")
    .option("hoodie.upsert.shuffle.parallelism", 50)
    .option(
        "hoodie.datasource.hive_sync.partition_extractor_class",
        "org.apache.hudi.hive.NonPartitionedExtractor",
    )
    .option(
        "hoodie.datasource.write.keygenerator.class",
        "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
    )
    .option("hoodie.datasource.write.row.writer.enable", "false")
    .option("hoodie.parquet.small.file.limit", 536870912)
    .option("hoodie.parquet.max.file.size", 1073741824)
    .option("hoodie.parquet.block.size", 536870912)
    .option("hoodie.datasource.write.operation", "upsert")
    .option("hoodie.datasource.write.precombine.field", "__ts_ms")
    .option("hoodie.datasource.compaction.async.enable", False)
    .start()
    .awaitTermination()
)
rubenssoto commented 1 year ago

We are thinking that the problem is in our side, because we have a job to start the streaming when it stop for any reason, I will update you soon.

nsivabalan commented 1 year ago

@codope : can you check these jiras. we made some fixes already around this. https://issues.apache.org/jira/browse/HUDI-3393 https://issues.apache.org/jira/browse/HUDI-739

nsivabalan commented 1 year ago

@rubenssoto : any updates for us in this regards. if its resolved, can you close it out.