apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] How to reduce hoodie commit latency #8261

Closed alexone95 closed 1 year ago

alexone95 commented 1 year ago

Hello, we are facing the fact that commits are getting slower and slower as time goes by (from a delta commit of 160 s on day 1 to a delta commit of 300 s on day 4). Our deployment conditions are the following:

Expected behavior

We would like to know if there is a way to reduce, or at least keep constant, the write latency on the hudi table, and to understand if there is something we can improve in the deployment setup or in the other configurations described below.

Environment Description

Additional context

HOODIE TABLE PROPERTIES:
      'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
      'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
      'hoodie.datasource.write.hive_style_partitioning': 'true',
      'hoodie.index.type': 'GLOBAL_BLOOM',
      'hoodie.simple.index.update.partition.path': 'true',
      'hoodie.datasource.hive_sync.enable': 'true',
      'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
      'hoodie.datasource.hive_sync.use_jdbc': 'false',
      'hoodie.datasource.hive_sync.mode': 'hms',
      'hoodie.copyonwrite.record.size.estimate': 285,
      'hoodie.parquet.small.file.limit': 104857600,
      'hoodie.parquet.max.file.size': 120000000,
      'hoodie.cleaner.commits.retained': 1

KAFKA READ CONFIG:
      .readStream \
      .format("kafka") \
      .option("kafka.security.protocol", "SSL") \
      .option("kafka.ssl.enabled.protocols", "TLSv1.2, TLSv1.1, TLSv1") \
      .option("kafka.ssl.protocol", "TLS") \
      .option("startingOffsets", "latest") \
      .option("failOnDataLoss", "true") \
      .option("maxOffsetsPerTrigger", 2000) \
      .option("kafka.group.id", CG_NAME) \
      .load()

PYSPARK WRITE:
      df_source.writeStream.foreachBatch(foreach_batch_write_function)

FOR EACH BATCH FUNCTION:
      # management of delete messages
      batchDF_deletes.write.format('hudi') \
                  .option('hoodie.datasource.write.operation', 'delete') \
                  .options(**hudiOptions_table) \
                  .mode('append') \
                  .save(S3_OUTPUT_PATH)

      # management of update and insert messages
      batchDF_upserts.write.format('org.apache.hudi') \
                  .option('hoodie.datasource.write.operation', 'upsert') \
                  .options(**hudiOptions_table) \
                  .mode('append') \
                  .save(S3_OUTPUT_PATH)

SPARK SUBMIT:
      spark-submit --master yarn --deploy-mode cluster \
        --num-executors 1 --executor-memory 1G --executor-cores 2 \
        --conf spark.dynamicAllocation.enabled=false \
        --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
        --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
        --conf spark.sql.hive.convertMetastoreParquet=false \
        --jars /usr/lib/hudi/hudi-spark-bundle.jar

Thanks

kazdy commented 1 year ago

In 0.12.1 there was a bug related to hive sync. I also observed increasing processing time, most of which was spent reading all the files under the .hoodie/archived directory. You can disable hive sync to confirm whether this is the issue. EMR 6.10 comes with Hudi 0.12.2, and this is fixed in that version afaik.
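
For reference, a minimal sketch of what that test could look like in the original job, reusing the hudiOptions_table dict, batchDF_upserts DataFrame and S3_OUTPUT_PATH from the post above; the hudiOptions_no_sync name is a hypothetical addition, not something from this comment:

      # Hypothetical test: copy the existing options and turn off hive sync only,
      # so the write latency can be measured without the metastore sync step.
      hudiOptions_no_sync = {
          **hudiOptions_table,
          'hoodie.datasource.hive_sync.enable': 'false',
      }

      batchDF_upserts.write.format('hudi') \
          .option('hoodie.datasource.write.operation', 'upsert') \
          .options(**hudiOptions_no_sync) \
          .mode('append') \
          .save(S3_OUTPUT_PATH)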

Switching to a non-global index could also help, depending on the workload. You can also consider async table services if this is a streaming job, but you'd need to rewrite your job and not use forEachBatch() as it only works with inline services. Alternatively, you can disable table services and run them as a separate job (here I think you'd need to enable OCC and use some lock provider).
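
A hedged sketch of what those two suggestions might look like as writer options; the dict names, the chosen values and the Zookeeper-based lock provider are illustrative assumptions, not settings prescribed in the thread:

      # Sketch A: use a non-global index so key lookups stay within the record's partition.
      # Note: records can then no longer move across partitions on update.
      hudi_index_overrides = {
          'hoodie.index.type': 'BLOOM',
      }

      # Sketch B: if table services are disabled inline and run as a separate job,
      # enable OCC with a lock provider for multi-writer safety.
      hudi_occ_overrides = {
          'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
          'hoodie.cleaner.policy.failed.writes': 'LAZY',
          'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider',
          # The hoodie.write.lock.zookeeper.* connection settings would also be required for this provider.
      }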

alexone95 commented 1 year ago

Hi

thanks for the answer, disabling the hive sync seems to solve the problem, resulting in lower write latency.

Thanks for the support

alexone95 commented 1 year ago

As I said in the previous comment, having disabled the hive sync configuration we get an improvement in the delta commit times. However, in the script we partition by attunity_dt, so every day a new partition appears on S3 that does not get recognized, and as a result we don't see the new day's records in our table. Is there a workaround to deal with this problem in 0.12.1?

kazdy commented 1 year ago

The issue here is that hudi reads all files under the .hoodie/archived directory, and the number of files to read grows with every archived commit.

The workaround is to clean the .hoodie/archived directory frequently (or move the files to another dir). Some users enabled an S3 lifecycle rule to expire objects under this prefix. I have not tried it myself as I don't want to remove anything manually.
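
For illustration, a minimal boto3 sketch of such a lifecycle rule; the bucket name, table prefix and 7-day expiration below are placeholder assumptions, not values from the thread:

      import boto3

      s3 = boto3.client('s3')
      # Expire archived timeline files after 7 days so the directory does not grow unbounded.
      # Note: this call replaces any existing lifecycle configuration on the bucket.
      s3.put_bucket_lifecycle_configuration(
          Bucket='my-hudi-bucket',  # hypothetical bucket name
          LifecycleConfiguration={
              'Rules': [{
                  'ID': 'expire-hudi-archived-timeline',
                  'Status': 'Enabled',
                  'Filter': {'Prefix': 'path/to/table/.hoodie/archived/'},  # hypothetical table prefix
                  'Expiration': {'Days': 7},
              }]
          },
      )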

You can also run hive sync in a separate job once a day so that new partitions are added. Then it will not affect your data writes. But after some time this will also become slow and use more memory.
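
As a rough sketch of one lightweight way to do such a daily refresh, a small scheduled PySpark job could recover new hive-style partitions directly in the metastore. This is not Hudi's own hive sync tool; it is an alternative that relies on 'hoodie.datasource.write.hive_style_partitioning' being 'true' (as in the posted config), and the database/table name below is a hypothetical placeholder:

      from pyspark.sql import SparkSession

      spark = SparkSession.builder \
          .appName("daily-partition-refresh") \
          .enableHiveSupport() \
          .getOrCreate()

      # Scans the table location for new attunity_dt=... folders and
      # registers them as partitions in the Hive/Glue metastore.
      spark.sql("MSCK REPAIR TABLE my_db.my_hudi_table")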

nsivabalan commented 1 year ago

Here is the fix: https://github.com/apache/hudi/pull/7561, which went into 0.13.0.

alexone95 commented 1 year ago

here is the fix: #7561 that went into 0.13.0.

Hello, is there a way to patch hudi to version 0.13.0 on EMR 6.9?

kazdy commented 1 year ago

The AWS EMR team provided me with a patched Hudi 0.12.1 jar; you can ask AWS support for it and for instructions on how to provide it to the cluster.