apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Hudi upsert takes more time than merging using spark sql #9329

Open chandu-1101 opened 1 year ago

chandu-1101 commented 1 year ago

Spark version: 3.3

Issue:

  1. I have a 39 GB Parquet file on S3 (Snappy-compressed) which is ingested into Apache Hudi.
  2. I have 147 GB of JSON files on S3 representing CDC data from MongoDB.
  3. Each row in the JSON files is ~5-6 KB (kilobytes).
  4. When I try to merge them using Spark SQL, the job completes in about 9,000 vCore-seconds. (This runs every day in our production; the SQL runs on an 8-10 node EMR cluster of m5.2xlarge instances, 8 cores / 32 GB RAM each, with ~8-10 executors spun up on average.) The same job, when run with Apache Hudi, takes approximately 22,000-30,000 vCore-seconds.

I remember reading that Hudi is built around LSM-tree ideas. The way I understand Hudi's copy-on-write (COW) is:

  1. Hudi collects a batch of updates aimed at a Parquet file, then rewrites that old Parquet file into a new one with the updates applied.
  2. Essentially this mechanism, in my view, should be much faster than the Spark SQL merge; however, that is not what I observe. (A rough sketch of this understanding follows the list.)
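
For illustration only, here is a minimal Spark sketch of that copy-on-write understanding. It is not Hudi's implementation; copyOnWriteMerge is a hypothetical helper and assumes a flat key column:

    import org.apache.spark.sql.DataFrame

    // Conceptual copy-on-write: rewrite a file's contents with the incoming updates applied.
    def copyOnWriteMerge(existing: DataFrame, updates: DataFrame, keyCol: String): DataFrame = {
      // rows whose key has no incoming update are copied over unchanged
      val untouched = existing.join(updates.select(keyCol), Seq(keyCol), "left_anti")
      // updated (and brand-new) rows replace the old versions of those keys
      untouched.unionByName(updates)
    }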

One suggestion I got is to partition the data, but that requires a full pipeline change. Is there any way to bring this below the Spark SQL vCore-seconds?

spark shell command

spark-shell --driver-memory 1g --executor-memory 3g --executor-cores 1 --driver-cores 1  --conf spark.executor.heartbeatInterval=600s --conf spark.network.timeout=5000s --conf yarn.resourcemanager.nodemanagers.heartbeat-interval-max-ms=60000 --conf yarn.resourcemanager.nodemanagers.heartbeat-interval-min-ms=60000 --conf yarn.resourcemanager.nodemanagers.heartbeat-interval-ms=60000 --conf yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms=60000 --conf yarn.app.mapreduce.am.hard-kill-timeout-ms=600000 --conf yarn.nodemanager.health-checker.timeout-ms=72000000 --conf yarn.nodemanager.health-checker.interval-ms=36000000 --conf yarn.resourcemanager.application-timeouts.monitor.interval-ms=180000 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar" --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.coalescePartitions.enabled=true --conf spark.sql.adaptive.coalescePartitions.minPartitionNum=1 --conf spark.sql.adaptive.advisoryPartitionSizeInBytes="128MB" --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED --conf spark.yarn.maxAppAttempts=1 --conf spark.yarn.maxAppAttempts=1   --conf spark.yarn.submit.waitAppCompletion=false  --conf "spark.executor.extraJavaOptions=-Dlog4j.debug=false -XX:-PrintGC -XX:-PrintGCDetails   -Dconfig.appname=ravic  -Dconfig=s3://bucket/configs/ravic.json -Dconfig.properties=s3://bucket/configs/properties.json" --conf "spark.driver.extraJavaOptions=-Dlog4j.debug=false -XX:-PrintGC -XX:-PrintGCDetails   -Dconfig.appname=ravic -Dconfig=s3://bucket/configs/ravic.json -Dconfig.properties=s3://bucket/configs/properties.json" --files /home/hadoop/jars/log4j2.properties --conf "spark.yarn.appMasterEnv.configs=s3://bucket/configs/ravic.json" --conf "spark.hadoop.parquet.avro.write-old-list-structure=false" --conf "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2" --conf spark.memory.fraction=0.8 --name ravic  --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1  --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar

hudi code to merge

    import org.apache.commons.lang3.ClassUtils.getCanonicalName
    import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
    import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
    import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieFileFormat, WriteOperationType}
    import org.apache.hudi.common.table.HoodieTableConfig
    import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
    import org.apache.hudi.keygen.constant.KeyGeneratorOptions

    import java.util
    import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
    import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.{col, hash, lit}
    import org.apache.hudi.QuickstartUtils._

    val sess = Application.spark();
    /* get snapshot df; cdc df  */
      val snapshotDf = sess.read.parquet("s3://bucket/snapshots-test/ge11/_bid_9223370348396273945__mongoToParq/")
      val cdcSchema1 = SparkUtils.getSchema("s3://bucket/schemas/ge11.json")
      val cdcDf = sess.read.schema(snapshotDf.schema).json("s3://bucket/inputs-test/ge11/23-07-26/*")

    cdcDf.createOrReplaceTempView("cdc")
    val _cdcDf =sess.sql("select * from cdc where _id.oid is not null and _id.oid !=''  ")
    _cdcDf.createOrReplaceTempView("_cdc");
    _cdcDf.write.format("hudi")
      .options(getQuickstartWriteConfigs)
      .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "cdc_pk")
      .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_id.oid")
      .option(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.UPSERT.name() )
      .option(TBL_NAME.key(), "GE11")
      .mode(SaveMode.Append)
      .save("s3://buket/snapshots-hudi/ge11/snapshot");

SQL used to merge the files

with cdcTable as
(
    select cdc_pk,
      cdc_oid,
      @{getCols('s3://bucket/schemas/[dbname]-[collection].json' ) }
      from input where cdc_pk in (select max(cdc_pk) from input group by cdc_oid)
),
snapshotTable as
(
    select cdc_pk,
      cdc_oid,
      @{getCols('s3://bucket/schemas/[dbname]-[collection].json') }
      from snapshot where _id.oid not in (select cdc_oid from input where cdc_oid is not null)
),
resultTable as
(
    select * from snapshotTable
    union all
    select * from cdcTable
)
select * from resultTable 

getCols --> reads the given schema file and expands into the full list of columns, one by one.
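
For comparison, a rough Spark (Scala) equivalent of this merge using a window function instead of the IN-subqueries. It assumes, as the SQL implies, that both sides share the snapshot schema and carry the cdc_pk / cdc_oid columns:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    // keep only the latest CDC row per document id (cdc_oid), "latest" meaning highest cdc_pk
    val latestCdc = cdcDf
      .withColumn("rn", row_number().over(Window.partitionBy("cdc_oid").orderBy(col("cdc_pk").desc)))
      .filter(col("rn") === 1)
      .drop("rn")

    // snapshot rows whose key does not appear in the CDC batch are kept as-is
    val cdcKeys = latestCdc.select(col("cdc_oid"))
    val untouchedSnapshot = snapshotDf.join(cdcKeys, snapshotDf.col("_id.oid") === cdcKeys.col("cdc_oid"), "left_anti")

    // merged result: untouched snapshot rows plus the latest CDC rows
    val merged = untouchedSnapshot.unionByName(latestCdc)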

ad1happy2go commented 1 year ago

During the conversation with @chandu-1101, it was discussed that certain types of workloads can be slow on lakehouse technologies like Apache Hudi. The specific reason for the potential slowdown here is the nature of upsert operations: after the index lookup phase, the incoming records still need to be joined against a significant number of existing records.

chandu-1101 commented 1 year ago

Hi,

After painstakingly:

  1. Getting the partition data (by created date) in JSON,
  2. Getting the Parquet snapshot partitioned by created date,
  3. Creating the Hudi table on S3 from step 2, partitioned by created date -- this fails.

I am not sure what could be my next step!

I tried executor memory of 3 GB, 6 GB, and 8 GB, with only 1 executor running per node, executor cores = 1, and nodes with 4 cores / 16 GB RAM. The task keeps failing after writing for 30-40 minutes.

Hudi version: 0.13.1 (--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1)

Spark: 3.3.0

EMR: 6.9.0

spark shell code

    val snapshotDf = sess.read.parquet("s3://bucket/snapshots2/ge11-partitioned/")
    snapshotDf.write.format("hudi")
      .options(getQuickstartWriteConfigs)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "__created_date_")
      .option(HoodieWriteConfig.TABLE_NAME, "GE11")
      .mode(SaveMode.Overwrite)
      .save("s3://partitioned/snapshots2/ge11-hudi/");

spark shell command

spark-shell --driver-memory 1g --executor-memory 3g --executor-cores 1 --driver-cores 1  --conf spark.executor.heartbeatInterval=600s --conf spark.network.timeout=50000s --conf yarn.resourcemanager.nodemanagers.heartbeat-interval-max-ms=600000 --conf yarn.resourcemanager.nodemanagers.heartbeat-interval-min-ms=600000 --conf yarn.resourcemanager.nodemanagers.heartbeat-interval-ms=60000 --conf yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms=60000 --conf yarn.app.mapreduce.am.hard-kill-timeout-ms=6000000 --conf yarn.nodemanager.health-checker.timeout-ms=720000000 --conf yarn.nodemanager.health-checker.interval-ms=360000000 --conf yarn.resourcemanager.application-timeouts.monitor.interval-ms=180000 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar" --conf spark.sql.adaptive.enabled=true --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED --conf spark.yarn.maxAppAttempts=1 --conf spark.yarn.maxAppAttempts=1   --conf spark.yarn.submit.waitAppCompletion=false  --conf "spark.executor.extraJavaOptions=-Dlog4j.debug=false -XX:-PrintGC -XX:-PrintGCDetails   -Dconfig.appname=ravic  -Dconfig=s3://bucket/configs/ravic.json -Dconfig.properties=s3://bucket/configs/properties.json" --conf "spark.driver.extraJavaOptions=-Dlog4j.debug=false -XX:-PrintGC -XX:-PrintGCDetails   -Dconfig.appname=ravic -Dconfig=s3://bucket/configs/ravic.json -Dconfig.properties=s3://bucket/configs/properties.json" --files /home/hadoop/jars/log4j2.properties --conf "spark.yarn.appMasterEnv.configs=s3://bucket/configs/ravic.json" --conf "spark.hadoop.parquet.avro.write-old-list-structure=false" --conf "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2" --name ravic  --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1  --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar

Ganglia snapshot

[Ganglia metrics screenshot]

exception


23/08/14 13:36:05 WARN TaskSetManager: Lost task 1.3 in stage 3.0 (TID 970) (ip-172-25-26-247.prod.phenom.local executor 14): ExecutorLostFailure (executor 14 exited caused by one of the running tasks) Reason: Container from a bad node: container_1692006436772_0006_01_000017 on host: ip-172-25-26-247.prod.phenom.local. Exit status: 137. Diagnostics: [2023-08-14 13:36:05.150]Container killed on request. Exit code is 137
[2023-08-14 13:36:05.150]Container exited with a non-zero exit code 137.
[2023-08-14 13:36:05.150]Killed by external signal
.
23/08/14 13:36:05 ERROR TaskSetManager: Task 1 in stage 3.0 failed 4 times; aborting job
org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20230814132104020
  at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:75)
  at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
  at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:107)
  at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:96)
  at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:140)
  at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:214)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:372)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
  ... 49 elided
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage 3.0 (TID 970) (ip-172-25-26-247.prod.phenom.local executor 14): ExecutorLostFailure (executor 14 exited caused by one of the running tasks) Reason: Container from a bad node: container_1692006436772_0006_01_000017 on host: ip-172-25-26-247.prod.phenom.local. Exit status: 137. Diagnostics: [2023-08-14 13:36:05.150]Container killed on request. Exit code is 137
[2023-08-14 13:36:05.150]Container exited with a non-zero exit code 137.
[2023-08-14 13:36:05.150]Killed by external signal
.
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1239)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1239)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1239)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3051)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2229)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294)
  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
  at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
  at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367)
  at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
  at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105)
  at org.apache.hudi.index.bloom.HoodieBloomIndex.lookupIndex(HoodieBloomIndex.java:121)
  at org.apache.hudi.index.bloom.HoodieBloomIndex.tagLocation(HoodieBloomIndex.java:90)
  at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:55)
  at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:37)
  at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
  ... 91 more
chandu-1101 commented 1 year ago

Any update?

ad1happy2go commented 1 year ago

@chandu-1101 Try with these configs: executor-cores=3, executor-memory=14g, and also increase driver-memory.

Can you post a Spark UI screenshot and one of the executor logs?
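
For concreteness, those suggestions could look like this on the spark-shell command line; the driver-memory and memoryOverhead values are additional assumptions:

spark-shell --driver-memory 4g --executor-memory 14g --executor-cores 3 --conf spark.executor.memoryOverhead=2g ... (remaining options as in the original command)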

ad1happy2go commented 1 year ago

@chandu-1101 Were you able to try out this?