apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Spark insert overwrite in partition table causes executors OOM. #8332

Closed HuangFru closed 1 year ago

HuangFru commented 1 year ago

Describe the problem you faced

I'm running a simple write performance test for Hudi in Spark on YARN, but my executors die with OOM errors, and the 'insert overwrite' SQL is very slow. I created a table like this:

create table lineitem_kp_mor (
   l_orderkey bigint,
   l_partkey bigint,
   l_suppkey bigint,
   l_linenumber int,
   l_quantity double,
   l_extendedprice double,
   l_discount double,
   l_tax double,
   l_returnflag string,
   l_linestatus string,
   l_shipdate date,
   l_receiptdate date,
   l_shipinstruct string,
   l_shipmode string,
   l_comment string,
   l_commitdate date
) USING hudi tblproperties (
    type = 'mor',
    primaryKey = 'l_orderkey,l_linenumber',
    hoodie.memory.merge.max.size = '2004857600000',
    hoodie.write.markers.type = 'DIRECT',
    hoodie.sql.insert.mode = 'non-strict'
) PARTITIONED BY (l_commitdate);

This is the lineitem table from the TPC-H benchmark, to which I added a partition key, 'l_commitdate', so the table has 2466 partitions. I then try to write into the table through 'insert overwrite ... select ... from' in Spark SQL. The data set is 'sf100': about 600 million rows and 15GB in storage.
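The partition and row counts quoted above can be checked against the source data with a query along these lines. This is only a sketch: it assumes the source table name used in the insert statement later in this post, and it shows no expected output because the figures come from the post, not from running the query.

```sql
-- Count the distinct partition values and the total rows that the
-- insert overwrite will write (source table name taken from this post).
select count(distinct l_commitdate) as partition_count,
       count(*)                     as row_count
from tpch.sf100.lineitem;
```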

Here's the command I submit to YARN:

bin/beeline -u 'jdbc:hive2://localhost:10010/;#spark.master=yarn;spark.yarn.queue=default;spark.executor.instances=11;spark.executor.cores=5;spark.executor.memory=8g;spark.driver.memory=4g'

Resources: 11 executors, each 8G memory, each 5 cores, driver memory 4G.

The insert overwrite SQL:

insert overwrite table lineitem_kp_mor select l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment,l_commitdate from tpch.sf100.lineitem;

The SQL runs very slowly and eventually fails because the executors die.

Executors: image

Stages: image

Dead executors' log: image

To Reproduce

Steps to reproduce the behavior:

  1. Prepare a TPC-H data set in Spark.
  2. Run Spark SQL on YARN, then create the table and run the insert overwrite as in my SQL above.

Expected behavior

I don't know if I've missed some important config or something. I'm new to Hudi. Could you please give me some help?

Environment Description

Stacktrace


    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2303)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2252)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2251)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2251)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2490)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2432)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2421)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:366)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:366)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
    at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:104)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:187)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:156)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:85)
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:57)
    ... 36 more

    at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
    at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.applyOrElse(SparkOperation.scala:112)
    at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.applyOrElse(SparkOperation.scala:96)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:113)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.kyuubi.engine.spark.operation.SparkOperation.withLocalProperties(SparkOperation.scala:87)
    at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.org$apache$kyuubi$engine$spark$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:89)
    at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:125)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20230331101517327
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
    at org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor.execute(SparkInsertOverwriteCommitActionExecutor.java:63)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.insertOverwrite(HoodieSparkCopyOnWriteTable.java:159)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.insertOverwrite(HoodieSparkCopyOnWriteTable.java:97)
    at org.apache.hudi.client.SparkRDDWriteClient.insertOverwrite(SparkRDDWriteClient.java:207)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:215)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:306)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:94)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:47)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
    at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3700)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3698)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
    at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:94)
    ... 9 more

ad1happy2go commented 1 year ago

@HuangFru Did you try increasing the executor memory? You can still keep executor-cores at 5.

danny0405 commented 1 year ago

Yes, it seems we still use the UPSERT code path for the INSERT OVERWRITE TABLE operation; it should be optimized to use the INSERT path if possible.

HuangFru commented 1 year ago

@HuangFru Did you try increasing the executor memory? You can still keep executor-cores at 5.

I'm doing a performance test, so I must control the variables. I've tried decreasing the number of executors and increasing the executor memory, but it still runs out of memory.

HuangFru commented 1 year ago

Yes, it seems we still use the UPSERT code path for the INSERT OVERWRITE TABLE operation; it should be optimized to use the INSERT path if possible.

Any suggestions on how I can carry on with this? Should I use 'insert into' to write the data? Usually, though, insert into performs worse than insert overwrite.

HuangFru commented 1 year ago

@HuangFru Did you try increasing the executor memory? You can still keep executor-cores at 5.

Even if I increase the executor memory to 16G and keep all other resources the same, the same thing happens.

xmubeta commented 1 year ago

I could be wrong, but it is worth trying to run the insert statement on a non-Hudi table. I suspect you might get the same error because of too much data.

HuangFru commented 1 year ago

I could be wrong, but it is worth trying to run the insert statement on a non-Hudi table. I suspect you might get the same error because of too much data.

I've tried it in Iceberg, and it works fine. Iceberg takes about 230s with the same resources to do the same 'insert overwrite'. I also tried it in Hive, which took more time but also works fine.

KnightChess commented 1 year ago

Insert overwrite still handles small files. You can use bulk insert to avoid that and speed things up, but it is only supported for insert into, which is not an idempotent operation, so you need to truncate the partitions you want to insert into first. Fortunately, the community has a related patch that supports bulk overwrite, #8076; you can try it.
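A minimal sketch of the truncate-then-bulk-insert workaround described above, for the case where the whole table is being reloaded. The two `set` options are the ones used later in this thread. The `truncate table` step is an assumption about your Hudi version's Spark SQL support, and it removes all existing data, so try it on a test table first.

```sql
-- Assumption: the whole table is being reloaded, so all existing data can go.
-- TRUNCATE makes the following non-idempotent insert safe to re-run.
truncate table lineitem_kp_mor;

-- Route 'insert into' through the bulk insert path (options quoted in this thread).
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;

insert into lineitem_kp_mor
select l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity,
       l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus,
       l_shipdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment,
       l_commitdate
from tpch.sf100.lineitem;
```

If only some partitions are being replaced, the truncate step would need to be limited to those partitions instead of the whole table.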

HuangFru commented 1 year ago

Insert overwrite still handles small files. You can use bulk insert to avoid that and speed things up, but it is only supported for insert into, which is not an idempotent operation, so you need to truncate the partitions you want to insert into first. Fortunately, the community has a related patch that supports bulk overwrite, #8076; you can try it.

If the table is empty, will the 'insert into' using 'bulk insert' have better performance than 'insert overwrite'?

KnightChess commented 1 year ago

@HuangFru yes, much better

HuangFru commented 1 year ago

@HuangFru yes, much better

Thanks for your answer. I'll try.

HuangFru commented 1 year ago

Insert overwrite still handles small files. You can use bulk insert to avoid that and speed things up, but it is only supported for insert into, which is not an idempotent operation, so you need to truncate the partitions you want to insert into first. Fortunately, the community has a related patch that supports bulk overwrite, #8076; you can try it.

'bulk insert' works fine. As I understand it, a MOR table should write faster than a COW table, but the COW table took 450s to finish this and the same table as MOR took 500s. Why? In addition, 450s is much worse than Iceberg's 230s. Are there some configurations I didn't set?

KnightChess commented 1 year ago

@HuangFru

  1. If the primaryKey does not exist in the table, it will generally add Parquet first; you can confirm this.
  2. Can you confirm your parallelism? Or can you provide the Stages tab of the Spark UI for both Iceberg and Hudi?

HuangFru commented 1 year ago

@KnightChess

  1. I'm not sure what 'primaryKey not exist in the table' means; the data is intact. And what does 'add parquet first' mean?
  2. Hudi SQL (only the table name is changed):

    
    create table lineitem_kp_mor (
        l_orderkey bigint,
        l_partkey bigint,
        l_suppkey bigint,
        l_linenumber int,
        l_quantity double,
        l_extendedprice double,
        l_discount double,
        l_tax double,
        l_returnflag string,
        l_linestatus string,
        l_shipdate date,
        l_receiptdate date,
        l_shipinstruct string,
        l_shipmode string,
        l_comment string,
        l_commitdate date
    ) USING hudi tblproperties (
        type = 'mor',
        primaryKey = 'l_orderkey,l_linenumber',
        hoodie.memory.merge.max.size = '2004857600000',
        hoodie.write.markers.type = 'DIRECT',
        hoodie.sql.insert.mode = 'non-strict'
    ) PARTITIONED BY (l_commitdate);

set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;
set hoodie.bulkinsert.shuffle.parallelism=200;

insert into lineitem_kp_mor select l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment,l_commitdate from spark_catalog.test_origin.lineitem_sf100 order by l_commitdate;


![image](https://user-images.githubusercontent.com/68625618/230532528-6397f571-9fec-4d93-a333-3dbe096298bb.png)
![image](https://user-images.githubusercontent.com/68625618/230532844-070dc110-71c6-452d-82fc-27922362e149.png)
![image](https://user-images.githubusercontent.com/68625618/230532879-aa2d7f11-727b-4ef4-a674-85dd64eb631a.png)
![image](https://user-images.githubusercontent.com/68625618/230533072-e761e244-3e4e-477a-9777-6c277dd81040.png)

Iceberg:
![image](https://user-images.githubusercontent.com/68625618/230533780-35eea907-09d7-444d-9670-a97cf3fbf624.png)
![image](https://user-images.githubusercontent.com/68625618/230533801-c87ec10f-69f5-4062-a09f-61b23c55e20d.png)
![image](https://user-images.githubusercontent.com/68625618/230533837-68fd69ca-c1f4-41e4-a865-daa22f6586c2.png)
![image](https://user-images.githubusercontent.com/68625618/230533936-737d7069-1db8-483f-80b9-54724ac83b8d.png)

KnightChess commented 1 year ago

  1. You can try the following steps and check the data files after each one (assume the primary key is id):
     first: insert one record: id = 1
     second: insert one record: id = 2
     third: insert one record: id = 1
     fourth: insert two records: id = 1 and id = 3

KnightChess commented 1 year ago

  1. You can try setting hoodie.populate.meta.fields to false, so that meta fields are not populated in the base files.

KnightChess commented 1 year ago

@HuangFru Does the above configuration solve your problem? I found an article describing the benchmark load; you can refer to it: https://www.onehouse.ai/blog/apache-hudi-vs-delta-lake-transparent-tpc-ds-lakehouse-performance-benchmarks
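The four-step experiment suggested earlier (insert id = 1, then 2, then 1 again, then 1 and 3) could be run roughly as follows. This is a sketch only: the table name `hudi_pk_probe` and the columns `name` and `ts` are hypothetical and chosen for illustration. After each statement, list the table's storage directory to see whether a new base (Parquet) file or a log file appeared.

```sql
-- Hypothetical scratch table; only the primary key 'id' comes from the suggestion.
create table hudi_pk_probe (
    id int,
    name string,
    ts bigint
) using hudi tblproperties (
    type = 'mor',
    primaryKey = 'id',
    preCombineField = 'ts'
);

-- Step 1: a key that does not exist yet
insert into hudi_pk_probe values (1, 'a', 1);
-- Step 2: another new key
insert into hudi_pk_probe values (2, 'b', 2);
-- Step 3: a key that already exists
insert into hudi_pk_probe values (1, 'a2', 3);
-- Step 4: one existing key and one new key in the same write
insert into hudi_pk_probe values (1, 'a3', 4), (3, 'c', 4);
```

Comparing the files written in steps 1 and 2 with those written in steps 3 and 4 shows how new keys and existing keys are handled differently.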

HuangFru commented 1 year ago

@KnightChess Sorry, I've been a little busy recently and haven't pushed this forward. I'll try it when I'm free.

ad1happy2go commented 1 year ago

@HuangFru Did you get a chance to try the above?

HuangFru commented 1 year ago

@HuangFru Did you get a chance to try the above?

Not yet. It may be a long time until my next performance test (or I may forget about it). You can close this issue for now. When I run a test next time, I will reopen this issue and give feedback.

nandubatchu commented 1 year ago

Hey @ad1happy2go - I too observed similar OOM behaviour when using insert_overwrite_table on a reasonably large table (200 GB). It works fine with very small tables.

ad1happy2go commented 1 year ago

@nandubatchu What executor memory were you using?