apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] async clean service java.lang.OutOfMemoryError: Java heap space #10077

Open zyclove opened 11 months ago

zyclove commented 11 months ago

Tips before filing an issue

Why is the task suddenly out of memory? How to solve it?

To Reproduce

Steps to reproduce the behavior:

1. The task suddenly runs out of memory; increasing the Spark executor and driver memory does not help.

Stacktrace

23/11/13 19:40:18 ERROR HoodieAsyncService: Service shutdown with error
java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Java heap space
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
    at org.apache.hudi.async.AsyncCleanerService.waitForCompletion(AsyncCleanerService.java:75)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.asyncClean(BaseHoodieTableServiceClient.java:132)
    at org.apache.hudi.client.BaseHoodieWriteClient.autoCleanOnCommit(BaseHoodieWriteClient.java:594)
    at org.apache.hudi.client.BaseHoodieWriteClient.mayBeCleanAndArchive(BaseHoodieWriteClient.java:578)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:248)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:104)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:1059)
    at org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:441)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:108)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:61)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:67)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:384)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:504)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:498)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:498)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:336)
    at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:474)
    at org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:490)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:213)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.OutOfMemoryError: Java heap space
23/11/13 19:40:18 INFO HoodieSparkSqlWriter$: Config.inlineCompactionEnabled ? true
23/11/13 19:40:18 INFO HoodieSparkSqlWriter$: Config.asyncClusteringEnabled ? false
23/11/13 19:40:18 INFO HoodieSparkSqlWriter$: Closing write client
23/11/13 19:40:18 INFO BaseHoodieClient: Stopping Timeline service !!
23/11/13 19:40:18 INFO EmbeddedTimelineService: Closing Timeline server
23/11/13 19:40:18 INFO TimelineService: Closing Timeline Service
23/11/13 19:40:18 INFO Javalin: Stopping Javalin ...
23/11/13 19:40:18 INFO Javalin: Javalin has stopped
23/11/13 19:40:18 INFO TimelineService: Closed Timeline Service
23/11/13 19:40:18 INFO EmbeddedTimelineService: Closed Timeline server
23/11/13 19:40:18 INFO TransactionManager: Transaction manager closed
23/11/13 19:40:18 INFO AsyncCleanerService: Shutting down async clean service...
23/11/13 19:40:18 INFO TransactionManager: Transaction manager closed

ad1happy2go commented 11 months ago

@zyclove Can you post the timeline from when this error occurred, along with the table/writer configs?

zyclove commented 11 months ago

After increasing the driver memory from 16G to 32G, the run completed. I also added the parameters --conf spark.network.timeout=6000s --conf spark.executor.heartbeatInterval=6000s.

Why does the driver consume so much memory?

@ad1happy2go

ad1happy2go commented 11 months ago

@zyclove What are your instant sizes? Hudi loads instants in the driver, so if the instants are too large it may fail with an OOM exception. You can also check whether your table contains too many small files. Please share the writer configuration so I can check as well. Thanks.

zyclove commented 11 months ago

@ad1happy2go

Thank you very much for replying despite your busy schedule. Looking at the output, there are indeed a lot of small files. How should we resolve this now? I can no longer run the job. The table creation statement and the execution script are as follows.

spark-sql> show create table bi_ods_real.smart_datapoint_report_rw_clear_rt;
CREATE TABLE `bi_ods_real`.`smart_datapoint_report_rw_clear_rt` (
  `_hoodie_commit_time` STRING,
  `_hoodie_commit_seqno` STRING,
  `_hoodie_record_key` STRING,
  `_hoodie_partition_path` STRING,
  `_hoodie_file_name` STRING,
  `id` STRING COMMENT 'id',
  `uuid` STRING COMMENT 'log uuid',
  `data_id` STRING,
  `dev_id` STRING COMMENT 'id',
  `gw_id` STRING,
  `product_id` STRING,
  `uid` STRING COMMENT 'user ID',
  `dp_code` STRING,
  `dp_id` STRING COMMENT 'dp point',
  `gmtModified` STRING,
  `dp_name` STRING,
  `dp_time` STRING,
  `dp_type` STRING,
  `dp_value` STRING,
  `gmt_modified` BIGINT COMMENT 'ct time',
  `dt` STRING COMMENT 'time partition field',
  `dp_mode` STRING)
USING hudi
PARTITIONED BY (dt, dp_mode)
COMMENT ''
TBLPROPERTIES (
  'hoodie.bucket.index.num.buckets' = '50',
  'primaryKey' = 'id',
  'last_commit_time_sync' = '20231021185003298',
  'hoodie.common.spillable.diskmap.type' = 'ROCKS_DB',
  'hoodie.combine.before.upsert' = 'false',
  'hoodie.compact.inline' = 'false',
  'type' = 'mor',
  'preCombineField' = 'gmt_modified',
  'hoodie.datasource.write.partitionpath.field' = 'dt,dp_mode')

insert into bi_ods_real.smart_datapoint_report_rw_clear_rt
select
      md5(concat(coalesce(data_id,''),coalesce(dev_id,''),coalesce(gw_id,''),coalesce(product_id,''),coalesce(uid,''),coalesce(dp_code,''),coalesce(dp_id,''),coalesce(gmtModified,''),if(dp_mode in ('ro','rw','wr'),dp_mode,'un'),coalesce(dp_name,''),coalesce(dp_time,''),coalesce(dp_type,''),coalesce(dp_value,''),coalesce(ct,''))) as id, 
      _hoodie_record_key as uuid,
      data_id,dev_id,gw_id,product_id,uid,
      dp_code,dp_id,gmtModified,if(dp_mode in ('ro','rw','wr'),dp_mode,'un') as dp_mode ,dp_name,dp_time,dp_type,dp_value,
      ct as gmt_modified,
      case 
          when length(ct)=10 then date_format(from_unixtime(ct),'yyyyMMddHH')  
          when length(ct)=13 then date_format(from_unixtime(ct/1000),'yyyyMMddHH') 
          else '1970010100' end as dt
from 
    hudi_table_changes('bi_ods_real.ods_log_smart_datapoint_report_batch_rt', 'latest_state', '20231114033500000', '20231114040500000')  
    lateral  view dataPointExplode(split(value,'\001')[0]) dps as ct, data_id, dev_id, gw_id, product_id, uid, dp_code, dp_id, gmtModified, dp_mode, dp_name, dp_time, dp_type, dp_value
where _hoodie_commit_time >20231114033500000 and _hoodie_commit_time<=20231114040500000

[image]

The driver memory: [image]

Please help me find out how to solve it, thank you very much.

zyclove commented 11 months ago

@ad1happy2go @codope Hi, could you please help me look into this problem and give me a solution? Thanks.

ad1happy2go commented 11 months ago

@zyclove You can run clustering to merge these small files.

Also, can you let us know the sizes of the timeline instants under .hoodie/?
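A minimal sketch for collecting those numbers, assuming the table's .hoodie directory is reachable as a local or mounted path. The function name `summarize_timeline` and the example path are hypothetical and not part of Hudi. On HDFS, `hdfs dfs -du -h <table_path>/.hoodie` gives similar information.

```python
import os
from collections import Counter


def summarize_timeline(hoodie_dir, top_n=10):
    """Summarize the files directly under a .hoodie directory.

    Returns (file_count, total_bytes, bytes_by_suffix, largest_files).
    Subdirectories are skipped; only top-level files are counted.
    """
    sizes = {}
    for entry in os.scandir(hoodie_dir):
        if entry.is_file():
            sizes[entry.name] = entry.stat().st_size

    # Group by the last dot-separated suffix, e.g. "deltacommit",
    # "clean", "requested", "inflight".
    by_suffix = Counter()
    for name, size in sizes.items():
        suffix = name.rsplit(".", 1)[-1] if "." in name else "(none)"
        by_suffix[suffix] += size

    largest = sorted(sizes.items(), key=lambda kv: kv[1], reverse=True)[:top_n]
    return len(sizes), sum(sizes.values()), dict(by_suffix), largest


# Example usage (hypothetical path):
# count, total, by_suffix, largest = summarize_timeline("/mnt/warehouse/my_table/.hoodie")
# print(count, "files,", total, "bytes")
# for name, size in largest:
#     print(size, name)
```

Posting the file count, the total size, and the largest few instants is usually enough to tell whether a single oversized instant or the sheer number of files is the issue.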

zyclove commented 11 months ago

@ad1happy2go The timeline instants are not large, but there are 910 files. The other files are at the KB level. In theory, files this small should not need so many resources.

So how do I run clustering to merge these small files? Can you provide a sample command? [image]

ad1happy2go commented 11 months ago

@zyclove Refer to https://hudi.apache.org/docs/next/clustering/
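As a rough illustration of what the linked page covers, here is a sketch that builds a set of inline clustering writer options and renders them as `set` statements for a spark-sql session. The helper names and the chosen values are assumptions for illustration, not recommendations from this thread. Whether clustering applies to this table's index setup is not confirmed here, so check the linked documentation before using it.

```python
MB = 1024 * 1024


def inline_clustering_options(small_file_limit_mb=300,
                              target_file_max_mb=1024,
                              max_commits=4):
    """Build Hudi writer options that enable inline clustering.

    Files smaller than small_file_limit_mb are clustering candidates;
    output files are capped at target_file_max_mb. The values here are
    illustrative assumptions and should be tuned per table.
    """
    return {
        "hoodie.clustering.inline": "true",
        "hoodie.clustering.inline.max.commits": str(max_commits),
        "hoodie.clustering.plan.strategy.small.file.limit":
            str(small_file_limit_mb * MB),
        "hoodie.clustering.plan.strategy.target.file.max.bytes":
            str(target_file_max_mb * MB),
    }


def to_set_statements(options):
    """Render options as `set key=value;` lines for a spark-sql script."""
    return [f"set {key}={value};" for key, value in options.items()]


# Example usage:
# for line in to_set_statements(inline_clustering_options()):
#     print(line)
```

The generated `set` lines would go before the insert statement in the same spark-sql script, so the write that follows picks them up.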

nsivabalan commented 1 month ago

hey, are we good to close out the ticket?