apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.19k stars 2.38k forks source link

Kafka readStream performance slow [SUPPORT] #2083

Closed rafaelhbarros closed 1 year ago

rafaelhbarros commented 3 years ago

Describe the problem you faced

I have a kafka topic that produces 1-2 million records per minute. I'm trying to write these records to s3 in the hudi format. I can't get it to keep up with the input. I'm running on EMR, m5.xlarge driver, 3x c5.xlarge core instances. The data is serialized in avro, and deserialized with schema registry (using abris).

Environment Description

    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --master yarn \
    --name hudi-consumer \
    --deploy-mode cluster \
    --conf spark.yarn.submit.waitAppCompletion=false \
    --conf spark.scheduler.mode=FAIR \
    --conf spark.task.maxFailures=10 \
    --conf spark.memory.fraction=0.4 \
    --conf spark.rdd.compress=true \
    --conf spark.kryoserializer.buffer.max=512m \
    --conf spark.memory.storageFraction=0.1 \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    --conf spark.driver.maxResultSize=3g \
    --conf spark.yarn.max.executor.failures=10 \
    --conf spark.file.partitions=10 \
    --conf spark.sql.shuffle.partitions=80 \
    --conf spark.executor.extraJavaOptions="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:+ExitOnOutOfMemoryError" \
    --conf spark.driver.extraJavaOptions="-XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof" \
    --driver-memory 4G \
    --executor-memory 5G \
    --executor-cores 4 \
    --num-executors 6 \
    --class <class> <jar>

Hudi confs:

    hoodie.combine.before.upsert=false 
    hoodie.bulkinsert.shuffle.parallelism=10 
    hoodie.insert.shuffle.parallelism=10 
    hoodie.upsert.shuffle.parallelism=10 
    hoodie.delete.shuffle.parallelism=1
    TABLE_TYPE_OPT_KEY()=COW_TABLE_TYPE_OPT_VAL()

0.5.2-incubating

S3

No

zherenyu831 commented 3 years ago

Hi rafaelhbarros We are running similar solution as you, just a suggestion, isn't the parallelism too small for you?

hoodie.insert.shuffle.parallelism=10 
hoodie.upsert.shuffle.parallelism=10

we using 1500 parallelism(default) to handle 2000 upsert tps with 9 cores of c5 instances.

rnatarajan commented 3 years ago

Update on what @rafaelhbarros has mentioned.

With Hudi 0.6.0, Identified a bottleneck in Sort and turned the feature off ("hoodie.bulkinsert.sort.mode - NONE"). Matching parallelism with number of coresexecutors available give the optimal speed. If the coresexecutors = 10 and if parallelism is 20, then 10 cores*processors cannot perform real parallelism of 20 and the time taken to process the record becomes more.

With Hudi MoR and Bulk Insert + without Sort, with the parameters that @rafaelhbarros has posted was able to achieve about 20K Rows Per second.

With Hudi CoW and Insert Mode + Without Sort was able to achieve 15K Rows per second.

We are aiming to achieve about 20K Rows per second with similar hardware( --driver-memory 4G --executor-memory 5G
--executor-cores 4 --num-executors 6 ).

rnatarajan commented 3 years ago

Update on this:

Found the bottleneck as countByKey

We were reading data from Kafka(spread across 20 partitions) We tested with hoodie.datasource.write.partitionpath.field as "" or ""

In both cases, records read from Kafka across all partitioned(For a batch) was shuffled performing countByKey. This caused a major throughput drop.

tooptoop4 commented 3 years ago

did u fix?

rafaelhbarros commented 3 years ago

Still no success, our issue seems to be very closely aligned with this and this

n3nash commented 3 years ago

@rafaelhbarros Are you using bulkInsert operation in Hudi ? Could you repost the following information so I can help resolve your issue

rnatarajan commented 3 years ago

@n3nash

