apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Spark jobs relying on Hudi are blocked after one or zero commits #11011

Open pontisa95 opened 6 months ago

pontisa95 commented 6 months ago

Hello, we are seeing some PySpark jobs that rely on Hudi apparently getting blocked. In the Spark console we can see the following situation (screenshot attached: Screenshot_20): there are 71 completed jobs, but these are CDC processes that should read from a Kafka topic continuously. We have verified that there are messages queued on the Kafka topic. If we kill the application and restart it, in some cases the job behaves normally, while in other cases it remains stuck.

Our deployment setup is the following: we read INSERT, UPDATE and DELETE operations from a Kafka topic and replicate them into a target Hudi table stored on Hive, via a PySpark job running 24/7.

Expected behavior

We would like to know whether there is a way to reduce, or at least keep constant, the write latency on the Hudi table, and whether there is something we can improve in the deployment setup or in the other configurations described below.

Environment Description

Hudi version : 0.12.1-amzn-0
Spark version : 3.3.0
Hive version : 3.1.3
Hadoop version : 3.3.3 amzn
Storage (HDFS/S3/GCS..) : S3
Running on Docker? (yes/no) : no (EMR 6.9.0)

Additional context

HOODIE TABLE PROPERTIES:
  'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
  'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
  'hoodie.datasource.write.hive_style_partitioning': 'true',
  'hoodie.index.type': 'GLOBAL_BLOOM',
  'hoodie.simple.index.update.partition.path': 'true',
  'hoodie.datasource.hive_sync.enable': 'true',
  'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
  'hoodie.datasource.hive_sync.use_jdbc': 'false',
  'hoodie.datasource.hive_sync.mode': 'hms',
  'hoodie.copyonwrite.record.size.estimate': 285,
  'hoodie.parquet.small.file.limit': 104857600,
  'hoodie.parquet.max.file.size': 120000000,
  'hoodie.cleaner.commits.retained': 1
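For reference, a minimal sketch of how these properties are presumably assembled into the hudiOptions_table dictionary used by the writes below; the table name, record key, precombine and partition path fields are placeholders, since they are not shown in the original configuration:

  # Sketch (assumption): the properties above collected into the dict passed
  # via .options(**hudiOptions_table) in the batch function further down.
  hudiOptions_table = {
      'hoodie.table.name': 'target_table',                           # placeholder
      'hoodie.datasource.write.recordkey.field': 'id1,id2',          # placeholder (ComplexKeyGenerator takes a comma-separated list)
      'hoodie.datasource.write.precombine.field': 'ts',              # placeholder
      'hoodie.datasource.write.partitionpath.field': 'part1,part2',  # placeholder
      'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
      'hoodie.index.type': 'GLOBAL_BLOOM',
      # ... plus the remaining properties listed above, unchanged
  }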

KAFKA READ CONFIG:
  .readStream
      .format("kafka")
      .option("kafka.security.protocol", "SSL")
      .option("kafka.ssl.enabled.protocols", "TLSv1.2, TLSv1.1, TLSv1")
      .option("kafka.ssl.protocol", "TLS")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "true")
      .option("maxOffsetsPerTrigger", 2000)
      .option("kafka.group.id", CG_NAME)
      .load()
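For completeness, a minimal sketch of what the full read presumably looks like; the bootstrap servers, topic name and the value-decoding step are assumptions, as they do not appear in the snippet above:

  # Sketch (assumptions: KAFKA_BROKERS and TOPIC_NAME are placeholders; the SSL and
  # offset options from the original snippet are kept as-is).
  df_source = (
      spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", KAFKA_BROKERS)   # not shown in the original config
          .option("subscribe", TOPIC_NAME)                    # not shown in the original config
          .option("kafka.security.protocol", "SSL")
          .option("startingOffsets", "latest")
          .option("failOnDataLoss", "true")
          .option("maxOffsetsPerTrigger", 2000)
          .option("kafka.group.id", CG_NAME)
          .load()
          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")  # typical decoding step, an assumption
  )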

PYSPARK WRITE:
  df_source.writeStream.foreachBatch(foreach_batch_write_function)

  FOR EACH BATCH FUNCTION:
 #management of delete messages
  batchDF_deletes.write.format('hudi') \
              .option('hoodie.datasource.write.operation', 'delete') \
              .options(**hudiOptions_table) \
              .mode('append') \
              .save(S3_OUTPUT_PATH)

 #management of update and insert messages
  batchDF_upserts.write.format('org.apache.hudi') \
              .option('hoodie.datasource.write.operation', 'upsert') \
              .options(**hudiOptions_table) \
              .mode('append') \
              .save(S3_OUTPUT_PATH)
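For context, a minimal end-to-end sketch of how this streaming write is typically wired up; the operation column used to split the batch, the checkpoint location and the final start()/awaitTermination() calls are assumptions, since they are not shown in the original post:

  # Sketch (assumptions: an 'op' column carrying the CDC operation type and an
  # S3_CHECKPOINT_PATH for checkpointing; neither appears in the original post).
  def foreach_batch_write_function(batchDF, batch_id):
      # split the micro-batch into delete and insert/update records
      batchDF_deletes = batchDF.filter(batchDF.op == 'D')   # placeholder column/value
      batchDF_upserts = batchDF.filter(batchDF.op != 'D')

      batchDF_deletes.write.format('hudi') \
          .option('hoodie.datasource.write.operation', 'delete') \
          .options(**hudiOptions_table) \
          .mode('append') \
          .save(S3_OUTPUT_PATH)

      batchDF_upserts.write.format('hudi') \
          .option('hoodie.datasource.write.operation', 'upsert') \
          .options(**hudiOptions_table) \
          .mode('append') \
          .save(S3_OUTPUT_PATH)

  query = df_source.writeStream \
      .foreachBatch(foreach_batch_write_function) \
      .option('checkpointLocation', S3_CHECKPOINT_PATH) \
      .start()
  query.awaitTermination()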

SPARK SUBMIT:
  spark-submit --master yarn --deploy-mode cluster \
      --num-executors 1 --executor-memory 1G --executor-cores 2 \
      --conf spark.dynamicAllocation.enabled=false \
      --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
      --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
      --conf spark.sql.hive.convertMetastoreParquet=false \
      --jars /usr/lib/hudi/hudi-spark-bundle.jar

ad1happy2go commented 6 months ago

@pontisa95 Do you know in which phase the task is getting stuck? You can check the driver logs to find out more. In case it is stuck in Hive sync, can you let us know the total number of partitions in your table?
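In case it helps, a quick way to check the partition count once the table is synced to Hive; the database and table names below are placeholders:

  # Sketch (assumption: the table is synced to Hive/Glue as db.table_name, a placeholder).
  partition_count = spark.sql("SHOW PARTITIONS db.table_name").count()
  print(partition_count)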

pontisa95 commented 6 months ago

Hello, I don't know at which step my job is getting blocked, because there are no running jobs when it happens, and it happens across multiple processes that work on different partitions. I can say that it happens whether my table has 10 partitions or 100.

ad1happy2go commented 6 months ago

@pontisa95 In that case, can you check the driver logs to see where it is stuck and, if possible, share them with us? Also, can you paste the timeline?
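For reference, a sketch of one way to dump the active timeline from PySpark by listing the table's .hoodie folder; S3_OUTPUT_PATH is the table path from the original post, and the Hadoop FileSystem calls via the JVM gateway are an assumption about how to reach S3 from the driver:

  # Sketch: list the Hudi timeline files (commit/inflight/requested/clean/rollback)
  # under <table path>/.hoodie using the Hadoop FileSystem API.
  hoodie_path = spark._jvm.org.apache.hadoop.fs.Path(S3_OUTPUT_PATH + "/.hoodie")
  fs = hoodie_path.getFileSystem(spark._jsc.hadoopConfiguration())
  for status in fs.listStatus(hoodie_path):
      name = status.getPath().getName()
      if name.endswith((".commit", ".inflight", ".requested", ".clean", ".rollback")):
          print(name)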

ad1happy2go commented 6 months ago

@pontisa95 Were you able to get this resolved? If yes, please let us know the issue and the resolution, or let us know if you still need help here.