apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Time taken for upserting hudi table is increasing with increase in number of partitions #1552

Closed. harshi2506 closed this issue 3 years ago.

harshi2506 commented 4 years ago

Hi, I am using DataSourceWriter for HUDI compaction. I populated a 30 GB table with around 1 billion rows. It created around 6000 partitions, with file sizes ranging from 2 MB to 100 MB. I tried upserting 1 GB of data; it touched around 256 partitions and rewrote about 1/20th of the records, but it took almost an hour and a half. If I take out partitions and load everything into the default partition, it takes 14 minutes, even though it touches almost 1/5th of the data. I attached screenshots of the jobs and the hudi commits. I noticed a big time gap between job 7 and job 8: the last job is submitted after 30 minutes. Can I know why there is such a big time difference, and am I missing something?

[Screenshots attached: Spark jobs and Hudi commits]
lamberken commented 4 years ago

Hi @harshi2506, please share the Spark stage UI, thanks

harshi2506 commented 4 years ago

Hi @lamber-ken, thanks for the response, attaching the screenshots.

[Screenshots attached: Spark stage UI]
lamberken commented 4 years ago

From the detailed commit metadata and the Spark UI above, we know:

  1. The first commit wrote about 700 million records.
  2. The upsert wrote 2633 records and touched 255 partitions.

Key point: a small input is upserted into large existing data files, and it touches many partitions.

This is a different case, so the hoodie.memory.merge.max.size option will not help; I think most of the time goes into reading the old files. WDYT? @vinothchandar

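To see where the detailed commit metadata above comes from, here is a minimal spark-shell sketch; the base path is hypothetical, and it assumes the usual layout of Hudi's .commit JSON files, where partitionToWriteStats records the partitions and files each commit rewrote.

// minimal sketch: the base path below is hypothetical, adjust to the actual table location
val basePath = "s3://my-bucket/my-hudi-table"

// each completed commit is a pretty-printed JSON file under .hoodie/; multiLine handles that
val commits = spark.read.option("multiLine", "true").json(basePath + "/.hoodie/*.commit")

// expect a partitionToWriteStats struct keyed by partition path, with per-file write stats
commits.printSchema()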

lamberken commented 4 years ago

hi @harshi2506, based on the above analysis, please try to increase the upsert parallelism (hoodie.upsert.shuffle.parallelism) and the number of Spark executor instances, for example:

export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
  --master yarn \
  --driver-memory 6G \
  --num-executors 10 \
  --executor-cores 5 \
  --packages org.apache.hudi:hudi-spark-bundle:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

import org.apache.spark.sql.functions._

val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_tablen"

val hudiOptions = Map[String,String](
  "hoodie.upsert.shuffle.parallelism" -> "200",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.partitionpath.field" -> "key", 
  "hoodie.table.name" -> tableName,
  "hoodie.datasource.write.precombine.field" -> "timestamp",
  "hoodie.memory.merge.max.size" -> "2004857600000"
)

val inputDF = spark.range(1, 300).
   withColumn("key", $"id").
   withColumn("data", lit("data")).
   withColumn("timestamp", current_timestamp()).
   withColumn("dt", date_format($"timestamp", "yyyy-MM-dd"))

inputDF.write.format("org.apache.hudi").
  options(hudiOptions).
  mode("Append").
  save(basePath)

spark.read.format("org.apache.hudi").load(basePath + "/*/*").show();
vinothchandar commented 4 years ago

@harshi2506 the pause is weird.. As you can see, the wall clock time itself is small, i.e. if you subtract the time lost pausing..

Is this reproducible, i.e. does it happen every time? What version of hudi are you running?

harshi2506 commented 4 years ago

@vinothchandar yes, it is happening every time; I tried it 3 times and it's always the same. I see a hoodie commit being added within at most 5-6 minutes, and then it takes almost an hour to complete the EMR step. I am using AWS EMR, HUDI version 0.5.0-incubating.

harshi2506 commented 4 years ago

hi @lamber-ken, I am already setting parallelism to 200:

val hudiOptions = Map(
  HoodieWriteConfig.TABLE_NAME -> "table_name",
  DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "primaryKey",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "dedupeKey",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition_key", // date in format yyyy/mm/dd
  "hoodie.upsert.shuffle.parallelism" -> "200",
  DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL
)

inputDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save(load_path)

I will try setting the spark executor instances. Thanks

vinothchandar commented 4 years ago

@harshi2506 I am suspecting this may be due to a recent bug we fixed on master (still not 100%). Are you open to building hudi off master branch and giving that a shot? I am suspecting #1394

lamberken commented 4 years ago

User report: upsert hoodie log

Started at 20/04/22 20:12:14 

20/04/22 20:15:30 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE from <storgaeLocation>
20/04/22 20:15:30 INFO HoodieTableMetaClient: Loading Active commit timeline for <storageLocation>
20/04/22 20:15:30 INFO HoodieActiveTimeline: Loaded instants java.util.stream.ReferencePipeline$Head@4af81941
20/04/22 20:15:30 INFO HoodieCommitArchiveLog: No Instants to archive
20/04/22 20:15:30 INFO HoodieWriteClient: Auto cleaning is enabled. Running cleaner now
20/04/22 20:15:30 INFO HoodieWriteClient: Cleaner started
20/04/22 20:15:30 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from <storageLocation>
20/04/22 20:15:30 INFO FSUtils: Hadoop Configuration: fs.defaultFS:hdfs, Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, emrfs-site.xml, __spark_hadoop_conf__.xml, file:/etc/spark/conf.dist/hive-site.xml], FileSystem: [com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem@3c0cfaad]
20/04/22 20:15:31 INFO HoodieTableConfig: Loading dataset properties from <storageLocation>.hoodie/hoodie.properties
20/04/22 20:15:31 INFO S3NativeFileSystem: Opening <storageLocation>.hoodie/hoodie.properties' for reading
20/04/22 20:15:31 WARN S3CryptoModuleAE: Unable to detect encryption information for object '<storageLocation>.hoodie/hoodie.properties' in bucket 'delta-data-devo'. Returning object without decryption.
20/04/22 20:15:31 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE from <storageLocation>
20/04/22 20:15:31 INFO HoodieTableMetaClient: Loading Active commit timeline for <storageLocation>
20/04/22 20:15:31 INFO HoodieActiveTimeline: Loaded instants java.util.stream.ReferencePipeline$Head@72dd302
20/04/22 20:15:31 INFO FileSystemViewManager: Creating View Manager with storage type :MEMORY
20/04/22 20:15:31 INFO FileSystemViewManager: Creating in-memory based Table View
20/04/22 20:15:31 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from <storageLocation>
20/04/22 20:15:31 INFO FSUtils: Hadoop Configuration: fs.defaultFS: hdfs, Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, emrfs-site.xml, __spark_hadoop_conf__.xml, file:/etc/spark/conf.dist/hive-site.xml], FileSystem: [com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem@3c0cfaad]
20/04/22 20:15:31 INFO HoodieTableConfig: Loading dataset properties from <storageLocation>.hoodie/hoodie.properties
20/04/22 20:15:31 INFO S3NativeFileSystem: Opening '<storageLocation>.hoodie/hoodie.properties' for reading
20/04/22 20:15:31 WARN S3CryptoModuleAE: Unable to detect encryption information for object '<storageLocation>.hoodie/hoodie.properties' in bucket 'delta-data-devo'. Returning object without decryption.
20/04/22 20:15:31 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE from <storageLocation>
20/04/22 20:15:31 INFO HoodieTableMetaClient: Loading Active commit timeline for <storageLocation>
20/04/22 20:15:31 INFO HoodieActiveTimeline: Loaded instants java.util.stream.ReferencePipeline$Head@678852b5
20/04/22 20:24:33 INFO HoodieCopyOnWriteTable: Partitions to clean up : [2001/05/02, 2001/05/07, 2001/05/09, 2001/05/10, 2001/05/17, 2001/05/18, 2001/05/21, 2001/06/01, 2001/06/04, 2001/06/08, 2001/06/20, 2001/06/21, 2001/07/17, 2001/07/23, 2001/07/25, 2001/07/30, 2001/08/02, 2001/08/03, 2001/08/07, 2001/08/08, 2001/08/09, 2001/08/14, 2001/08/23, 2001/09/05, 2001/09/06, 2001/09/07, 2001/09/13, 2001/09/14, 2001/10/02, 2001/10/03, 2001/10/04, 2001/10/09, 2001/11/01, 2001/11/09, 2001/11/14, 2001/11/15, 2001/11/16, 2001/11/19, 2001/11/20, 2001/11/21, 2001/11/27, 2001/11/28, 2001/11/29, 2001/11/30, 2001/12/03, 2001/12/07, 2001/12/10, 2001/12/11, 2001/12/12, 2001/12/13, 2001/12/17, 2001/12/20, 2001/12/21, 2001/12/25, 2001/12/26, 2001/12/27, 2001/12/28, 2001/12/29, 2001/12/31, 2002/01/02, 2002/01/03, 2002/01/07, 2002/01/08, 2002/01/09, 2002/01/11, 2002/01/13, 2002/01/14, 2002/01/15, 2002/01/16, 2002/01/17, 2002/01/18, 2002/01/21, 2002/01/22, 2002/01/23, 2002/01/25, 2002/01/28, 2002/01/29, 2002/01/30, 2002/02/03, 2002/02/05, 2002/02/06, 2002/02/07, 2002/02/11, 2002/02/12, 2002/02/14, 2002/02/15, 2002/02/18, 2002/02/19, 2002/02/20, 2002/02/21, 2002/02/22, 2002/02/26, 2002/03/02, 2002/03/04, 2002/03/06, 2002/03/10, 2002/03/15, 2002/03/17, 2002/03/19, 2002/03/20, 2002/03/21, 2002/03/22, 2002/03/25, 2002/03/26, 2002/03/27, 2002/03/28, 2002/03/30, 2002/04/02, 2002/04/03, 2002/04/04, 2002/04/05, 2002/04/07, 2002/04/09, 2002/04/10, 2002/04/11, 2002/04/14, 2002/04/16, 2002/04/17, 2002/04/22, 2002/04/23, 2002/04/25, 2002/04/30, 2002/05/01, 2002/05/02, 2002/05/06, 2002/05/08, 2002/05/09, 2002/05/12, 2002/05/13, 2002/05/14, 2002/05/17, 2002/05/19, 2002/05/20, 2002/05/21, 2002/05/22, 2002/05/24, 2002/05/27, 2002/05/28, 2002/05/29, 2002/06/03, 2002/06/04, 2002/06/06, 2002/06/10, 2002/06/11, 2002/06/12, 2002/06/14, 2002/06/18, 2002/06/19, 2002/06/24, 2002/07/01, 2002/07/02, 2002/07/04, 2002/07/17, 2002/07/21, 2002/07/31, 2002/08/01, 2002/08/02, 2002/08/07, 2002/08/21, 2002/08/26, 2002/08/27, 2002/08/28, 2002/08/29, 2002/09/03, 2002/09/05, 2002/09/06, 2002/09/09, 2002/09/10, 2002/09/11, 2002/09/12, 2002/09/13, 2002/09/16, 2002/09/17, 2002/09/18, 2002/09/19, 2002/09/23, 2002/09/24, 2002/09/25, 2002/09/28, 2002/09/30, 2002/10/07, 2002/10/08, 2002/10/09, 2002/10/10, 2002/10/11, 2002/10/14, 2002/10/15, 2002/10/16, 2002/10/17, 2002/10/21, 2002/10/23, 2002/10/25, 2002/10/28, 2002/10/29, 2002/10/30, 2002/10/31, 2002/11/07, 2002/11/11, 2002/11/12, 2002/11/13, 2002/11/14, 2002/11/15, 2002/11/18, 2002/11/19, 2002/11/20, 2002/11/21, 2002/11/22, 2002/11/25, 2002/11/26, 2002/11/27, 2002/12/02, 2002/12/04, 2002/12/05, 2002/12/06, 2002/12/09, 2002/12/10, 2002/12/11, 2002/12/12, 2002/12/13, 2002/12/16, 2002/12/17, 2002/12/18, 2002/12/19, 2002/12/20, 2002/12/23, 2002/12/24, 2002/12/25, 2002/12/30, 2002/12/31, 2003/01/03, 2003/01/04, 2003/01/08, 2003/01/09, 2003/01/10, 2003/01/13, 2003/01/14, 2003/01/15, 2003/01/16, 2003/01/17, 2003/01/20, 2003/01/21, 2003/01/22, 2003/01/23, 2003/01/24, 2003/01/27, 2003/01/28, 2003/01/29, 2003/01/30, 2003/01/31, 2003/02/03, 2003/02/06, 2003/02/07, 2003/02/10, 2003/02/11, 2003/02/12, 2003/02/13, 2003/02/14, 2003/02/17, 2003/02/18, 2003/02/19, 2003/02/24, 2003/02/25, 2003/02/26, 2003/02/27, 2003/02/28, 2003/03/04, 2003/03/05, 2003/03/06, 2003/03/07, 2003/03/10, 2003/03/11, 2003/03/12, 2003/03/13, 2003/03/14, 2003/03/17, 2003/03/18, 2003/03/19, 2003/03/24, 2003/03/25, 2003/03/26, 2003/03/27, 2003/03/31, 2003/04/01, 2003/04/02, 2003/04/05, 2003/04/07, 2003/04/08, 2003/04/09, 2003/04/10, 2003/04/11, 
2003/04/14, 2003/04/15, 2003/04/16, 2003/04/17, 2003/04/18, 2003/04/21, 2003/04/22, 2003/04/23, 2003/04/24, 2003/04/25, 2003/04/28, 2003/04/29, 2003/04/30, 2003/05/02, 2003/05/05, 2003/05/06, 2003/05/07, 2003/05/12, 2003/05/15, 2003/05/19, 2003/05/20, 2003/05/21, 2003/05/27, 2003/05/28, 2003/05/29, 2003/05/30, 2003/06/02, 2003/06/03, 2003/06/04, 2003/06/05, 2003/06/10, 2003/06/11, 2003/06/18, 2003/06/19, 2003/06/23, 2003/06/24, 2003/06/25, 2003/06/27, 2003/07/01, 2003/07/02, 2003/07/05, 2003/07/07, 2003/07/08, 2003/07/09, 2003/07/10, 2003/07/11, 2003/07/14, 2003/07/15, 2003/07/16, 2003/07/17, 2003/07/18, 2003/07/21, 2003/07/22, 2003/07/23, 2003/07/24, 2003/07/28, 2003/07/29, 2003/07/30, 2003/08/01, 2003/08/05, 2003/08/06, 2003/08/07, 2003/08/11, 2003/08/12, 2003/08/13, 2003/08/15, 2003/08/18, 2003/08/19, 2003/08/20, 2003/08/21, 2003/08/22, 2003/08/26, 2003/08/27, 2003/08/28, 2003/08/29, 2003/09/02, 2003/09/03, 2003/09/04, 2003/09/08, 2003/09/09, 2003/09/10, 2003/09/12, 2003/09/15, 2003/09/16, 2003/09/17, 2003/09/19, 2003/09/22, 2003/09/23, 2003/09/24, 2003/09/25, 2003/09/26, 2003/09/29, 2003/09/30, 2003/10/01, 2003/10/02, 2003/10/08, 2003/10/13, 2003/10/14, 2003/10/15, 2003/10/16, 2003/10/17, 2003/10/19, 2003/10/20, 2003/10/21, 2003/10/22, 2003/10/23, 2003/10/29, 2003/10/30, 2003/10/31, 2003/11/03, 2003/11/04, 2003/11/05, 2003/11/06, 2003/11/07, 2003/11/10, 2003/11/11, 2003/11/12, 2003/11/17, 2003/11/18, 2003/11/19, 2003/11/20, 2003/11/24, 2003/11/25, 2003/11/26, 2003/12/01, 2003/12/02, 2003/12/03, 2003/12/04, 2003/12/05, 2003/12/08, 2003/12/09, 2003/12/10, 2003/12/11, 2003/12/14, 2003/12/15, 2003/12/17, 2003/12/18, 2003/12/19, 2003/12/20, 2003/12/21, 2003/12/22, 2003/12/23, 2003/12/24, 2003/12/25, 2003/12/26, 2003/12/28, 2003/12/30, 2003/12/31, 2004/01/02, 2004/01/03, 2004/01/07, 2004/01/09, 2004/01/12, 2004/01/14, 2004/01/16, 2004/01/19, 2004/01/20, 2004/01/21, 2004/01/23, 2004/01/26, 2004/01/27, 2004/01/28, 2004/01/29, 2004/02/01, 2004/02/02, 2004/02/03, 2004/02/04, 2004/02/06, 2004/02/09, 2004/02/10, 2004/02/11, 2004/02/12, 2004/02/16, 2004/02/18, 2004/02/20, 2004/02/23, 2004/02/24, 2004/02/25, 2004/02/26, 2004/02/27, 2004/03/01, 2004/03/02, 2004/03/03, 2004/03/04, 2004/03/05, 2004/03/08, 2004/03/09, 2004/03/10, 2004/03/11, 2004/03/14, 2004/03/16, 2004/03/17, 2004/03/22, 2004/03/23, 2004/03/24, 2004/03/25, 2004/03/26, 2004/03/28, 2004/03/29, 2004/03/30, 2004/03/31, 2004/04/01, 2004/04/02, 2004/04/03, 2004/04/05, 2004/04/09, 2004/04/12, 2004/04/13, 2004/04/14, 2004/04/16, 2004/04/19, 2004/04/20, 2004/04/21, 2004/04/22, 2004/04/23, 2004/04/26, 2004/04/27, 2004/04/28, 2004/04/29, 2004/04/30, 2004/05/03, 2004/05/04, 2004/05/05, 2004/05/07, 2004/05/11, 2004/05/12, 2004/05/13, 2004/05/14, 2004/05/17, 2004/05/18, 2004/05/20, 2004/05/24, 2004/05/25, 2004/05/26, 2004/05/27, 2004/05/28, 2004/05/29, 2004/05/30, 2004/05/31, 2004/06/02, 2004/06/03, 2004/06/07, 2004/06/08, 2004/06/09, 2004/06/10, 2004/06/11, 2004/06/13, 2004/06/14, 2004/06/15, 2004/06/16, 2004/06/20, 2004/06/21, 2004/06/22, 2004/06/23, 2004/06/24, 2004/06/25, 2004/06/27, 2004/06/28, 2004/06/29, 2004/06/30, 2004/07/06, 2004/07/07, 2004/07/08, 2004/07/09, 2004/07/11, 2004/07/12, 2004/07/14, 2004/07/15, 2004/07/17, 2004/07/21, 2004/07/28, 2004/07/31, 2004/08/02, 2004/08/03, 2004/08/04, 2004/08/05, 2004/08/09, 2004/08/10, 2004/08/12, 2004/08/13, 2004/08/17, 2004/08/19, 2004/08/20, 2004/08/21, 2004/08/22, 2004/08/23, 2004/08/24, 2004/08/25, 2004/08/26, 2004/08/27, 2004/08/28, 2004/08/29, 2004/08/30, 2004/08/31, 2004/09/01, 2004/09/02, 
2004/09/09, 2004/09/10, 2004/09/11, 2004/09/12, 2004/09/13, 2004/09/14, 2004/09/15, 2004/09/16, 2004/09/17, 2004/09/18, 2004/09/19, 2004/09/20, 2004/09/21, 2004/09/22, 2004/09/23, 2004/09/24, 2004/09/27, 2004/09/28, 2004/09/29, 2004/09/30, 2004/10/01, 2004/10/02, 2004/10/03, 2004/10/04, 2004/10/05, 2004/10/06, 2004/10/07, 2004/10/08, 2004/10/09, 2004/10/10, 2004/10/11, 2004/10/12, 2004/10/13, 2004/10/14, 2004/10/15, 2004/10/16, 2004/10/17, 2004/10/18, 2004/10/19, 2004/10/20, 2004/10/21, 2004/10/22, 2004/10/23, 2004/10/24, 2004/10/25, 2004/10/26, 2004/10/27, 2004/10/28, 2004/10/29, 2004/10/30, 2004/10/31, 2004/11/02, 2004/11/03, 2004/11/04, 2004/11/05, 2004/11/06, 2004/11/07, 2004/11/08, 2004/11/09, 2004/11/10, 2004/11/11, 2004/11/12, 2004/11/13, 2004/11/16, 2004/11/17, 2004/11/18, 2004/11/19, 2004/11/20, 2004/11/21, 2004/11/22, 2004/11/23,......] - 6040 partitions

20/04/22 20:24:33 INFO HoodieCopyOnWriteTable: Using cleanerParallelism: 200
20/04/22 20:24:33 INFO SparkContext: Starting job: collect at HoodieCopyOnWriteTable.java:396
20/04/22 20:24:33 INFO DAGScheduler: Registering RDD 58 (repartition at HoodieCopyOnWriteTable.java:392)
20/04/22 20:24:33 INFO DAGScheduler: Registering RDD 62 (mapPartitionsToPair at HoodieCopyOnWriteTable.java:393)
20/04/22 20:24:33 INFO DAGScheduler: Got job 8 (collect at HoodieCopyOnWriteTable.java:396) with 200 output partitions
20/04/22 20:24:33 INFO DAGScheduler: Final stage: ResultStage 29 (collect at HoodieCopyOnWriteTable.java:396)
20/04/22 20:24:33 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 28)
20/04/22 20:24:33 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 28)
20/04/22 20:24:33 INFO DAGScheduler: Submitting ShuffleMapStage 27 (MapPartitionsRDD[58] at repartition at HoodieCopyOnWriteTable.java:392), which has no missing parents
20/04/22 20:24:33 INFO MemoryStore: Block broadcast_17 stored as values in memory (estimated size 174.2 KB, free 1008.1 MB)
20/04/22 20:24:33 INFO MemoryStore: Block broadcast_17_piece0 stored as bytes in memory (estimated size 60.8 KB, free 1008.0 MB)
20/04/22 20:24:33 INFO BlockManagerInfo: Added broadcast_17_piece0 in memory on ip (size: 60.8 KB, free: 1008.7 MB)
20/04/22 20:24:33 INFO SparkContext: Created broadcast 17 from broadcast at DAGScheduler.scala:1201
20/04/22 20:24:33 INFO DAGScheduler: Submitting 200 missing tasks from ShuffleMapStage 27 (MapPartitionsRDD[58] at repartition at HoodieCopyOnWriteTable.java:392) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))

20/04/22 20:24:35 INFO DAGScheduler: ShuffleMapStage 27 (repartition at HoodieCopyOnWriteTable.java:392) finished in 1.401 s
20/04/22 20:24:35 INFO DAGScheduler: looking for newly runnable stages
20/04/22 20:24:35 INFO DAGScheduler: running: Set()
20/04/22 20:24:35 INFO DAGScheduler: waiting: Set(ShuffleMapStage 28, ResultStage 29)
20/04/22 20:24:35 INFO DAGScheduler: failed: Set()
20/04/22 20:24:35 INFO DAGScheduler: Submitting ShuffleMapStage 28 (MapPartitionsRDD[62] at mapPartitionsToPair at HoodieCopyOnWriteTable.java:393), which has no missing parents
20/04/22 20:24:35 INFO MemoryStore: Block broadcast_18 stored as values in memory (estimated size 174.8 KB, free 1007.9 MB)
20/04/22 20:24:35 INFO MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 61.5 KB, free 1007.8 MB)
20/04/22 20:24:35 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on ip (size: 61.5 KB, free: 1008.7 MB)
20/04/22 20:24:35 INFO SparkContext: Created broadcast 18 from broadcast at DAGScheduler.scala:1201
20/04/22 20:24:35 INFO DAGScheduler: Submitting 200 missing tasks from ShuffleMapStage 28 (MapPartitionsRDD[62] at mapPartitionsToPair at HoodieCopyOnWriteTable.java:393) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))

20/04/22 20:24:35 INFO DAGScheduler: ResultStage 29 (collect at HoodieCopyOnWriteTable.java:396) finished in 0.057 s
20/04/22 20:24:35 INFO DAGScheduler: Job 8 finished: collect at HoodieCopyOnWriteTable.java:396, took 1.627846 s
20/04/22 20:24:35 INFO FileSystemViewManager: Creating InMemory based view for basePath <storageLocation>
20/04/22 20:24:35 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from <storageLocation>
20/04/22 20:24:35 INFO FSUtils: Hadoop Configuration: fs.defaultFS: hdfs-ip, Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, emrfs-site.xml, __spark_hadoop_conf__.xml, file:/etc/spark/conf.dist/hive-site.xml], FileSystem: [com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem@3c0cfaad]
20/04/22 20:24:35 INFO HoodieTableConfig: Loading dataset properties from <storageLocation>.hoodie/hoodie.properties
20/04/22 20:24:35 INFO S3NativeFileSystem: Opening '<storageLocation>.hoodie/hoodie.properties' for reading
20/04/22 20:24:35 WARN S3CryptoModuleAE: Unable to detect encryption information for object '<storageLocation>.hoodie/hoodie.properties' in bucket 'delta-data-devo'. Returning object without decryption.
20/04/22 20:24:35 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE from <storageLocation>
20/04/22 20:24:35 INFO HoodieTableMetaClient: Loading Active commit timeline for <storageLocation>
20/04/22 20:24:35 INFO HoodieActiveTimeline: Loaded instants java.util.stream.ReferencePipeline$Head@1b7440c3
20/04/22 20:24:35 INFO HoodieWriteClient: Cleaned 0 files
20/04/22 20:24:36 INFO HoodieActiveTimeline: Marking instant complete [==>20200422201250__clean__INFLIGHT]
20/04/22 20:24:36 INFO CSEMultipartUploadOutputStream: close closed:false <storageLocation>.hoodie/20200422201250.clean.inflight
20/04/22 20:24:36 INFO DefaultMultipartUploadDispatcher: Completed multipart upload of 1 parts 0 bytes
20/04/22 20:24:36 INFO CSEMultipartUploadOutputStream: Finished uploading <storageLocation>.hoodie/20200422201250.clean.inflight. Elapsed seconds: 0.
20/04/22 20:24:36 INFO HoodieActiveTimeline: Created a new file in meta path: <storageLocation>.hoodie/20200422201250.clean.inflight
20/04/22 20:24:36 INFO CSEMultipartUploadOutputStream: close closed:false <storageLocation>.hoodie/20200422201250.clean.inflight
20/04/22 20:24:36 INFO DefaultMultipartUploadDispatcher: Completed multipart upload of 1 parts 272855 bytes
20/04/22 20:24:36 INFO CSEMultipartUploadOutputStream: Finished uploading <storageLocation>.hoodie/20200422201250.clean.inflight. Elapsed seconds: 0.
20/04/22 20:24:36 INFO S3NativeFileSystem: rename <storageLocation>.hoodie/20200422201250.clean.inflight <storageLocation>.hoodie/20200422201250.clean
20/04/22 20:24:36 INFO HoodieActiveTimeline: Completed [==>20200422201250__clean__INFLIGHT]
20/04/22 20:24:36 INFO HoodieWriteClient: Marked clean started on 20200422201250 as complete
20/04/22 20:24:36 INFO HoodieWriteClient: Committed 20200422201250
20/04/22 20:24:36 INFO HoodieSparkSqlWriter$: Commit 20200422201250 successful!
20/04/22 20:24:36 INFO DefaultSource: Constructing hoodie (as parquet) data source with options :Map(hoodie.datasource.write.insert.drop.duplicates -> false, hoodie.datasource.hive_sync.database -> default, hoodie.insert.shuffle.parallelism -> 200, path -> <storageLocation>, hoodie.datasource.write.precombine.field -> dedupeKey, hoodie.datasource.hive_sync.partition_fields -> , hoodie.datasource.write.payload.class -> org.apache.hudi.OverwriteWithLatestAvroPayload, hoodie.datasource.hive_sync.partition_extractor_class -> org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor, hoodie.datasource.write.streaming.retry.interval.ms -> 2000, hoodie.datasource.hive_sync.table -> unknown, hoodie.datasource.write.streaming.ignore.failed.batch -> true, hoodie.datasource.write.operation -> upsert, hoodie.datasource.hive_sync.enable -> false, hoodie.datasource.write.recordkey.field -> customer_trx_id, hoodie.datasource.view.type -> read_optimized, hoodie.table.name -> ra_customer_trx_all, hoodie.datasource.hive_sync.jdbcurl -> jdbc:hive2://localhost:10000, hoodie.datasource.write.storage.type -> COPY_ON_WRITE, hoodie.datasource.hive_sync.username -> hive, hoodie.datasource.write.streaming.retry.count -> 3, hoodie.datasource.hive_sync.password -> hive, hoodie.datasource.write.keygenerator.class -> org.apache.hudi.SimpleKeyGenerator, hoodie.upsert.shuffle.parallelism -> 200, hoodie.datasource.hive_sync.assume_date_partitioning -> false, hoodie.datasource.write.partitionpath.field -> partition_key, hoodie.datasource.write.commitmeta.key.prefix -> _, hoodie.bulkinsert.shuffle.parallelism -> 200)

Finally, it logs loading the hoodie table for each partition, for all 6040 partitions, something similar to:

20/04/22 21:06:20 INFO HoodieTableConfig: Loading dataset properties from <storageLocation>/.hoodie/hoodie.properties
20/04/22 21:06:20 INFO S3NativeFileSystem: Opening '<storageLocation>/.hoodie/hoodie.properties' for reading
20/04/22 21:06:20 WARN S3CryptoModuleAE: Unable to detect encryption information for object '<storageLocation>/.hoodie/hoodie.properties' in bucket 'delta-data-devo'. Returning object without decryption.
20/04/22 21:06:20 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE from <storageLocation>
20/04/22 21:06:20 INFO HoodieActiveTimeline: Loaded instants java.util.stream.ReferencePipeline$Head@b9cfa18
20/04/22 21:06:20 INFO HoodieTableFileSystemView: Adding file-groups for partition :2020/04/20, #FileGroups=1
20/04/22 21:06:20 INFO AbstractTableFileSystemView: addFilesToView: NumFiles=3, FileGroupsCreationTime=160, StoreTimeTaken=0
20/04/22 21:06:20 INFO HoodieROTablePathFilter: Based on hoodie metadata from base path: <storageLocation>, caching 1 files under <storageLocation>/2020/04/20
20/04/22 21:06:25 INFO SparkContext: Invoking stop() from shutdown hook
20/04/22 21:06:25 INFO SparkUI: Stopped Spark web UI at ip
20/04/22 21:06:25 INFO YarnClientSchedulerBackend: Interrupting monitor thread
20/04/22 21:06:25 INFO YarnClientSchedulerBackend: Shutting down all executors
20/04/22 21:06:25 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
20/04/22 21:06:25 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices

It runs till 20/04/22 21:06:25 
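As an aside on that last phase of the log, here is a minimal sketch (paths are hypothetical) of how the glob passed to load() decides how many partitions HoodieROTablePathFilter has to walk: a full-table glob touches all ~6040 partitions, while a narrower glob only lists the partitions it names.

// hypothetical table location
val basePath = "s3://my-bucket/my-hudi-table"

// full-table read: the path filter inspects every yyyy/MM/dd partition under the base path
val fullTable = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*")

// restricted read: only the 2020/04 partitions are listed and filtered
val april2020 = spark.read.format("org.apache.hudi").load(basePath + "/2020/04/*")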
lamberken commented 4 years ago

hi @harshi2506, we need more of the Spark INFO log; you can upload the log file to Google Drive, e.g. https://drive.google.com/file/d/1zzyaySDJqPgAdTSLnKwOG667QGvZhd03

lamberken commented 4 years ago

@harshi2506, I tried to reproduce your steps and may have found the answer.

From the following demo code:

export HADOOP_HOME=/BigData/install/hadoop/hadoop-2.7.7-streamjar
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_HOME=$HADOOP_HOME
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_USER_NAME=dcadmin

export SPARK_HOME=/BigData/install/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
  --master yarn \
  --driver-memory 6G \
  --executor-memory 6G \
  --num-executors 5 \
  --executor-cores 5 \
  --queue root.default \
  --packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

import org.apache.spark.sql.functions._

val tableName = "hudi_mor_table"
val basePath = "viewfs://dcfs/ns-log/wxdprtc/flink/hudi"

val hudiOptions = Map[String,String](
  "hoodie.upsert.shuffle.parallelism" -> "10",
  "hoodie.datasource.write.recordkey.field" -> "key",
  "hoodie.datasource.write.partitionpath.field" -> "dt", 
  "hoodie.table.name" -> tableName,
  "hoodie.datasource.write.precombine.field" -> "timestamp"
)

val inputDF = spark.range(1, 7000).
   withColumn("key", $"id").
   withColumn("data", lit("data")).
   withColumn("timestamp", unix_timestamp()).
   withColumn("dtstamp", unix_timestamp() + ($"id" * 24 * 3600)).
   withColumn("dt", from_unixtime($"dtstamp", "yyyy/MM/dd"))

inputDF.write.format("org.apache.hudi").
  options(hudiOptions).
  mode("Overwrite").
  save(basePath)

spark.read.format("org.apache.hudi").load(basePath + "/*/*/*").count();

// cost long time, about 15min
sc.setLogLevel("INFO")

val inputDF = spark.range(1, 300).
   withColumn("key", $"id").
   withColumn("data", lit("data")).
   withColumn("timestamp", unix_timestamp()).
   withColumn("dtstamp", unix_timestamp() + ($"id" * 24 * 3600)).
   withColumn("dt", from_unixtime($"dtstamp", "yyyy/MM/dd"))

inputDF.write.format("org.apache.hudi").
  options(hudiOptions).
  mode("Append").
  save(basePath)

// cost less time, about 3min
sc.setLogLevel("WARN")

val inputDF = spark.range(1, 300).
   withColumn("key", $"id").
   withColumn("data", lit("data")).
   withColumn("timestamp", unix_timestamp()).
   withColumn("dtstamp", unix_timestamp() + ($"id" * 24 * 3600)).
   withColumn("dt", from_unixtime($"dtstamp", "yyyy/MM/dd"))

inputDF.write.format("org.apache.hudi").
  options(hudiOptions).
  mode("Append").
  save(basePath)
lamberken commented 4 years ago

Spark log analysis brainstorming

hi @vinothchandar, here is a preliminary analysis of this issue (based on the release-0.5.0 branch https://github.com/apache/incubator-hudi/tree/release-0.5.0)

Simplified original Spark log

// Upsert part
Warning: Ignoring non-spark config property: "spark.sql.hive.convertMetastoreParquet=false"
Warning: Ignoring non-spark config property: "spark.serializer=org.apache.spark.serializer.KryoSerializer"
20/04/22 20:12:14 WARN S3CryptoModuleAE: Unable to detect encryption information for object '<JARPATH>' in bucket '<storageBucket>'. Returning object without decryption.
20/04/22 20:12:14 WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist
20/04/22 20:12:14 INFO SparkContext: Running Spark version 2.4.4
20/04/22 20:12:14 INFO SparkContext: Submitted application: HUDI incremental data loading
20/04/22 20:12:15 INFO SecurityManager: Changing view acls to: hadoop
20/04/22 20:12:15 INFO SecurityManager: Changing modify acls to: hadoop
20/04/22 20:12:15 INFO SecurityManager: Changing view acls groups to: 
20/04/22 20:15:31 INFO S3NativeFileSystem: Opening '<storageLocation>.hoodie/hoodie.properties' for reading
20/04/22 20:15:31 WARN S3CryptoModuleAE: Unable to detect encryption information for object '<storgaeLocation>/.hoodie/hoodie.properties' in bucket '<storageBucket>'. Returning object without decryption.
20/04/22 20:15:31 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE from <storageLocation>
20/04/22 20:15:31 INFO HoodieTableMetaClient: Loading Active commit timeline for <storageLocation>
20/04/22 20:15:31 INFO HoodieActiveTimeline: Loaded instants java.util.stream.ReferencePipeline$Head@678852b5
20/04/22 20:24:33 INFO HoodieCopyOnWriteTable: Partitions to clean up : [2001/05/02, 2001/05/07, 2001/05/09, 2001/05/10, 2001/05/17, 2001/05/18, 2001/05/21, 2001/06/01, 2001/06/04, 2001/06/08, 2001/06/20, 2001/06/21, 2001/07/17, 2001/07/23, 2001/07/25, 2001/07/30, 2001/08/02, 2001/08/03, 2001/08/07, 2001/08/08, 2001/08/09, 2001/08/14, 2001/08/23, 2001/09/05, 2001/09/06, 2001/09/07, 2001/09/13, 2001/09/14, 2001/10/02, 2001/10/03, 2001/10/04, 2001/10/09, 2001/11/01, 2001/11/09, 2001/11/14, 2001/11/15, 2001/11/16, 2001/11/19, 2001/11/20, 2001/11/21,
20/04/22 20:24:36 INFO HoodieWriteClient: Marked clean started on 20200422201250 as complete
20/04/22 20:24:36 INFO HoodieWriteClient: Committed 20200422201250
20/04/22 20:24:36 INFO HoodieSparkSqlWriter$: Commit 20200422201250 successful!

// CreateRelation(sqlContext, parameters, df.schema)
20/04/22 20:24:36 INFO DefaultSource: Constructing hoodie (as parquet) data source with options ...

20/04/22 20:24:38 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from <storgaeLocation>
20/04/22 20:24:38 INFO FSUtils: Hadoop Configuration: fs.defaultFS:...], FileSystem: [com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem@3c0cfaad]
...
20/04/22 20:24:38 INFO S3NativeFileSystem: Opening '<storageLocation>2001/05/10/.hoodie_partition_metadata' for reading
20/04/22 20:24:38 WARN S3CryptoModuleAE: Unable to detect encryption information for object '<storgaeLocation>/2001/05/10/.hoodie_partition_metadata' in bucket '<storageBucket>'. Returning object without decryption.

20/04/22 20:24:38 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from <storgaeLocation>
20/04/22 20:24:38 INFO FSUtils: Hadoop Configuration: fs.defaultFS:...], FileSystem: [com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem@3c0cfaad]
...
20/04/22 20:24:39 INFO S3NativeFileSystem: Opening '<storageLocation>2001/05/17/.hoodie_partition_metadata' for reading
20/04/22 20:24:39 WARN S3CryptoModuleAE: Unable to detect encryption information for object '<storgaeLocation>/2001/05/17/.hoodie_partition_metadata' in bucket '<storageBucket>'. Returning object without decryption.

20/04/22 20:24:39 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from <storgaeLocation>
20/04/22 20:24:39 INFO FSUtils: Hadoop Configuration: fs.defaultFS:...], FileSystem: [com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem@3c0cfaad]
...
20/04/22 20:24:39 INFO S3NativeFileSystem: Opening '<storageLocation>2001/05/18/.hoodie_partition_metadata' for reading
20/04/22 20:24:39 WARN S3CryptoModuleAE: Unable to detect encryption information for object '<storgaeLocation>/2001/05/18/.hoodie_partition_metadata' in bucket '<storageBucket>'. Returning object without decryption.

// Finish part
20/04/22 21:06:25 INFO SparkContext: Successfully stopped SparkContext
20/04/22 21:06:25 INFO ShutdownHookManager: Shutdown hook called
20/04/22 21:06:25 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-c24e90ad-7be5-4a04-b4f1-2575cf68bd5a
20/04/22 21:06:25 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-c18b2f4f-e525-43de-b896-75244fe63591

Analysis


Analysis about step 2

Loading about 6000 partitions costs about 9 minutes.

  1. Call stack trace:
    "main" Id=1 cpuUsage=94% RUNNABLE
    ...
    at org.apache.hudi.common.util.FSUtils.processFiles(FSUtils.java:227)
    at org.apache.hudi.common.util.FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.java:191)
    at org.apache.hudi.common.util.FSUtils.getAllPartitionPaths(FSUtils.java)
    at org.apache.hudi.table.HoodieCopyOnWriteTable.clean(HoodieCopyOnWriteTable.java:285)
    at org.apache.hudi.HoodieWriteClient.clean(HoodieWriteClient.java:950)
  2. IMO, step 2 is affected by the NN's RPC performance (a rough timing sketch follows below).
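A rough way to sanity-check item 2 above is this minimal spark-shell sketch; the base path is hypothetical and it assumes the yyyy/MM/dd partition layout of this table. It times a plain driver-side listing of the partition directories, independent of Hudi.

import org.apache.hadoop.fs.Path

val basePath = new Path("s3://my-bucket/my-hudi-table")  // hypothetical table location
val fs = basePath.getFileSystem(spark.sparkContext.hadoopConfiguration)

val start = System.currentTimeMillis()
// walk year/month/day directories, skipping .hoodie and other dot-directories
val partitions = fs.listStatus(basePath).
  filter(s => s.isDirectory && !s.getPath.getName.startsWith(".")).
  flatMap(year => fs.listStatus(year.getPath).filter(_.isDirectory)).
  flatMap(month => fs.listStatus(month.getPath).filter(_.isDirectory))
println(s"Listed ${partitions.length} partitions in ${(System.currentTimeMillis() - start) / 1000} s")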

Analysis about step 4

When I set the WARN log level, the clean operation runs faster. Is the performance loss due to the interaction between the driver and executors?


lamberken commented 4 years ago

@harshi2506 I am suspecting this may be due to a recent bug we fixed on master (still not 100%). Are you open to building hudi off master branch and giving that a shot? I am suspecting #1394

IMO, this solution will help you; please try building the hudi master branch. @harshi2506

lamberken commented 4 years ago

hi @harshi2506, build steps:

1. Build Env

  • JDK8
  • Unix

2. Commands

git clone https://github.com/apache/incubator-hudi.git
mvn clean install -DskipTests -DskipITs -Dcheckstyle.skip=true -Drat.skip=true

3. Run env

  • Spark-2.4.4+
  • avro-1.8.0

// run in local env
export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
  --driver-memory 6G \
  --packages org.apache.spark:spark-avro_2.11:2.4.4 \
  --jars `ls packaging/hudi-spark-bundle/target/hudi-spark-bundle_*.*-*.*.*-SNAPSHOT.jar` \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

// run in yarn env
export SPARK_HOME=/BigData/install/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
  --master yarn \
  --driver-memory 6G \
  --executor-memory 6G \
  --num-executors 5 \
  --executor-cores 5 \
  --queue root.default \
  --packages org.apache.spark:spark-avro_2.11:2.4.4 \
  --jars `ls packaging/hudi-spark-bundle/target/hudi-spark-bundle_*.*-*.*.*-SNAPSHOT.jar` \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

// scripts
import org.apache.spark.sql.functions._

val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_tablen"
// val basePath = "hdfs:///hudi/test"

val hudiOptions = Map[String,String](
  "hoodie.upsert.shuffle.parallelism" -> "10",
  "hoodie.datasource.write.recordkey.field" -> "key",
  "hoodie.datasource.write.partitionpath.field" -> "dt",
  "hoodie.table.name" -> tableName,
  "hoodie.datasource.write.precombine.field" -> "timestamp"
)

val inputDF = spark.range(1, 7).
   withColumn("key", $"id").
   withColumn("data", lit("data")).
   withColumn("timestamp", unix_timestamp()).
   withColumn("dtstamp", unix_timestamp() + ($"id" * 24 * 3600)).
   withColumn("dt", from_unixtime($"dtstamp", "yyyy/MM/dd"))

inputDF.write.format("org.apache.hudi").
  options(hudiOptions).
  mode("Overwrite").
  save(basePath)

spark.read.format("org.apache.hudi").load(basePath + "/*/*/*").show();

harshi2506 commented 4 years ago

hi @harshi2506, build steps:

1. Build Env

  • JDK8
  • Unix

2. Commands

git clone https://github.com/apache/incubator-hudi.git
mvn clean install -DskipTests -DskipITs -Dcheckstyle.skip=true -Drat.skip=true

3. Run env

  • Spark-2.4.4+
  • avro-1.8.0
// run in local env
export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
  --driver-memory 6G \
  --packages org.apache.spark:spark-avro_2.11:2.4.4 \
  --jars `ls packaging/hudi-spark-bundle/target/hudi-spark-bundle_*.*-*.*.*-SNAPSHOT.jar` \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

// run in yarn env
export SPARK_HOME=/BigData/install/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
  --master yarn \
  --driver-memory 6G \
  --executor-memory 6G \
  --num-executors 5 \
  --executor-cores 5 \
  --queue root.default \
  --packages org.apache.spark:spark-avro_2.11:2.4.4 \
  --jars `ls packaging/hudi-spark-bundle/target/hudi-spark-bundle_*.*-*.*.*-SNAPSHOT.jar` \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

// scripts
import org.apache.spark.sql.functions._

val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_tablen"
// val basePath = "hdfs:///hudi/test"

val hudiOptions = Map[String,String](
  "hoodie.upsert.shuffle.parallelism" -> "10",
  "hoodie.datasource.write.recordkey.field" -> "key",
  "hoodie.datasource.write.partitionpath.field" -> "dt", 
  "hoodie.table.name" -> tableName,
  "hoodie.datasource.write.precombine.field" -> "timestamp"
)

val inputDF = spark.range(1, 7).
   withColumn("key", $"id").
   withColumn("data", lit("data")).
   withColumn("timestamp", unix_timestamp()).
   withColumn("dtstamp", unix_timestamp() + ($"id" * 24 * 3600)).
   withColumn("dt", from_unixtime($"dtstamp", "yyyy/MM/dd"))

inputDF.write.format("org.apache.hudi").
  options(hudiOptions).
  mode("Overwrite").
  save(basePath)

spark.read.format("org.apache.hudi").load(basePath + "/*/*/*").show();

@lamber-ken will try and let you know. Thanks

lamberken commented 4 years ago

The latest spark log (master branch)

Simplified original Spark log

// Upsert part
Warning: Ignoring non-spark config property: "spark.sql.hive.convertMetastoreParquet=false"
Warning: Ignoring non-spark config property: "spark.serializer=org.apache.spark.serializer.KryoSerializer"
20/04/26 17:35:58 WARN S3CryptoModuleAE: Unable to detect encryption information for object 'jar' in bucket '<storageBucket>'. Returning object without decryption.

// Start rollback
20/04/26 17:36:34 INFO HoodieWriteClient: Begin rollback of instant 20200426170940
20/04/26 17:36:34 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from <storageLocation>
20/04/26 17:36:34 INFO FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://ip-10-0-55-99.ec2.internal:8020]

// Output repeatedly
20/04/26 17:36:40 INFO FileStatusExt: Metadata Entry doesn't exist
...
20/04/26 18:19:07 INFO FileStatusExt: Metadata Entry doesn't exist

// Finish
20/04/26 18:19:08 INFO CleanActionExecutor: Total Partitions to clean : 6052, with policy KEEP_LATEST_COMMITS
20/04/26 18:19:08 INFO CleanActionExecutor: Using cleanerParallelism: 6000
20/04/26 18:19:08 INFO SparkContext: Starting job: collect at CleanActionExecutor.java:87
20/04/26 18:19:08 INFO DAGScheduler: Got job 10 (collect at CleanActionExecutor.java:87) with 6000 output partitions
20/04/26 18:19:08 INFO DAGScheduler: Final stage: ResultStage 30 (collect at CleanActionExecutor.java:87)
20/04/26 18:19:08 INFO DAGScheduler: Parents of final stage: List()
20/04/26 18:19:08 INFO DAGScheduler: Missing parents: List()
20/04/26 18:19:08 INFO DAGScheduler: Submitting ResultStage 30 (MapPartitionsRDD[63] at map at CleanActionExecutor.java:86), which has no missing parents
20/04/26 18:19:08 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 247.7 KB, free 1008.3 MB)
20/04/26 18:19:08 INFO MemoryStore: Block broadcast_20_piece0 stored as bytes in memory (estimated size 87.1 KB, free 1008.2 MB)
20/04/26 18:19:08 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on ip-10-0-55-99.ec2.internal:44415 (size: 87.1 KB, free: 1008.8 MB)
20/04/26 18:19:08 INFO SparkContext: Created broadcast 20 from broadcast at DAGScheduler.scala:1201
20/04/26 18:19:08 INFO DAGScheduler: Submitting 6000 missing tasks from ResultStage 30 (MapPartitionsRDD[63] at map at CleanActionExecutor.java:86) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
20/04/26 18:19:08 INFO YarnScheduler: Adding task set 30.0 with 6000 tasks
20/04/26 18:19:23 INFO YarnScheduler: Removed TaskSet 32.0, whose tasks have all completed, from pool 
20/04/26 18:19:23 INFO DAGScheduler: ResultStage 32 (collect at CleanActionExecutor.java:155) finished in 3.444 s
20/04/26 18:19:23 INFO DAGScheduler: Job 11 finished: collect at CleanActionExecutor.java:155, took 8.313284 s
20/04/26 18:19:23 INFO HoodieActiveTimeline: Loaded instants [[20200425201231__clean__COMPLETED], [20200425201231__commit__COMPLETED], [==>20200426173625__clean__INFLIGHT], [20200426173625__commit__COMPLETED]]
20/04/26 18:19:24 INFO HoodieActiveTimeline: Checking for file exists ?<storageLocation>.hoodie/20200426173625.clean.inflight
20/04/26 18:19:24 INFO CSEMultipartUploadOutputStream: close closed:false <storageLocation>.hoodie/20200426173625.clean
20/04/26 18:19:24 INFO DefaultMultipartUploadDispatcher: Completed multipart upload of 1 parts 273450 bytes
20/04/26 18:19:24 INFO CSEMultipartUploadOutputStream: Finished uploading <storageLocation>.hoodie/20200426173625.clean. Elapsed seconds: 0.
20/04/26 18:19:24 INFO HoodieActiveTimeline: Create new file for toInstant ?<storageLocation>.hoodie/20200426173625.clean
20/04/26 18:19:24 INFO CleanActionExecutor: Marked clean started on 20200426173625 as complete
20/04/26 18:19:24 INFO AbstractHoodieWriteClient: Committed 20200426173625
20/04/26 18:19:24 INFO HoodieSparkSqlWriter$: Commit 20200426173625 successful!
20/04/26 18:19:28 INFO SparkContext: Invoking stop() from shutdown hook
20/04/26 18:19:28 INFO SparkUI: Stopped Spark web UI at ip
20/04/26 18:19:28 INFO YarnClientSchedulerBackend: Interrupting monitor thread
20/04/26 18:19:28 INFO YarnClientSchedulerBackend: Shutting down all executors
20/04/26 18:19:28 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
20/04/26 18:19:28 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
20/04/26 18:19:28 INFO YarnClientSchedulerBackend: Stopped
20/04/26 18:19:28 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/04/26 18:19:28 INFO MemoryStore: MemoryStore cleared
20/04/26 18:19:28 INFO BlockManager: BlockManager stopped
20/04/26 18:19:28 INFO BlockManagerMaster: BlockManagerMaster stopped
20/04/26 18:19:28 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/04/26 18:19:28 INFO SparkContext: Successfully stopped SparkContext
20/04/26 18:19:28 INFO ShutdownHookManager: Shutdown hook called
20/04/26 18:19:28 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-e12c26c7-150a-4837-8fb2-72db585dd2f3
20/04/26 18:19:28 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-1a4b587a-a177-4cab-8f93-6849d64386d9
Command exiting with ret '0'

Analysis


Analysis about step 2

Rollback took 42 minutes, repeatedly outputting FileStatusExt: Metadata Entry doesn't exist.

harshi2506 commented 4 years ago

@vinothchandar, I tried building the jar from the master branch and loading a snapshot; it fails every time with:

20/04/28 09:40:14 WARN TaskSetManager: Lost task 178.2 in stage 4.0 (TID 37246, ip-10-0-61-179.ec2.internal, executor 17): java.lang.RuntimeException: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieInsertException: Failed to close the Insert Handle for path <storageLocation>/2018/07/03/6452abed-658d-4840-92cf-1b7b970d2b61-30_178-4-37246_20200428084112.parquet
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieInsertException: Failed to close the Insert Handle for path <storageLocation>/2018/07/03/6452abed-658d-4840-92cf-1b7b970d2b61-30_178-4-37246_20200428084112.parquet
    at org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.computeNext(CopyOnWriteLazyInsertIterable.java:110)
    at org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.computeNext(CopyOnWriteLazyInsertIterable.java:46)
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
    ... 23 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieInsertException: Failed to close the Insert Handle for path <storageLocation>/2018/07/03/6452abed-658d-4840-92cf-1b7b970d2b61-30_178-4-37246_20200428084112.parquet
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
    at org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.computeNext(CopyOnWriteLazyInsertIterable.java:106)
    ... 25 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieInsertException: Failed to close the Insert Handle for path <storageLocation>/2018/07/03/6452abed-658d-4840-92cf-1b7b970d2b61-30_178-4-37246_20200428084112.parquet
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
    ... 26 more
Caused by: org.apache.hudi.exception.HoodieInsertException: Failed to close the Insert Handle for path <storageLocation>/2018/07/03/6452abed-658d-4840-92cf-1b7b970d2b61-30_178-4-37246_20200428084112.parquet
    at org.apache.hudi.io.HoodieCreateHandle.close(HoodieCreateHandle.java:183)
    at org.apache.hudi.execution.CopyOnWriteLazyInsertIterable$CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteLazyInsertIterable.java:152)
    at org.apache.hudi.execution.CopyOnWriteLazyInsertIterable$CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteLazyInsertIterable.java:132)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
Caused by: java.io.FileNotFoundException: No such file or directory '<storageLocation>/2018/07/03/6452abed-658d-4840-92cf-1b7b970d2b61-30_178-4-37246_20200428084112.parquet'
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:830)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:553)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.getFileStatus(HoodieWrapperFileSystem.java:304)
    at org.apache.hudi.common.fs.FSUtils.getFileSize(FSUtils.java:143)
    at org.apache.hudi.io.HoodieCreateHandle.close(HoodieCreateHandle.java:169)
    ... 8 more
vinothchandar commented 4 years ago

@harshi2506 On master, there is no CopyOnWriteLazyInsertIterable https://github.com/apache/incubator-hudi/tree/master/hudi-client/src/main/java/org/apache/hudi/execution . Wondering if you are behind on master.

In any case, are you running with consistency check enabled? That error seems to indicate that close() cannot find the file it wrote.

https://hudi.apache.org/docs/configurations.html#withConsistencyCheckEnabled This has been done many times, so it must be some other issue..
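For anyone following along, a minimal sketch of turning that check on through the DataSource writer, assuming an inputDF like the earlier examples in this thread; the table name, field names, and path are hypothetical, while the option key is the one confirmed later in this thread ("hoodie.consistency.check.enabled").

inputDF.write.format("org.apache.hudi").
  option("hoodie.table.name", "my_table").                          // hypothetical
  option("hoodie.datasource.write.recordkey.field", "id").          // hypothetical
  option("hoodie.datasource.write.partitionpath.field", "dt").      // hypothetical
  option("hoodie.datasource.write.precombine.field", "timestamp").  // hypothetical
  option("hoodie.consistency.check.enabled", "true").               // wait until S3 listings reflect the files just written
  mode("Append").
  save("s3://my-bucket/my-hudi-table")                              // hypothetical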

lamberken commented 4 years ago

BUG status

  1. Upsert takes a long time the first time: fixed, by upgrading from 0.5.0 to master.
  2. FileStatusExt: Metadata Entry doesn't exist: fixed, by using a whole new base path.
  3. java.io.FileNotFoundException: No such file or directory: fixed, but the root cause is unknown.
  4. 17-minute lag between HoodieActiveTimeline and CleanActionExecutor: still under investigation.
harshi2506 commented 4 years ago


Hi @lamber-ken, a few changes to the bug status report:

  1. Upsert takes a long time the first time: fixed, by upgrading from 0.5.0 to master.
  2. FileStatusExt: Metadata Entry doesn't exist: fixed, by enabling EMRFS on the EMR cluster or by using the hoodie file consistency config.
  3. java.io.FileNotFoundException: No such file or directory: fixed, by enabling EMRFS on the EMR cluster or by using the hoodie file consistency config ("hoodie.consistency.check.enabled" -> "true").
  4. 17-minute lag between HoodieActiveTimeline and CleanActionExecutor: still under investigation.
harishchanderramesh commented 4 years ago

@harshi2506 I am suspecting this may be due to a recent bug we fixed on master (still not 100%). Are you open to building hudi off master branch and giving that a shot? I am suspecting #1394

Does this issue exist on AWS EMR too? I am using the "org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating" package.

vinothchandar commented 4 years ago

aws emr is on 0.5.0, correct? @umehrot2

seems like it https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-6x.html#emr-600-app-versions

how would it interplay if the user wants to use their own hudi version?

harishchanderramesh commented 4 years ago

That's a good question. I would like to know how it would impact things. I see the hudi 0.5.0-incubating jar preloaded when launching the hudi cli, but what if I use other versions in Spark jobs?

umehrot2 commented 4 years ago

@vinothchandar we have two series, 5.x and 6.x, which both support Hudi. The latest 5.x release, emr-5.30.0, supports 0.5.2-incubating, while the latest 6.x release, emr-6.0.0, is still on 0.5.0-incubating. If the user wants to try another or their own version of Hudi, they can drop their jars under /usr/lib/hudi/ to replace the existing ones. In theory this has the potential to cause runtime issues, because that hudi jar would not have been built with our versions of, say, spark and hadoop (however, at this point I see little chance of that happening). But you will never know without trying it out.

FelixKJose commented 4 years ago

Did someone try replacing the existing HUDI jars in /usr/lib/hudi/ of EMR with the 0.6.0 version or some custom jars? I am trying to use EMR 6.0, but its HUDI version is 0.5.0 and is missing these performance fixes. So could you @umehrot2 let me know what the advised direction is?

n3nash commented 4 years ago

@umehrot2 Could you please help @FelixKJose with how they can try 0.6.0 with EMR ?

n3nash commented 3 years ago

@FelixKJose Closing this ticket due to inactivity. Please feel free to re-open it or open a new one if you continue to see this issue. I understand you are also trying to get the 0.8.0 version working with EMR to try out multi-writer. Let us know if you run into issues by opening a new ticket.