Environment - AWS. Master Nodes - 1 m5.xlarge - 4 vCore, 16 GiB memory Core Nodes - 6 c5.xlarge - 4 vCore, 8 GiB memory Spark Submit config - --driver-memory 4G --executor-memory 5G --executor-cores 4 --num-executors 6 Hudi Config - hoodie.combine.before.upsert=false hoodie.bulkinsert.shuffle.parallelism=10 hoodie.insert.shuffle.parallelism=10 hoodie.upsert.shuffle.parallelism=10 hoodie.delete.shuffle.parallelism=1 hoodie.datasource.write.operatio=bulk_insert hoodie.bulkinsert.sort.mode=NONE hoodie.datasource.write.table.type=MERGE_ON_READ hoodie.datasource.write.partitionpath.field="" hoodie.combine.before.upsert=false

The events ingested are not time based events.

Events have unique id in the of type long and using the unique id as hoodie.datasource.write.recordkey.field Events have a date field and the date field is used as hoodie.datasource.write.precombine.field

Events have 40 columns of types - long, int, date, timestamp, string.

For Ingesting, we attempted both bulk insert and insert.

n3nash commented 3 years ago

@rnatarajan Thanks for sharing this information, this is helpful. Firstly, you seem to have 24 cores (4*6) which means you can get a parallelism of 24. So, for you can try setting bulkinsert.shuffle.parallelism=24 for starters. I need some more information based on what you provided : 1) Right now, you are able to ingest 15K rows/second with the current setup, but you want to achieve 20K rows/second, is that correct ? 2) Are you using SparkStructured Streaming to ingest or are you using Spark datasource and running batch jobs ? If it is spark structured streaming, can you share screenshots of the read stages of the DAG, essentially, the stages where spark is reading from Kafka ? 3) Which part of the entire DAG is taking most time right now ?

rnatarajan commented 3 years ago

Sorry I did not update the ticket in the past week.

  1. Yes that is correct.
  2. I tried with both Spark Structured Streaming and DStream. But Since our source is Debezium/Kafka, we had to use ForEachRDD to convert few fields (days since epoch to date, unix time to timestamp) and then we are doing df.write.format("hudi")......mode(SaveMode.Append).save("")
  3. I am attaching the screenshot that shows that count takes most time in case of bulk_insert and countByKey in case of insert.

Structured Streaming had a higher throughput. But with triggers in Spark, I cannot post granular details. Hence using DStream to illustrate thoughput issue.

Regarding operation mode as insert, each DStream batch was about 434000 records. For the first batch processing took 1.3mins batch but then the processing time drops to about 37s. Attached are the details that narrow down the bottleneck to CountByKey. Picture shows that first batch took 53s in countByKey where as for subsequent batches drops to about 28s. Time taken in CountByKey remains around 28s for each batch of 434000 records. In this case DStream achieves a peak throughput of about 12K Rows per second.

insert_time_taken_by_each_batch insert_for_each_rdd insert_dag_details insert_first_batch_bottleneck insert_expand_bottleneck_in_first_batch insert_subsequent_batch_bottleneck

Regarding operation mode as bulk_insert, each DStream batch was about 434000 records. For the first batch processing took 1.2mins batch but then the processing time drops to about 34s. Attached are the details that narrow down the bottleneck to Count. Picture shows that first batch took 56s in Count where as for subsequent batches drops to about 32s. Time taken in Count remains around 32s for each batch of 434000 records. In this case DStream achieves a peak throughput of about 12K Rows per second.

bulk_insert_time_taken_by_each_batch bulk_insert_for_each_rdd bulk_insert_dag_details bulk_insert_first_batch_bottleneck bulk_insert_expand_bottleneck_in_first_batch bulk_insert_subsequent_batch_bottleneck

bvaradar commented 3 years ago

@rnatarajan : I was trying to find the batch interval in your comments but couldn't find it. What is the batch interval you kept for streaming ? If you are seeing around 28-32 secs consistently, then it should not be related to https://github.com/apache/hudi/issues/1830. Also, seeing consistent turnaround time means the write is not building the lag. Let me know if I understand correctly ?

nsivabalan commented 1 year ago

This seems like a very old version that is not actively maintained.

we have done lot of improvements around perf w/ hudi. https://hudi.apache.org/blog/2022/06/29/Apache-Hudi-vs-Delta-Lake-transparent-tpc-ds-lakehouse-performance-benchmarks

Can you try out 0.12. Closing it out due to long inactivity.

Feel free to open new issue if you need assistance.