apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] High runtime for a batch in SparkWriteHelper stage #6014

Open veenaypatil opened 2 years ago

veenaypatil commented 2 years ago

Describe the problem you faced

We are observing a much higher runtime for one batch: it took over 15 hours to complete a single batch, while the subsequent batches run fine. The dataset in question is not big. Attaching a few screenshots for reference; GC times are low. The hoodieConfigs are listed below for reference.

[Screenshots attached: Spark UI stage metrics and GC times, 2022-06-29]

Additional context

Hudi Configs

hoodieConfigs:
  hoodie.datasource.write.operation: upsert
  hoodie.datasource.write.table.type: MERGE_ON_READ
  hoodie.datasource.write.partitionpath.field: ""
  hoodie.datasource.write.keygenerator.class: org.apache.hudi.keygen.NonpartitionedKeyGenerator
  hoodie.metrics.on: true
  hoodie.metrics.reporter.type: CLOUDWATCH
  hoodie.datasource.hive_sync.partition_extractor_class: org.apache.hudi.hive.NonPartitionedExtractor
  hoodie.parquet.max.file.size: 6110612736
  hoodie.compact.inline: true
  hoodie.clean.automatic: true
  hoodie.compact.inline.trigger.strategy: NUM_AND_TIME
  hoodie.clean.async: true
  hoodie.cleaner.policy: KEEP_LATEST_COMMITS
  hoodie.cleaner.commits.retained: 120
  hoodie.keep.min.commits: 130
  hoodie.keep.max.commits: 131

Spark Job configs

{
  "className": "com.hotstar.driver.CdcCombinedDriver",
  "proxyUser": "root",
  "driverCores": 1,
  "executorCores": 4,
  "executorMemory": "4G",
  "driverMemory": "4G",
  "queue": "cdc",
  "name": "hudiJob",
  "file": "s3a://bucket/jars/prod.jar",
  "conf": {
    "spark.eventLog.enabled": "false",
    "spark.ui.enabled": "true",
    "spark.streaming.concurrentJobs": "1",
    "spark.streaming.backpressure.enabled": "false",
    "spark.streaming.kafka.maxRatePerPartition": "500",
    "spark.yarn.am.nodeLabelExpression": "cdc",
    "spark.shuffle.service.enabled": "true",
    "spark.driver.maxResultSize": "8g",
    "spark.driver.memoryOverhead": "2048",
    "spark.executor.memoryOverhead": "2048",
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "25",
    "spark.dynamicAllocation.maxExecutors": "50",
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
    "spark.jars.packages": "org.apache.spark:spark-avro_2.12:3.0.2,com.izettle:metrics-influxdb:1.2.3",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.rdd.compress": "true",
    "spark.sql.hive.convertMetastoreParquet": "false",
    "spark.yarn.maxAppAttempts": "1",
    "spark.task.cpus": "1"
  }
}


veenaypatil commented 2 years ago

@bhasudha @vinothchandar @codope Can you please help here?

yihua commented 2 years ago

cc @minihippo @xiarixiaoyao

minihippo commented 2 years ago

@veenaypatil Is there any task log that can help us get more information?

veenaypatil commented 2 years ago

@minihippo the task logs only show the Kafka consumer logs, and the task is stuck after that.

22/08/03 11:02:40 INFO KafkaRDD: Computing topic in.split.prod.hspay.subscriptions.partner_subscriptions, partition 1 offsets 3422973 -> 3497480
22/08/03 11:02:40 INFO KafkaRDD: Computing topic in.split.prod.hspay.subscriptions.apple_partner_subscriptions, partition 0 offsets 6038717 -> 6222344
22/08/03 11:02:40 INFO KafkaRDD: Computing topic in.split.prod.hspay.subscriptions.bsnl_partner_subscriptions, partition 1 offsets 31179 -> 32046
22/08/03 11:02:40 INFO CodeGenerator: Code generated in 15.559109 ms
22/08/03 11:02:40 INFO InternalKafkaConsumer: Initial fetch for spark-executor-hudi_in_hspay_subs in.split.prod.hspay.subscriptions.apple_partner_subscriptions-0 6038717
22/08/03 11:02:40 INFO InternalKafkaConsumer: Initial fetch for spark-executor-hudi_in_hspay_subs in.split.prod.hspay.subscriptions.partner_subscriptions-1 3422973
22/08/03 11:02:40 INFO KafkaConsumer: [Consumer clientId=consumer-spark-executor-hudi_in_hspay_subs-2, groupId=spark-executor-hudi_in_hspay_subs] Seeking to offset 6038717 for partition in.split.prod.hspay.subscriptions.apple_partner_subscriptions-0
22/08/03 11:02:40 INFO InternalKafkaConsumer: Initial fetch for spark-executor-hudi_in_hspay_subs in.split.prod.hspay.subscriptions.bsnl_partner_subscriptions-1 31179
22/08/03 11:02:40 INFO KafkaConsumer: [Consumer clientId=consumer-spark-executor-hudi_in_hspay_subs-3, groupId=spark-executor-hudi_in_hspay_subs] Seeking to offset 3422973 for partition in.split.prod.hspay.subscriptions.partner_subscriptions-1
22/08/03 11:02:40 INFO KafkaConsumer: [Consumer clientId=consumer-spark-executor-hudi_in_hspay_subs-1, groupId=spark-executor-hudi_in_hspay_subs] Seeking to offset 31179 for partition in.split.prod.hspay.subscriptions.bsnl_partner_subscriptions-1
22/08/03 11:02:40 INFO FetchSessionHandler: [Consumer clientId=consumer-spark-executor-hudi_in_hspay_subs-1, groupId=spark-executor-hudi_in_hspay_subs] Error sending fetch request (sessionId=307489077, epoch=2) to node 35: {}.
org.apache.kafka.common.errors.DisconnectException
22/08/03 11:02:40 INFO FetchSessionHandler: [Consumer clientId=consumer-spark-executor-hudi_in_hspay_subs-3, groupId=spark-executor-hudi_in_hspay_subs] Error sending fetch request (sessionId=1858238483, epoch=26) to node 7: {}.
org.apache.kafka.common.errors.DisconnectException
22/08/03 11:02:40 INFO Executor: Finished task 21.0 in stage 285.0 (TID 14173). 1611 bytes result sent to driver
22/08/03 11:02:40 INFO FetchSessionHandler: [Consumer clientId=consumer-spark-executor-hudi_in_hspay_subs-2, groupId=spark-executor-hudi_in_hspay_subs] Error sending fetch request (sessionId=1695836972, epoch=3257) to node 11: {}.
org.apache.kafka.common.errors.DisconnectException
22/08/03 11:02:44 INFO Executor: Finished task 52.0 in stage 285.0 (TID 14197). 1611 bytes result sent to driver
22/08/03 11:04:25 INFO Executor: Finished task 29.0 in stage 285.0 (TID 14185). 1654 bytes result sent to driver
22/08/03 11:30:45 INFO BlockManager: Removing RDD 495
22/08/03 11:30:45 INFO BlockManager: Removing RDD 559
22/08/03 11:30:45 INFO BlockManager: Removing RDD 536
22/08/03 11:30:45 INFO BlockManager: Removing RDD 544

The [mapToPair at SparkWriteHelper.java:63] stage is the one that keeps running, and I pasted the above log from one of its running tasks. This causes a lot of issues when we try to kill the job and restart it, and it only happens for the first batch. Today, for example, the first batch has been running for 24 hours.

[Screenshot attached: Spark UI, 2022-08-03]

nsivabalan commented 2 years ago

@alexeykudinkin: can you look into this?

alexeykudinkin commented 2 years ago

@nsivabalan this doesn't seem to be related to performance; it looks more like an issue of some jobs getting stuck.

@veenaypatil are you able to reliably reproduce this? To be able to troubleshoot this we will definitely need more info regarding your setup:

  1. Where are you reading from? (I see Kafka in the logs, but need to confirm)
  2. How are you reading? (Using DeltaStreamer, Spark Source, etc)
  3. Describe your data workload (in as much detail as possible: schema, size, etc.)
  4. Describe the issue you're observing in more detail
  5. Provide logs for the failing jobs wherever possible

gudladona commented 2 years ago

We are facing a similar issue during this phase

[Screenshots attached: Spark UI stage metrics]

Kafka fetch errors (at INFO level) are as follows:

2022-09-02 06:26:12,606 INFO [Executor task launch worker for task 9.1 in stage 370.0 (TID 160203)] org.apache.kafka.clients.FetchSessionHandler:[Consumer clientId=consumer-spark-executor-hudi-ingest-auth-1, groupId=spark-executor-hudi-ingest-auth] Error sending fetch request (sessionId=1968849354, epoch=629) to node 8: org.apache.kafka.common.errors.DisconnectException

and warnings on the executors

2022-09-02 06:12:47,340 WARN [netty-rpc-env-timeout] org.apache.spark.rpc.netty.NettyRpcEnv:Ignored failure: java.util.concurrent.TimeoutException: Cannot receive any reply from ip-10-100-232-180.us-east-2.compute.internal:37403 in 10000 milliseconds
2022-09-02 06:13:53,639 WARN [executor-heartbeater] org.apache.spark.executor.Executor:Issue communicating with driver in heartbeater org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10000 milliseconds]. This timeout is controlled by spark.executor.heartbeatInterval
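
The timeouts above point at standard Spark settings rather than anything Hudi-specific. A minimal sketch (not from this thread; the values are illustrative only) of raising them when executors are pinned in a long stage, keeping spark.executor.heartbeatInterval well below spark.network.timeout:

from pyspark.sql import SparkSession

# Illustrative values only; heartbeatInterval must stay well below network.timeout.
spark = (
    SparkSession.builder
    .appName("hudi-upsert")
    .config("spark.executor.heartbeatInterval", "60s")
    .config("spark.network.timeout", "600s")
    .getOrCreate()
)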

nsivabalan commented 1 year ago

@veenaypatil: I see you are using the non-partitioned key generator, so index lookup time is going to be relative to the total number of file groups you have. Do you know the total number of file groups you have? Can you enable clustering to batch smaller file groups into larger ones?
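
A hedged sketch of that suggestion (the keys are standard Hudi clustering configs; the thresholds are illustrative and not tuned for this table): enabling inline clustering rewrites many small file groups into fewer, larger ones, which shrinks the space the index lookup has to scan.

clustering_opts = {
    "hoodie.clustering.inline": "true",
    "hoodie.clustering.inline.max.commits": "4",                            # run clustering every N commits
    "hoodie.clustering.plan.strategy.small.file.limit": "314572800",        # files under ~300 MB are candidates
    "hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824",  # aim for ~1 GB files after clustering
}

# Merged into the existing writer options, e.g.:
# df.write.format("hudi").options(**{**hudi_write_opts, **clustering_opts}).mode("append").save(table_path)
# (hudi_write_opts and table_path are placeholders for the job's own options and path.)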

codope commented 1 year ago

@veenaypatil Did the suggestion above work for you?

nsivabalan commented 1 year ago

Hey @veenaypatil: any updates in this regard? I see the issue has been open for quite some time. Let us know how we can help.

veenaypatil commented 1 year ago

@nsivabalan sorry for the late response on this issue. I am not seeing this issue as of now; we were only seeing it when we killed the job and restarted it.

I see you are using the non-partitioned key generator, so index lookup time is going to be relative to the total number of file groups you have. Do you know the total number of file groups you have? Can you enable clustering to batch smaller file groups into larger ones?

How do I get the number of file groups? Is there a command for it? Also, what should the limit on the number of file groups be?
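
One hedged way to approximate the file-group count (the Hudi CLI's file-system view commands can also show this) is to list the base files and count distinct file IDs, since base files are named <fileId>_<writeToken>_<instantTime>.parquet and the file ID identifies the file group. A rough sketch for a non-partitioned table, with the table path as a placeholder:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
table_path = "s3a://bucket/path/to/hudi_table"  # placeholder path

# Use the Hadoop FileSystem API exposed through the Spark JVM gateway.
jvm = spark._jvm
path = jvm.org.apache.hadoop.fs.Path(table_path)
fs = path.getFileSystem(spark._jsc.hadoopConfiguration())

file_ids = set()
for status in fs.listStatus(path):
    name = status.getPath().getName()
    # Count base files only; .log files and the .hoodie metadata folder are skipped.
    if name.endswith(".parquet"):
        file_ids.add(name.split("_")[0])

print(f"Approximate number of file groups: {len(file_ids)}")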

devjain47 commented 7 months ago

@veenaypatil @alexeykudinkin @nsivabalan @yihua @bvaradar @gudladona

I am facing the same issue with [mapToPair at SparkWriteHelper.java:63]; it is taking 18 minutes. Did you find any solution for this?

[Screenshot attached: Spark UI]

ad1happy2go commented 7 months ago

@devjain47 What is your data size and the number of file groups in your dataset? Can you please post your table and writer configuration?

devjain47 commented 7 months ago

@ad1happy2go As of now we are just testing, so we are processing only one file and the data size is around 100 MB, not more than that, but later the data size will be in the GBs.

The Hudi table config I am using is below:

hudi_options = {
    'hoodie.table.name': HUDI_QUEUE_TABLE,
    'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.table.name': HUDI_QUEUE_TABLE,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.partitionpath.field': 'orgid,lenderid',
    'hoodie.datasource.write.precombine.field': 'startdate',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hudi.metadata-listing-enabled': 'false',
    'path': HUDI_QUEUE_TABLE_LOCATION,
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': HUDI_DATABASE,
    'hoodie.datasource.hive_sync.table': HUDI_QUEUE_TABLE,
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.upsert.shuffle.parallelism': 1,
    'hoodie.insert.shuffle.parallelism': 1,
    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.write.concurrency.mode': 'OPTIMISTIC_CONCURRENCY_CONTROL',
    'hoodie.cleaner.policy.failed.writes': 'LAZY',
    'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
    'hoodie.metadata.enable': 'false',
    'hoodie.index.type': 'SIMPLE',
    'hoodie.bloom.index.prune.by.ranges': 'false',
    'hoodie.enable.data.skipping': 'true',
    'hoodie.metadata.index.column.stats.enable': 'true',
    'hoodie.bloom.index.use.metadata': 'false',
    'hoodie.clustering.inline': 'false',
    'hoodie.write.lock.hivemetastore.table': HUDI_QUEUE_TABLE,
    'hoodie.write.lock.hivemetastore.database': HUDI_DATABASE
}
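
A general note on why tagging can dominate with this configuration (not a diagnosis of this specific job): with 'hoodie.index.type': 'SIMPLE', tagging joins the incoming records against the record keys read from the base files of the touched partitions, so its cost grows with the existing table rather than with the 100 MB input, and a shuffle parallelism of 1 gives that work little room to fan out. A hedged sketch of the knobs usually involved (illustrative values, not tuned for this table):

index_tuning_opts = {
    'hoodie.simple.index.parallelism': '200',    # parallelism of the SIMPLE-index tag-location join
    'hoodie.upsert.shuffle.parallelism': '200',  # instead of 1, so upsert stages can run in parallel
}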

ad1happy2go commented 7 months ago

@devjain47 So, with just one file, is it taking 18 minutes for the tagging phase? How much data is in your existing Hudi table?

devjain47 commented 7 months ago

@ad1happy2go, almost 20 GB of data is present.

devjain47 commented 6 months ago

Hi @alexeykudinkin @nsivabalan @yihua @bvaradar @gudladona @ad1happy2go @veenaypatil @codope

Did you find any solution for the above-mentioned issue?