apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Hudi MOR high latency on data availability #11118

Open sgcisco opened 7 months ago

sgcisco commented 7 months ago

Describe the problem you faced

Running a streaming pipeline built from Kafka, Structured Streaming (PySpark), and Hudi (MOR tables) with AWS Glue and S3, we observed periodically growing latencies in data availability in Hudi. Latency was measured as the difference between the data generation timestamp and _hudi_commit_timestamp, and could reach 30 min. Periodic manual checks of the timestamps of the latest available data points, made by running queries as described at https://hudi.apache.org/docs/0.13.1/querying_data#spark-snap-query, confirmed these delays.

image

image
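For readers who want to reproduce the latency measurement described above, here is a minimal sketch. It assumes both timestamps are already available as Python datetimes. The helper names and the sample values are hypothetical, not taken from this issue.

```python
import math
from datetime import datetime, timezone


def latency_seconds(generated_at, committed_at):
    """Seconds between record generation and the commit that made it readable."""
    return (committed_at - generated_at).total_seconds()


def percentile(values, pct):
    """Nearest-rank percentile of a non-empty list (pct in 0..100)."""
    if not values:
        raise ValueError("values must not be empty")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


def ts(hour, minute, second=0):
    """Build a UTC timestamp on an arbitrary example day."""
    return datetime(2024, 4, 29, hour, minute, second, tzinfo=timezone.utc)


# Hypothetical (generation time, commit time) pairs, not real measurements.
samples = [
    (ts(11, 0), ts(11, 1)),
    (ts(11, 1), ts(11, 2, 30)),
    (ts(11, 2), ts(11, 4)),
    (ts(11, 3), ts(11, 33)),
]
latencies = [latency_seconds(g, c) for g, c in samples]
p50 = percentile(latencies, 50)
p90 = percentile(latencies, 90)
print(f"p50={p50:.0f}s p90={p90:.0f}s max={max(latencies):.0f}s")
```

Reporting a high percentile next to the maximum helps separate a generally slow pipeline from one with occasional outliers.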

When using Spark with Hudi, the read rate from Kafka was unstable:

Screenshot 2024-04-29 at 11 49 29

To exclude the impact of any component other than Hudi, we ran experiments with the same configuration and ingestion settings but without Hudi, writing directly to S3. It did not reveal any delays above 2 mins, where 1 min delay is always present due to Structured Streaming minibatch granularity. In this case the Kafka read rate was stable over time.

Additional context

What we tried

  1. We tried to optimize Hudi file sizing and the MOR layout by applying suggestions from these references: https://github.com/apache/hudi/issues/2151#issuecomment-706400445 and https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-HowdoItoavoidcreatingtonsofsmallfiles

We reached a target file size of 90-120 MB by lowering hoodie.copyonwrite.record.size.estimate from 1024 to 100 and using Inline.compact=false, delta.commits=1, async.compact=true, and hoodie.merge.small.file.group.candidates.limit=20, but this had no impact on latency.

  2. Another compaction trigger strategy, NUM_OR_TIME, as suggested in https://github.com/apache/hudi/issues/8975#issuecomment-1593408753, with the parameters below, did not resolve the problem:
    "hoodie.copyonwrite.record.size.estimate": "100",
    "hoodie.compact.inline.trigger.strategy": "NUM_OR_TIME",
    "hoodie.metadata.compact.max.delta.commits": "5",
    "hoodie.compact.inline.max.delta.seconds": "60",

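To make the trigger strategies concrete, here is a simplified model of how the documented strategy names combine a commit-count threshold and a time threshold. It is only a sketch, not Hudi's implementation; the function name and arguments are hypothetical.

```python
def should_compact(strategy, commits_since_last, seconds_since_last,
                   max_delta_commits, max_delta_seconds):
    """Simplified model of a compaction trigger decision (not Hudi's code)."""
    enough_commits = commits_since_last >= max_delta_commits
    enough_time = seconds_since_last >= max_delta_seconds
    if strategy == "NUM_COMMITS":
        return enough_commits
    if strategy == "TIME_ELAPSED":
        return enough_time
    if strategy == "NUM_OR_TIME":
        return enough_commits or enough_time
    if strategy == "NUM_AND_TIME":
        return enough_commits and enough_time
    raise ValueError(f"unknown strategy: {strategy}")


# One delta commit and 60 s since the last compaction, thresholds of 5 commits / 60 s.
print(should_compact("NUM_OR_TIME", 1, 60, 5, 60))
print(should_compact("NUM_AND_TIME", 1, 60, 5, 60))
```

Under this model, a 60 second time threshold combined with a 1 min batch interval means NUM_OR_TIME would fire on nearly every batch, which may be worth checking against the actual timeline.
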
Current settings

As a trade-off, we arrived at the configuration below, which gives relatively low latencies at the 90th percentile and file sizes of 40-90 MB:

"hoodie.merge.small.file.group.candidates.limit": "40",
"hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",

10_31_12

But the latency for some records could still reach 30 min.

02_42_29

The last configuration works relatively well for low ingestion rates of up to 1.5 MB/s with daily partitioning (partition_date=yyyy-MM-dd/), but it stops working for rates above 2.5 MB/s, even with more granular partitioning (partition_date=yyyy-MM-dd-HH/).

Expected behavior

Since we use MOR tables:

Environment Description

Hudi configuration

                "hoodie.datasource.hive_sync.auto_create_database": "true",
                "hoodie.datasource.hive_sync.enable": "true",
                "hoodie.datasource.hive_sync.mode": "hms",
                "hoodie.datasource.hive_sync.table": table_name,
                "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
                "hoodie.datasource.hive_sync.use_jdbc": "false",
                "hoodie.datasource.hive_sync.database": _glue_db_name,
                "hoodie.datasource.write.hive_style_partitioning": "true",
                "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
                "hoodie.datasource.write.operation": "upsert",
                "hoodie.datasource.write.schema.allow.auto.evolution.column.drop": "true",
                "hoodie.datasource.write.table.name": table_name,
                "hoodie.datasource.write.table.type": "MERGE_ON_READ",
                "hoodie.metadata.index.bloom.filter.enable": "true",
                "hoodie.metadata.index.column.stats.enable": "true",
                "hoodie.table.name": table_name,
                "hoodie.parquet.small.file.limit": "104857600",
                "hoodie.parquet.max.file.size": "125829120",
                "hoodie.merge.small.file.group.candidates.limit": "40",
                "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",

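As a quick sanity check on the file-sizing values in the Hudi configuration above, the sketch below converts them to MiB and estimates how many records fit in one base file for a given record size estimate. It assumes insert packing is roughly max file size divided by the estimated record size, and that the estimate only matters until Hudi can derive an average from earlier commits. The helper name is hypothetical.

```python
MIB = 1024 * 1024

max_file_size = 125_829_120     # hoodie.parquet.max.file.size
small_file_limit = 104_857_600  # hoodie.parquet.small.file.limit


def records_per_file(max_file_size_bytes, record_size_estimate_bytes):
    """Rough number of records packed into one file for a given size estimate."""
    return max_file_size_bytes // record_size_estimate_bytes


print(f"max file size:    {max_file_size / MIB:.0f} MiB")
print(f"small file limit: {small_file_limit / MIB:.0f} MiB")
for estimate in (1024, 100):  # the two record.size.estimate values tried in this issue
    print(f"estimate={estimate} B -> ~{records_per_file(max_file_size, estimate)} records per file")
```

Lowering the estimate from 1024 to 100 raises the assumed records per file by about a factor of ten, which is consistent with the larger files reported earlier.
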
Spark configuration

            "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
            # Glue support
            "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
            # Spark resource
            "spark.driver.cores": "4",
            "spark.driver.memory": "4400m",
            "spark.driver.memoryOverhead": "800m",
            "spark.executor.cores": "4",
            "spark.executor.memory": "4400m",
            "spark.executor.memoryOverhead": "800m",
            "spark.dynamicAllocation.initialExecutors": "4",
            "spark.dynamicAllocation.minExecutors": "4",
            "spark.dynamicAllocation.maxExecutors": "8"

ad1happy2go commented 6 months ago

Thanks for raising this, @sgcisco. I noticed you set compact num.delta commits to 1. Is there a reason for that? If we need to compact after every commit, it is better to use a COW table instead. Another possible cause is that the ingestion job is starved of resources because the async compaction job is consuming them. Did you analyze the Spark UI? Which stage started taking more time?

sgcisco commented 6 months ago

@ad1happy2go thanks for your reply. We set compact num.delta commits to 1 in only one of the tests. In the other runs, and in the configuration we are trying to use now, it is the default value of 5.

As another test, we ran the pipeline over several days with a lower ingestion rate of 600 KB/s and the same Hudi and Spark configuration as above.

The most time-consuming stage is Building workload profile, which takes 2.5-12 min, with an average of around 7 min.

Screenshot 2024-04-30 at 19 44 00

Screenshot 2024-04-30 at 20 37 15

Over 3 days, the per-partition latencies look as follows:

Screenshot 2024-04-30 at 21 00 20

So in this case the load is around 35-40 MB per minute (one minibatch at the current Structured Streaming interval), and the workers can scale up to 35 GB of memory and 32 cores. Does this look like a sufficient resource configuration for Hudi to handle such a load?
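The figures above can be cross-checked with simple arithmetic. The sketch below assumes the rate is in kilobytes per second (decimal units) and takes the executor settings from the Spark configuration listed earlier; the variable names are hypothetical.

```python
# Per-batch volume at the reported ingestion rate and a 1 min batch interval.
ingest_rate_kb_per_s = 600
batch_interval_s = 60
batch_volume_mb = ingest_rate_kb_per_s * batch_interval_s / 1000

# Upper bound on worker resources with dynamic allocation at maxExecutors.
max_executors = 8             # spark.dynamicAllocation.maxExecutors
executor_cores = 4            # spark.executor.cores
executor_memory_mib = 4400    # spark.executor.memory
executor_overhead_mib = 800   # spark.executor.memoryOverhead

total_cores = max_executors * executor_cores
total_heap_mib = max_executors * executor_memory_mib
total_with_overhead_mib = max_executors * (executor_memory_mib + executor_overhead_mib)

print(f"batch volume: {batch_volume_mb:.0f} MB per minibatch")
print(f"cores: {total_cores}, executor memory: {total_heap_mib} MiB, "
      f"with overhead: {total_with_overhead_mib} MiB")
```

The 36 MB per batch and 32 cores match the numbers quoted above, and the roughly 35 GB figure corresponds to executor memory without the overhead.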

ad1happy2go commented 6 months ago

@sgcisco What is the nature of your record key? Is it a random ID? The Building workload profile stage does the index lookup, which is basically a join between the existing data and the incremental data to identify which records are to be updated or inserted. Are you seeing the disk spill during this operation, you can try increasing the executor memory to avoid the same.
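The lookup described here can be pictured as a key-based split of each incoming batch. This is a toy in-memory sketch of the idea only: Hudi's real index does not hold all keys in a Python set, and the function names and sample records are hypothetical.

```python
def record_key(record, key_fields):
    """Composite key built from the configured record key fields."""
    return tuple(record[field] for field in key_fields)


def tag_records(existing_keys, incoming, key_fields):
    """Split an incoming batch into updates (key already stored) and inserts."""
    updates, inserts = [], []
    for record in incoming:
        if record_key(record, key_fields) in existing_keys:
            updates.append(record)
        else:
            inserts.append(record)
    return updates, inserts


key_fields = ["timestamp", "A"]
existing_keys = {(1000, "a-1"), (1001, "a-2")}
batch = [
    {"timestamp": 1001, "A": "a-2", "value": 7},  # key already stored: update
    {"timestamp": 1002, "A": "a-1", "value": 9},  # new key: insert
]
updates, inserts = tag_records(existing_keys, batch, key_fields)
print(len(updates), len(inserts))
```

The cost of the real lookup grows with the amount of existing data that has to be checked, which is why the record key design and partition size matter for this stage.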

sgcisco commented 6 months ago

@ad1happy2go the record key is defined as record_keys=["timestamp", "A", "B", "C"], where timestamp is monotonically increasing in ms, A is a string with a range of some 500k values, B is similar to A, and C has at most a hundred values. We use upsert, which is the default operation, but we don't expect any updates to the inserted values. We tried insert, but the observed latencies were worse.
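To illustrate what such a composite key looks like, here is a small sketch. It assumes ComplexKeyGenerator joins field:value pairs with commas, and it ignores any special handling of null or empty values. The function name and sample record are hypothetical.

```python
def complex_record_key(record, key_fields):
    """Approximate a composite record key as comma-joined field:value pairs."""
    return ",".join(f"{field}:{record[field]}" for field in key_fields)


record_keys = ["timestamp", "A", "B", "C"]
record = {"timestamp": 1714600000000, "A": "a-1", "B": "b-1", "C": "c-1", "value": 42}
key = complex_record_key(record, record_keys)
print(key)
```

Because the millisecond timestamp is part of the key, nearly every incoming record carries a key that has not been seen before, which fits the expectation of no updates.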

Increasing the partitioning granularity from daily to hourly seems to help decrease latencies, but it does not solve the problem completely.

Screenshot 2024-05-01 at 22 07 16

In this case the partition size goes down from 100 GB to 4.7 GB.

Are you seeing the disk spill during this operation, you can try increasing the executor memory to avoid the same.

No, none over a 15 h job run.

Screenshot 2024-05-01 at 22 19 07

In this case, with a low ingestion rate of ~600 KB/s and hourly partitions, the Operation duration/Batch duration in Spark Structured Streaming grows each time towards the end of the hour.

image

The latencies in the written data show a similar pattern:

image
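One way to quantify the within-hour growth described above is to average batch durations by their position in the hour. This is a minimal sketch. It assumes batch start times and durations have been exported from the streaming query progress; the function name and sample values are hypothetical.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean


def mean_duration_by_quarter_hour(batches):
    """Average batch duration per quarter of the hour (0 = minutes 0-14, etc.)."""
    buckets = defaultdict(list)
    for started_at, duration_s in batches:
        buckets[started_at.minute // 15].append(duration_s)
    return {quarter: mean(values) for quarter, values in sorted(buckets.items())}


# Hypothetical (batch start, duration in seconds) pairs, not data from this issue.
batches = [
    (datetime(2024, 5, 1, 10, 5), 40.0),
    (datetime(2024, 5, 1, 10, 20), 55.0),
    (datetime(2024, 5, 1, 10, 40), 80.0),
    (datetime(2024, 5, 1, 10, 55), 110.0),
    (datetime(2024, 5, 1, 11, 5), 42.0),
    (datetime(2024, 5, 1, 11, 55), 120.0),
]
profile = mean_duration_by_quarter_hour(batches)
print(profile)
```

A profile that rises steadily and resets at each hour boundary would point at work that scales with the size of the current hourly partition.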

ad1happy2go commented 5 months ago

@sgcisco Sorry for the delayed response here. Were you able to find the root cause? What was your batch interval in this case?

sgcisco commented 5 months ago

@ad1happy2go no, we could not find a working configuration. The batch interval is 1 min, as mentioned in my original message:

It did not reveal any delays above 2 mins, where 1 min delay is always present due to Structured Streaming minibatch granularity.