apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/

Upsert operation not working and job is running longer while using "Record level index" in Apache Hudi 0.14 in EMR 6.15 #10587

Open SudhirSaxena opened 8 months ago

SudhirSaxena commented 8 months ago

Describe the problem you faced

I am facing issues with the upsert operation in Hudi 0.14 with the record level index (RLI) on EMR 6.15 (Spark 3.4.1). Insert mode works as expected, but upsert does not work against an existing Hudi table (written by a lower version) and runs into several issues. In short, I get a HoodieException, "Config conflict(key current value existing value)", for hoodie.database.name. I am passing the same database name (the Hudi db) in the Hudi config that the target data is stored under, yet the upsert still throws this hoodie.database.name error. Can you please have a look and help me fix this?

To Reproduce

Steps to reproduce the behavior (for me):

  1. Spin up a new EMR cluster (emr-6.15.0 with Spark 3.4.1).
  2. Place the jars hudi-spark3.4-bundle_2.12-0.14.0.jar and spark-avro_2.13-3.5.0.jar in the S3 jar location.
  3. Run the exact script that runs on emr-6.5.0 with Spark 3.1.2 and hudi-spark3.1.2-bundle_2.12-0.10.1.jar.

Expected behavior

I expect the same script to run successfully and insert and update data in the Hudi table.

Environment Description

  * Hudi version : 0.14.0
  * Spark version : 3.4.1
  * Hive version : 3.1.3
  * Hadoop version : 3.3.6
  * Storage (HDFS/S3/GCS..) : s3
  * Running on Docker? (yes/no) : no

Additional context

Config I am using for the upsert operation:

    hudi_operation = "upsert"
    hudi_write_mode = "append"
    hudi_config["className"] = "org.apache.hudi"
    hudi_config["hoodie.table.keygenerator.class"] = "org.apache.hudi.keygen.ComplexKeyGenerator"
    hudi_config["hoodie.table.name"] = tgt_tbl
    hudi_config["hoodie.database.name"] = tgt_db
    hudi_config["hoodie.datasource.write.recordkey.field"] = "id"
    hudi_config["hoodie.datasource.write.operation"] = hudi_operation
    hudi_config["hoodie.datasource.hive_sync.table"] = tgt_tbl
    hudi_config["hoodie.datasource.hive_sync.database"] = tgt_db
    hudi_config["hoodie.index.type"] = "RECORD_INDEX"
    hudi_config["hoodie.datasource.write.table.name"] = tgt_tbl
    hudi_config["hoodie.datasource.write.precombine.field"] = "eff_fm_cent_tz"
    hudi_config["hoodie.metadata.record.index.enable"] = "true"

Below is the code snippet:

    hudi_config = {
        "className": "org.apache.hudi",
        "hoodie.table.name": tgt_tbl,
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.precombine.field": "eff_fm_cent_tz",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
        "hoodie.datasource.write.partitionpath.field": "year,month",
        "hoodie.datasource.hive_sync.support_timestamp": "true",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.assume_date_partitioning": "false",
        "hoodie.datasource.hive_sync.table": tgt_tbl,
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.write.hive_style_partitioning": "true",
        "hoodie.upsert.shuffle.parallelism": hudi_upsert_parallelism,
        "hoodie.delete.shuffle.parallelism": hudi_delete_parallelism,
        "hoodie.bulkinsert.sort.mode": "GLOBAL_SORT",
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.record.index.enable": "true",
        "hoodie.index.type": "RECORD_INDEX",
        "hoodie.metadata.index.column.stats.column.list": "res_sys_id,pnr_rec_loc_id,pnr_cre_dt",
        "hoodie.enable.data.skipping": "true"
    }
    hudi_delete_config = {
        "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.EmptyHoodieRecordPayload"
    }

Initialize local s3

    if (files_exist == True):
        print("upsert started")
        hudi_operation = "upsert"
        hudi_write_mode = "append"
        hudi_config["className"] = "org.apache.hudi"
        hudi_config["hoodie.table.keygenerator.class"] = "org.apache.hudi.keygen.ComplexKeyGenerator"
        hudi_config["hoodie.database.name"] = tgt_db
        hudi_config["hoodie.table.name"] = tgt_tbl
        hudi_config["hoodie.datasource.write.recordkey.field"] = "id"
        hudi_config["hoodie.datasource.write.operation"] = hudi_operation
        hudi_config["hoodie.datasource.hive_sync.table"] = tgt_tbl
        hudi_config["hoodie.datasource.hive_sync.database"] = tgt_db
        hudi_config["hoodie.index.type"] = "RECORD_INDEX"
        hudi_config["hoodie.metadata.enable"] = "true"
        hudi_config["hoodie.datasource.write.table.name"] = tgt_tbl
        hudi_config["hoodie.datasource.write.precombine.field"] = "eff_fm_cent_tz"
        hudi_config["hoodie.metadata.record.index.enable"] = "true"
        print("Upserting records into " + tgt_tbl + " Hudi table")
        res_pnr_concat.write.format("org.apache.hudi").options(**hudi_config).mode("Append").save(hudi_tbl_path + tgt_tbl)
        print("Successfully process records for " + tgt_tbl + " Hudi table")
        print("res_pnr hudi loading for upsert ended ----",datetime.now())
        if len(target_records) > 0:
            print("delete logic started ----",datetime.now())
            res_pnr_concat.createOrReplaceTempView("res_pnr_concat")
            res_pnr_del_df=spark.sql("select distinct A.* from res_pnr_cached A where not exists(select 1 from res_pnr_concat B where A.RES_SYS_ID = B.RES_SYS_ID AND A.PNR_REC_LOC_ID = B.PNR_REC_LOC_ID AND A.PNR_CRE_DT = B.PNR_CRE_DT AND A.EFF_FM_CENT_TZ = B.eff_fm_cent_tz and A.year = B.year and A.month = B.month)").drop("src_hoodie_record_key").drop("_hoodie_commit_seqno").drop("_hoodie_commit_time").drop("_hoodie_file_name").drop("_hoodie_partition_path").drop("_hoodie_record_key")
            if len(res_pnr_del_df.take(1)) > 0:
                common_config = {**hudi_config, **hudi_delete_config}
                spark.sql("uncache table if exists res_pnr_src_df")
                print("Deleting records from RES_PNR Hudi table")
                res_pnr_del_df.write.format("org.apache.hudi").options(**common_config).mode("append").save(hudi_tbl_path + tgt_tbl)
            else:
                print("Delete eligible records are NOT identified")
            print("delete logic Ended ----",datetime.now())
        else:
            pass
        print("res_pnr hudi upsert logic ended ----",datetime.now())
    else:
        hudi_operation = "bulk_insert"
        hudi_write_mode = "overwrite"
        hudi_config["className"] = "org.apache.hudi"
        hudi_config["hoodie.table.name"] = tgt_tbl
        hudi_config["hoodie.database.name"] = tgt_db
        hudi_config["hoodie.datasource.write.operation"] = hudi_operation
        hudi_config["hoodie.datasource.hive_sync.table"] = tgt_tbl
        hudi_config["hoodie.datasource.write.recordkey.field"] = "id"
        hudi_config["hoodie.datasource.hive_sync.database"] = tgt_db
        hudi_config["hoodie.index.type"] = "RECORD_INDEX"
        hudi_config["hoodie.metadata.enable"] = "true"
        hudi_config["hoodie.datasource.write.table.name"] = tgt_tbl
        hudi_config["hoodie.datasource.write.precombine.field"] = "eff_fm_cent_tz"
        hudi_config["hoodie.metadata.record.index.enable"] = "true"
        print("bulk insert " + tgt_tbl + " Hudi table")
        res_pnr_concat.write.format("org.apache.hudi").options(**hudi_config).mode(hudi_write_mode).save(hudi_tbl_path + tgt_tbl)
        print("Successfully bulk upserted records into " + tgt_tbl + " Hudi table")
    print("res_pnr hudi loading for upsert ended ----",datetime.now())
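The two branches above repeat most of the same assignments and differ only in the write operation and the save mode. A minimal sketch of one way to factor that out, assuming a hypothetical helper `build_write_options` (not part of Hudi or of the original script); the option keys are the ones already used in this issue, and the database and table names in the example call are placeholders:

```python
def build_write_options(base_config, tgt_db, tgt_tbl, table_exists):
    """Return (options, save_mode) for a Hudi write.

    Hypothetical helper for illustration only: it assembles the option
    keys already shown in this issue and does not call Spark or Hudi.
    """
    options = dict(base_config)  # copy so the shared base dict is not mutated
    options.update({
        "hoodie.table.name": tgt_tbl,
        "hoodie.database.name": tgt_db,
        "hoodie.datasource.write.table.name": tgt_tbl,
        "hoodie.datasource.hive_sync.table": tgt_tbl,
        "hoodie.datasource.hive_sync.database": tgt_db,
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.precombine.field": "eff_fm_cent_tz",
        "hoodie.index.type": "RECORD_INDEX",
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.record.index.enable": "true",
    })
    if table_exists:
        # Existing table: upsert and append to it.
        options["hoodie.datasource.write.operation"] = "upsert"
        save_mode = "append"
    else:
        # First load: bulk insert and overwrite the target path.
        options["hoodie.datasource.write.operation"] = "bulk_insert"
        save_mode = "overwrite"
    return options, save_mode


# Placeholder names, not the real database or table.
base = {"hoodie.datasource.write.partitionpath.field": "year,month"}
opts, mode = build_write_options(base, "my_db", "my_tbl", table_exists=True)
print(mode, opts["hoodie.datasource.write.operation"])
```

The returned pair would then be passed to the same write call the script already uses, for example `df.write.format("org.apache.hudi").options(**opts).mode(mode).save(path)`.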

Upsert operation job is throwing an error:

    An error was encountered:
    An error occurred while calling o2149.save.
    : org.apache.hudi.exception.HoodieException: Config conflict(key current value existing value):
    hoodie.database.name: datalake_dev1_entp_cds
        at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:211)
        at org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:177)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
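Judging by the stack trace, the exception is raised in `HoodieWriterUtils.validateTableConfig`, which appears to compare the options passed to the writer with the configuration already stored for the table. One rough way to look for such mismatches before writing is to compare the two by hand. The sketch below is only an illustration under several assumptions: `parse_properties` and `find_conflicts` are hypothetical helpers (not Hudi APIs), the properties text is assumed to have been read from the table's `.hoodie/hoodie.properties`, the sample values are made up, and only keys that use the same name on both sides are compared.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def find_conflicts(writer_options, table_props, keys):
    """Return {key: (writer_value, table_value)} for keys whose values differ.

    A key missing on one side is reported with an empty string for that side.
    """
    conflicts = {}
    for key in keys:
        current = str(writer_options.get(key, ""))
        existing = str(table_props.get(key, ""))
        if current != existing:
            conflicts[key] = (current, existing)
    return conflicts


# Sample text standing in for the table's hoodie.properties (hypothetical values).
sample_properties = """
hoodie.table.name=res_pnr
hoodie.table.type=COPY_ON_WRITE
"""
writer_options = {
    "hoodie.table.name": "res_pnr",
    "hoodie.database.name": "datalake_dev1_entp_cds",
}
conflicts = find_conflicts(
    writer_options,
    parse_properties(sample_properties),
    keys=["hoodie.table.name", "hoodie.database.name"],
)
print(conflicts)
```

In the error above only one value is printed after the key, which may mean the other side is empty, that is, the table may have been created without `hoodie.database.name`. That is a guess from the message format and is not confirmed anywhere in this thread.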

ad1happy2go commented 8 months ago

@SudhirSaxena As the error suggests, you are writing to an existing Hudi table whose database name is different. Can you confirm the value of tgt_db?

    org.apache.hudi.exception.HoodieException: Config conflict(key current value existing value):
    hoodie.database.name: datalake_dev1_entp_cds

SudhirSaxena commented 8 months ago

The tgt_db value is the same, datalake_dev1_entp_cds, which I am using for both the insert and the upsert operation.

Currently I am facing another issue, "Caused by: java.lang.StackOverflowError", so I increased the driver stack size with spark.driver.extraJavaOptions=-Xss512m, but the job still runs long and fails after 1-2 hours. When I run the same job for the first time (in insert mode) it works fine and completes within 30 minutes. Do we need to add any additional configuration to handle the upsert operation with append mode?

    spark-submit --deploy-mode client \
      --jars s3:///jars/Jars_Latest/hudi-spark3.4-bundle_2.12-0.14.0.jar,s3:///jars/Jars_Latest/spark-avro_2.13-3.5.0.jar \
      --conf spark.sql.autoBroadcastJoinThreshold=-1 \
      --conf spark.executors.memory=25G \
      --conf spark.driver.memory=25G \
      --conf spark.executors.cores=5 \
      --conf spark.driver.cores=5 \
      --conf spark.executor.instances=17 \
      --conf spark.driver.maxResultSize=10G \
      --conf spark.yarn.executor.memoryOverhead=5G \
      --conf spark.sql.inMemoryColumnarStorage.compressed=true \
      --conf spark.sql.files.maxPartitionBytes=2097152 \
      --conf spark.executor.memoryOverhead=5G \
      --conf spark.executor.extraJavaOptions=-Xss128m \
      --conf spark.driver.extraJavaOptions=-Xss512m \
      s3:///s3_to_hudi/res/RES_PNR_H_RLI.py --config s3:///utilities/configs/hudi_config_14.ini --env dev
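One thing that may be worth double-checking in the command above: Spark's documented property names are `spark.executor.memory` and `spark.executor.cores` (singular `executor`), while the command passes `spark.executors.memory` and `spark.executors.cores`. Spark does not reject unknown `spark.*` keys, so a misspelled key would simply have no effect. Below is a small sketch for catching this kind of typo. It assumes a hand-maintained allow-list: the `KNOWN_KEYS` set and the `unknown_conf_keys` helper are hypothetical, and the list is deliberately limited to the properties that this command seems to intend.

```python
import shlex

# Hand-maintained allow-list (an assumption for this sketch), limited to the
# Spark properties that the command in this issue appears to intend.
KNOWN_KEYS = {
    "spark.sql.autoBroadcastJoinThreshold",
    "spark.executor.memory",
    "spark.driver.memory",
    "spark.executor.cores",
    "spark.driver.cores",
    "spark.executor.instances",
    "spark.driver.maxResultSize",
    "spark.yarn.executor.memoryOverhead",
    "spark.sql.inMemoryColumnarStorage.compressed",
    "spark.sql.files.maxPartitionBytes",
    "spark.executor.memoryOverhead",
    "spark.executor.extraJavaOptions",
    "spark.driver.extraJavaOptions",
}


def unknown_conf_keys(command):
    """Return the --conf keys in a spark-submit command that are not in KNOWN_KEYS."""
    tokens = shlex.split(command)
    unknown = []
    # Walk over (token, next_token) pairs and inspect the value after each --conf.
    for flag, value in zip(tokens, tokens[1:]):
        if flag == "--conf" and "=" in value:
            key = value.split("=", 1)[0]
            if key not in KNOWN_KEYS:
                unknown.append(key)
    return unknown


command = (
    "spark-submit --deploy-mode client "
    "--conf spark.executors.memory=25G "
    "--conf spark.driver.memory=25G "
    "--conf spark.executors.cores=5 "
    "--conf spark.executor.instances=17"
)
print(unknown_conf_keys(command))
```

Whether these two keys have anything to do with the slow upsert is not established in this thread; the check only shows that the executor memory and cores settings may not be taking effect as intended.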

error message at calling main dataframe :

    An error occurred while calling o1777.showString.
    : java.util.concurrent.ExecutionException: Boxed Error
        at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
        at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
        at scala.concurrent.impl.Promise$KeptPromise$.apply(Promise.scala:406)
        at scala.concurrent.Promise$.fromTry(Promise.scala:142)
        at scala.concurrent.Promise$.failed(Promise.scala:128)
        at scala.concurrent.Future$.failed(Future.scala:624)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$materializeFuture$1(ShuffleExchangeExec.scala:104)
        at org.apache.spark.sql.util.LazyValue.getOrInit(LazyValue.scala:41)
        at org.apache.spark.sql.execution.exchange.Exchange.getOrInitMaterializeFuture(Exchange.scala:69)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.materializeFuture(ShuffleExchangeExec.scala:100)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.materialize(ShuffleExchangeExec.scala:88)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.materialize$(ShuffleExchangeExec.scala:87)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.materialize(ShuffleExchangeExec.scala:140)
        at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:182)
        at org.apache.spark.sql.execution.adaptive.QueryStageExec.$anonfun$materialize$1(QueryStageExec.scala:77)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:265)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:262)
        at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:75)
        at org.apache.spark.sql.execution.adaptive.MaterializeExecutable.tryStart(AdaptiveExecutable.scala:370)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.startChild(AdaptiveExecutor.scala:232)
        at org.apache.spark.sql.execution.adaptive.ExecutionHelper.doStart(ExecutionHelper.scala:94)
        at org.apache.spark.sql.execution.adaptive.ExecutionHelper.start(ExecutionHelper.scala:50)
        at org.apache.spark.sql.execution.adaptive.QueryStageExecutable$$anon$2.$anonfun$new$1(AdaptiveExecutable.scala:251)
        at org.apache.spark.sql.execution.adaptive.ExecutionHelper.$anonfun$startAll$2(ExecutionHelper.scala:81)
        at org.apache.spark.sql.execution.adaptive.ExecutionHelper.$anonfun$startAll$2$adapted(ExecutionHelper.scala:78)
        at org.apache.spark.sql.execution.adaptive.ExecutionHelper$Listener.$anonfun$onChildSuccess$2(ExecutionHelper.scala:102)
        at org.apache.spark.sql.execution.adaptive.ExecutionHelper$Listener.$anonfun$onChildSuccess$2$adapted(ExecutionHelper.scala:101)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.sql.execution.adaptive.ExecutionHelper$Listener.$anonfun$onChildSuccess$1(ExecutionHelper.scala:101)
        at org.apache.spark.sql.execution.adaptive.ExecutionHelper$Listener.$anonfun$onChildSuccess$1$adapted(ExecutionHelper.scala:100)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.sql.execution.adaptive.ExecutionHelper$Listener.onChildSuccess(ExecutionHelper.scala:100)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.$anonfun$onActiveChildSuccess$2(AdaptiveExecutor.scala:321)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.$anonfun$onActiveChildSuccess$2$adapted(AdaptiveExecutor.scala:321)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.onActiveChildSuccess(AdaptiveExecutor.scala:321)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.onChildSuccess(AdaptiveExecutor.scala:291)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.$anonfun$doRun$1(AdaptiveExecutor.scala:92)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.$anonfun$doRun$1$adapted(AdaptiveExecutor.scala:91)
        at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
        at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
        at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.doRun(AdaptiveExecutor.scala:91)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:66)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:275)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:274)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:556)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:518)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4244)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3205)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4234)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:570)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4232)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:123)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4232)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:3205)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:3426)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:286)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:325)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:750)
    Caused by: java.lang.StackOverflowError
        at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:53)

soumilshah1995 commented 8 months ago

I hope this message finds you well. I have a question regarding the Bloom index in Hudi. Were you using the Bloom index previously, and have you recently changed the index type to a record-level index (RLI)? I am curious about the scenario where a Hudi table already exists, and you are attempting to modify the index type. My understanding is that this action could result in an error unless it is a completely new table with an RLI. Could you kindly confirm if my interpretation is accurate or provide any additional insights on this matter? Your assistance is greatly appreciated

SudhirSaxena commented 8 months ago

Hi @soumilshah1995, yes, we are currently using the bloom index (Hudi configuration given below) on the lower Hudi version (0.10). It works, but performance is somewhat slow, so we are trying RLI to improve it. As of now we are not integrating with the existing Hudi table; we are loading into a new table to see how it works. If it works as expected, we will look later at how to integrate it or use it for new tables.

spark command:

    spark-submit --deploy-mode client \
      --jars s3:///jars/hudi-spark3.1.2-bundle_2.12-0.10.1.jar,s3:///jars/spark-avro_2.12-3.1.2.jar \
      --conf spark.sql.autoBroadcastJoinThreshold=-1 \
      --conf spark.executors.memory=25G \
      --conf spark.driver.memory=25G \
      --conf spark.executors.cores=5 \
      --conf spark.driver.cores=5 \
      --conf spark.executor.instances=21 \
      --conf spark.driver.maxResultSize=5G \
      --conf spark.yarn.executor.memoryOverhead=5G \
      --conf spark.sql.inMemoryColumnarStorage.compressed=true \
      --conf spark.sql.files.maxPartitionBytes=2097152 \
      --conf spark.executor.memoryOverhead=5G \
      --conf spark.executor.defaultJavaOptions=-XX:+UseG1GC \
      --conf spark.driver.defaultJavaOptions=-XX:+UseG1GC \
      s3:///RES_PAX_AIR.py --config s3:///utilities/configs/hudi_config.ini --env dev

    hudi_config = {
        "className": "org.apache.hudi",
        "hoodie.table.name": tgt_tbl,
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.precombine.field": "eff_fm_cent_tz",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
        "hoodie.datasource.write.partitionpath.field": "year,month",
        "hoodie.datasource.hive_sync.support_timestamp": "true",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.assume_date_partitioning": "false",
        "hoodie.datasource.hive_sync.table": tgt_tbl,
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.write.hive_style_partitioning": "true",
        "hoodie.upsert.shuffle.parallelism": hudi_upsert_parallelism,
        "hoodie.delete.shuffle.parallelism": hudi_delete_parallelism,
        "hoodie.bulkinsert.sort.mode": "GLOBAL_SORT",
        "hoodie.index.type": "BLOOM",
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.bloom.filter.enable": "true",
        "hoodie.metadata.index.bloom.filter.parallelism": hudi_filter_parallelism,
        "hoodie.metadata.index.bloom.filter.column.list": "id",
        "hoodie.bloom.index.use.metadata": "true",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.metadata.index.column.stats.column.list": "res_sys_id,pnr_rec_loc_id,pnr_cre_dt",
        "hoodie.enable.data.skipping": "true"
    }

upsert:

    hudi_operation = "upsert"
    hudi_write_mode = "append"
    hudi_config["className"] = "org.apache.hudi"
    hudi_config["hoodie.table.keygenerator.class"] = "org.apache.hudi.keygen.ComplexKeyGenerator"
    hudi_config["hoodie.database.name"] = tgt_db
    hudi_config["hoodie.table.name"] = tgt_tbl
    hudi_config["hoodie.datasource.write.recordkey.field"] = "id"
    hudi_config["hoodie.datasource.write.operation"] = hudi_operation
    hudi_config["hoodie.datasource.hive_sync.table"] = tgt_tbl
    hudi_config["hoodie.datasource.hive_sync.database"] = tgt_db
    print("Upserting records into " + tgt_tbl + " Hudi table")

bulk_insert:

    hudi_operation = "bulk_insert"
    hudi_write_mode = "overwrite"
    hudi_config["className"] = "org.apache.hudi"
    hudi_config["hoodie.table.name"] = tgt_tbl
    hudi_config["hoodie.datasource.write.operation"] = hudi_operation
    hudi_config["hoodie.datasource.hive_sync.table"] = tgt_tbl
    hudi_config["hoodie.datasource.write.recordkey.field"] = "id"
    hudi_config["hoodie.datasource.hive_sync.database"] = tgt_db

soumilshah1995 commented 8 months ago

There may be a potential issue that arose during the transition from the Bloom index to the record level index. It seems that the record level index was not built on a fresh table, which could be the reason for the error you are encountering. I suggest creating a new table with the index type set to record level index to address this issue.

SudhirSaxena commented 8 months ago

I have made some changes to the configuration, so the job no longer fails on hoodie.database.name: datalake_dev1_entp_cds, but it now runs long and does not complete. It gets stuck where it fetches data from the Hudi target table and joins it with the source dataframe view. The query below keeps running and does not progress at all, so I have cancelled the job multiple times after it was stuck for 2-3 hours on this query. This query and the existing job have been running for more than 6 months using the bloom index (Hudi 0.10), so I believe there is no issue in the query itself. The spark.sql query is also very straightforward and runs fine when the job runs for the first time as an insert operation. Can you please suggest why it causes issues for the upsert operation? Do I need to add any additional configuration for upsert, or what could be the reason the job does not finish? Thanks.

src_res_pnr_hoodie_record_key_vw - source dataframe view

    res_pnr_df = spark.sql(
        "select tgt.* from " + tgt_db + "." + tgt_tbl + " tgt "
        "INNER JOIN ( select distinct res_sys_id, pnr_rec_loc_id, pnr_cre_dt, year,month "
        "from src_res_pnr_hoodie_record_key_vw ) src "
        "ON tgt.RES_SYS_ID = src.RES_SYS_ID "
        "AND tgt.PNR_REC_LOC_ID = src.PNR_REC_LOC_ID "
        "AND tgt.PNR_CRE_DT = src.PNR_CRE_DT "
        "and tgt.year = src.year and tgt.month = src.month "
        "and tgt._hoodie_partition_path=concat('year=',src.year,'/','month=',src.month) "
        "and tgt.year>='2021' "
    ).drop(
        "src_hoodie_record_key",
        "_hoodie_commit_seqno",
        "_hoodie_commit_time",
        "_hoodie_file_name",
        "_hoodie_partition_path",
        "_hoodie_record_key",
    )

soumilshah1995 commented 8 months ago

Didn't you mention that you changed the index type from Bloom to RLI and that your job was failing? What I am saying is that, in order to use RLI, you need to use a fresh table.

Can you also paste the hoodie.properties file?

SudhirSaxena commented 8 months ago

Hi @soumilshah1995, I am not using any existing Hudi table to transition from the Bloom index to the record level index. Since this is a new version, I have to test it first, so I am implementing it with a fresh new table.

  1. I am able to insert data into the Hudi table (fresh new table) using the record level index (Hudi 0.14 with EMR 6.15). The job takes at most 20-25 minutes to complete.
  2. When I run it a second time to perform the upsert operation, it keeps running and never completes. It looks like the job is not able to fetch the data from the Hudi target table. I cancelled the job after 2-3 hours; it was stuck on the same dataframe for more than 2 hours.

Hudi properties for Record level index :

    hudi_config = {
        "className": "org.apache.hudi",
        "hoodie.table.name": tgt_tbl,
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.precombine.field": "eff_fm_cent_tz",
        "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
        "hoodie.datasource.write.partitionpath.field": "year,month",
        "hoodie.datasource.hive_sync.support_timestamp": "true",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.assume_date_partitioning": "false",
        "hoodie.datasource.hive_sync.table": tgt_tbl,
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.write.hive_style_partitioning": "true",
        "hoodie.upsert.shuffle.parallelism": hudi_upsert_parallelism,
        "hoodie.delete.shuffle.parallelism": hudi_delete_parallelism,
        "hoodie.bulkinsert.sort.mode": "GLOBAL_SORT",
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.record.index.enable": "true",
        "hoodie.index.type": "RECORD_INDEX",
        "hoodie.metadata.index.column.stats.column.list": "res_sys_id,pnr_rec_loc_id,pnr_cre_dt",
        "hoodie.enable.data.skipping": "true"
    }

insert operation:

    hudi_operation = "bulk_insert"
    hudi_write_mode = "overwrite"
    hudi_config["className"] = "org.apache.hudi"
    hudi_config["hoodie.table.name"] = tgt_tbl
    hudi_config["hoodie.database.name"] = tgt_db
    hudi_config["hoodie.datasource.write.operation"] = hudi_operation
    hudi_config["hoodie.datasource.hive_sync.table"] = tgt_tbl
    hudi_config["hoodie.datasource.write.recordkey.field"] = "id"
    hudi_config["hoodie.datasource.hive_sync.database"] = tgt_db
    hudi_config["hoodie.index.type"] = "RECORD_INDEX"
    hudi_config["hoodie.metadata.enable"] = "true"
    hudi_config["hoodie.datasource.write.table.name"] = tgt_tbl
    hudi_config["hoodie.datasource.write.precombine.field"] = "eff_fm_cent_tz"
    hudi_config["hoodie.metadata.record.index.enable"] = "true"
    res_pnr_df.write.format("org.apache.hudi").options(**hudi_config).mode(hudi_write_mode).save(hudi_tbl_path + tgt_tbl)

Upsert operation config:

    hudi_operation = "upsert"
    hudi_write_mode = "append"
    hudi_config["className"] = "org.apache.hudi"
    hudi_config["hoodie.table.keygenerator.class"] = "org.apache.hudi.keygen.ComplexKeyGenerator"
    hudi_config["hoodie.table.name"] = tgt_tbl
    hudi_config["hoodie.database.name"] = tgt_db
    hudi_config["hoodie.datasource.write.recordkey.field"] = "id"
    hudi_config["hoodie.datasource.write.operation"] = hudi_operation
    hudi_config["hoodie.datasource.hive_sync.table"] = tgt_tbl
    hudi_config["hoodie.datasource.hive_sync.database"] = tgt_db
    hudi_config["hoodie.index.type"] = "RECORD_INDEX"
    hudi_config["hoodie.metadata.enable"] = "true"
    hudi_config["hoodie.datasource.write.table.name"] = tgt_tbl
    hudi_config["hoodie.datasource.write.precombine.field"] = "eff_fm_cent_tz"
    hudi_config["hoodie.metadata.record.index.enable"] = "true"
    res_pnr_df.write.format("org.apache.hudi").options(**hudi_config).mode("append").save(hudi_tbl_path + tgt_tbl)

soumilshah1995 commented 8 months ago

Got it, got it.

Can we do a small test here? Can you run the job in UPSERT mode from the start, instead of INSERT, and see if it makes any difference?

I know that with UPSERT on a fresh table the RLI index will be built for sure. Could we do this test?

SudhirSaxena commented 8 months ago

So you mean I shouldn't use the bulk_insert operation (hudi_operation = "bulk_insert") in the job, and should just run it against a fresh table to check?

One question: to test upsert only on a new table, I will run the first time with the upsert operation and, if the job completes, run again with the upsert operation. Is that correct?

soumilshah1995 commented 8 months ago

Yes, use UPSERT without bulk_insert, and remove the bulk_insert settings.

SudhirSaxena commented 8 months ago

@soumilshah1995 @ad1happy2go I ran this job with the upsert operation the first time to load data into a fresh Hudi table, and it completed in 7 minutes (screenshot attached). I verified the data in the Hudi table: it has 2003 records.

Now, when I run it again with the upsert operation to do the actual upsert, the job keeps running past 40 minutes and does not finish. I am not sure why the upsert operation is causing issues or how to fix them. Can you please help me fix this?

job_RLI_logs

SudhirSaxena commented 8 months ago

@soumilshah1995 @ad1happy2go @nsivabalan Just sharing hoodie.properties and metadata/hoodie.properties. I see that by default it uses hoodie.table.type=MERGE_ON_READ in .hoodie/metadata/.hoodie/hoodie.properties and hoodie.table.type=COPY_ON_WRITE in .hoodie/hoodie.properties. Please have a look and let me know if any changes are required.

s3:///res_pnr_upsert_test/.hoodie/hoodie.properties

Updated at 2024-01-31T00:02:47.700Z

Wed Jan 31 00:02:47 UTC 2024

    hoodie.table.timeline.timezone=LOCAL
    hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
    hoodie.table.precombine.field=eff_fm_cent_tz
    hoodie.table.version=6
    hoodie.database.name=datalake_dev1_entp_cds
    hoodie.datasource.write.hive_style_partitioning=true
    hoodie.table.metadata.partitions.inflight=
    hoodie.table.checksum=966479762
    hoodie.partition.metafile.use.base.format=false
    hoodie.table.cdc.enabled=false
    hoodie.archivelog.folder=archived
    hoodie.table.name=res_pnr_upsert_test
    hoodie.populate.meta.fields=true
    hoodie.table.type=COPY_ON_WRITE
    hoodie.datasource.write.partitionpath.urlencode=false
    hoodie.table.base.file.format=PARQUET
    hoodie.datasource.write.drop.partition.columns=false
    hoodie.table.metadata.partitions=files,record_index
    hoodie.timeline.layout.version=1
    hoodie.table.partition.fields=year,month
    hoodie.table.recordkey.fields=id

s3:///res_pnr_upsert_test/.hoodie/metadata/.hoodie/hoodie.properties

Properties saved on 2024-01-31T00:02:33.236Z

Wed Jan 31 00:02:33 UTC 2024

    hoodie.datasource.write.drop.partition.columns=false
    hoodie.table.type=MERGE_ON_READ
    hoodie.archivelog.folder=archived
    hoodie.populate.meta.fields=false
    hoodie.compaction.payload.class=org.apache.hudi.metadata.HoodieMetadataPayload
    hoodie.timeline.layout.version=1
    hoodie.table.version=6
    hoodie.table.base.file.format=HFILE
    hoodie.table.recordkey.fields=key
    hoodie.table.name=res_pnr_upsert_test_metadata
    hoodie.table.keygenerator.class=org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator
    hoodie.compaction.record.merger.strategy=eeb8d96f-b1e4-49fd-bbf8-28ac514178e5
    hoodie.table.checksum=2433971754
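The table-level file (`.hoodie/hoodie.properties`) lists `hoodie.table.metadata.partitions=files,record_index`, which suggests that the record index partition was initialized for this table. A minimal sketch for checking that from the properties text, assuming the file content has already been read into a string; the `metadata_partitions` helper is hypothetical and not a Hudi API, and the sample text is a shortened copy of the file above:

```python
def metadata_partitions(properties_text):
    """Return the set of metadata partitions listed in hoodie.properties text."""
    prefix = "hoodie.table.metadata.partitions="
    for line in properties_text.splitlines():
        line = line.strip()
        # The trailing "=" in the prefix keeps the ".inflight" key from matching.
        if line.startswith(prefix):
            value = line[len(prefix):]
            return {part.strip() for part in value.split(",") if part.strip()}
    return set()


# Shortened copy of the table-level hoodie.properties shown above.
properties_text = """
hoodie.table.name=res_pnr_upsert_test
hoodie.table.type=COPY_ON_WRITE
hoodie.table.metadata.partitions.inflight=
hoodie.table.metadata.partitions=files,record_index
"""
partitions = metadata_partitions(properties_text)
print("record_index" in partitions)
```

This only confirms what the properties file records; it says nothing about why the second upsert stalls.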

ad1happy2go commented 8 months ago

@SudhirSaxena The metadata table is always MOR, so that's expected. Can you identify from the Spark UI where exactly the job is spending its time?
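
To make the comparison of the two files concrete, here is a minimal sketch that parses the posted properties and prints the table types side by side. The helper name `parse_properties` is hypothetical, and the reading that `hoodie.table.metadata.partitions` lists the metadata partitions that are already built (with `.inflight` listing those still being built) is an assumption based on the property names, not something confirmed in this thread.

```python
def parse_properties(text):
    """Parse whitespace- or newline-separated key=value pairs into a dict."""
    props = {}
    for token in text.split():
        if token.startswith("#") or "=" not in token:
            continue  # skip comments and stray words such as timestamps
        key, _, value = token.partition("=")
        props[key] = value
    return props


# Excerpts copied from the two hoodie.properties files posted above.
data_table = parse_properties(
    "hoodie.table.type=COPY_ON_WRITE hoodie.table.version=6 "
    "hoodie.table.metadata.partitions=files,record_index "
    "hoodie.table.metadata.partitions.inflight="
)
metadata_table = parse_properties(
    "hoodie.table.type=MERGE_ON_READ hoodie.table.version=6 "
    "hoodie.table.base.file.format=HFILE"
)

# Split the comma-separated partition lists, dropping empty entries.
available = [p for p in data_table["hoodie.table.metadata.partitions"].split(",") if p]
inflight = [p for p in data_table["hoodie.table.metadata.partitions.inflight"].split(",") if p]

print("data table type:     ", data_table["hoodie.table.type"])
print("metadata table type: ", metadata_table["hoodie.table.type"])
print("metadata partitions: ", available)
print("inflight partitions: ", inflight)
```

With the values posted here, the data table is COPY_ON_WRITE, the metadata table is MERGE_ON_READ, and `record_index` appears in the partition list with nothing inflight.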

SudhirSaxena commented 8 months ago

Hi @ad1happy2go , @soumilshah1995 ,@nsivabalan

I am trying to see where the job is getting stuck. The driver row in the executor summary (screenshot below) has been running for more than 1 hour without progressing. I am not sure why this is happening. Any idea what causes it and how to resolve it, so that the upsert job using the record level index can run with Hudi 0.14 on EMR 6.15? I appreciate your help on this.

| Executor ID | Address | Status | RDD Blocks | Storage Memory | Disk Used | Cores | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time (GC Time) | Input | Shuffle Read | Shuffle Write | Logs |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| driver | * | Active | 0 | 0.0 B / 15.8 GiB | 0.0 B | 0 | 0 | 0 | 0 | 0 | 1.2 h (0.0 ms) | 0.0 B | 0.0 B | 0.0 B | |

[Screenshots: spark-4, spark-log3, spark-log2, spark-6, spark-5]

ad1happy2go commented 8 months ago

That's strange! It looks like the job has stalled on the driver. Can you check the driver logs for this period?

SudhirSaxena commented 8 months ago

Where can I find the driver logs? I don't see any option to get them.

While checking all the logs, I noticed that 9 executors were created, but only 2 of them ran and finished their tasks. It looks like only the driver is active. According to stderr in the EMR steps, tasks are being created and finished without any real execution, but according to the Spark UI none of the executors are running and only the driver shows as active. Do you have any idea why this is happening and how to resolve it?

24/01/31 18:46:27 INFO TaskSetManager: Starting task 5.0 in stage 13.0 (TID 272) (ip--19-171.ec2.internal, executor 2, partition 5, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 11.0 in stage 12.0 (TID 254) in 202 ms on ip--19-171.ec2.internal (executor 2) (5/24)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 6.0 in stage 13.0 (TID 273) (ip--18-33.ec2.internal, executor 4, partition 6, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 22.0 in stage 11.0 (TID 241) in 350 ms on ip--18-33.ec2.internal (executor 4) (23/24)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 7.0 in stage 13.0 (TID 274) (ip--18-154.ec2.internal, executor 6, partition 7, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 5.0 in stage 12.0 (TID 248) in 279 ms on ip--18-154.ec2.internal (executor 6) (6/24)
24/01/31 18:46:27 INFO BlockManagerInfo: Added broadcast_37_piece0 in memory on ip--18-33.ec2.internal:37841 (size: 44.1 KiB, free: 2.3 GiB)
24/01/31 18:46:27 INFO BlockManagerInfo: Added broadcast_35_piece0 in memory on ip--18-154.ec2.internal:33879 (size: 46.2 KiB, free: 2.3 GiB)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 8.0 in stage 13.0 (TID 275) (ip--18-167.ec2.internal, executor 5, partition 8, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 10.0 in stage 12.0 (TID 253) in 229 ms on ip--18-167.ec2.internal (executor 5) (7/24)
24/01/31 18:46:27 INFO BlockManagerInfo: Added broadcast_35_piece0 in memory on ip--19-171.ec2.internal:36785 (size: 46.2 KiB, free: 2.3 GiB)
24/01/31 18:46:27 INFO BlockManagerInfo: Added broadcast_35_piece0 in memory on ip--18-33.ec2.internal:41631 (size: 46.2 KiB, free: 2.3 GiB)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 9.0 in stage 13.0 (TID 276) (ip--19-171.ec2.internal, executor 1, partition 9, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 0.0 in stage 12.0 (TID 243) in 346 ms on ip--19-171.ec2.internal (executor 1) (8/24)
24/01/31 18:46:27 INFO BlockManagerInfo: Added broadcast_37_piece0 in memory on ip--18-167.ec2.internal:39949 (size: 44.1 KiB, free: 2.3 GiB)
24/01/31 18:46:27 INFO BlockManagerInfo: Added broadcast_37_piece0 in memory on ip--19-171.ec2.internal:34997 (size: 44.1 KiB, free: 2.3 GiB)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 10.0 in stage 13.0 (TID 277) (ip--18-33.ec2.internal, executor 4, partition 10, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 6.0 in stage 12.0 (TID 249) in 316 ms on ip--18-33.ec2.internal (executor 4) (9/24)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 11.0 in stage 13.0 (TID 278) (ip--18-167.ec2.internal, executor 5, partition 11, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 7.0 in stage 11.0 (TID 226) in 638 ms on ip--18-167.ec2.internal (executor 5) (24/24)
24/01/31 18:46:27 INFO YarnScheduler: Removed TaskSet 11.0, whose tasks have all completed, from pool
24/01/31 18:46:27 INFO TaskSetManager: Starting task 12.0 in stage 13.0 (TID 279) (ip--18-167.ec2.internal, executor 5, partition 12, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO DAGScheduler: ShuffleMapStage 11 (parquet at NativeMethodAccessorImpl.java:0) finished in 7.215 s
24/01/31 18:46:27 INFO DAGScheduler: looking for newly runnable stages
24/01/31 18:46:27 INFO DAGScheduler: running: Set(ShuffleMapStage 15, ShuffleMapStage 30, ShuffleMapStage 16, ShuffleMapStage 24, ShuffleMapStage 18, ShuffleMapStage 26, ShuffleMapStage 12, ShuffleMapStage 13, ShuffleMapStage 20, ShuffleMapStage 28, ShuffleMapStage 22, ShuffleMapStage 14)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 1.0 in stage 12.0 (TID 244) in 377 ms on ip--18-167.ec2.internal (executor 5) (10/24)
24/01/31 18:46:27 INFO DAGScheduler: waiting: Set()
24/01/31 18:46:27 INFO DAGScheduler: failed: Set()
24/01/31 18:46:27 INFO BlockManagerInfo: Added broadcast_35_piece0 in memory on ip--18-33.ec2.internal:37841 (size: 46.2 KiB, free: 2.3 GiB)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 13.0 in stage 13.0 (TID 280) (ip--18-33.ec2.internal, executor 9, partition 13, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 13.0 in stage 12.0 (TID 256) in 243 ms on ip--18-33.ec2.internal (executor 9) (11/24)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 14.0 in stage 13.0 (TID 281) (ip--18-33.ec2.internal, executor 4, partition 14, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 16.0 in stage 12.0 (TID 259) in 231 ms on ip--18-33.ec2.internal (executor 4) (12/24)
24/01/31 18:46:27 INFO BlockManagerInfo: Added broadcast_35_piece0 in memory on ip--19-171.ec2.internal:34997 (size: 46.2 KiB, free: 2.3 GiB)
24/01/31 18:46:27 INFO BlockManagerInfo: Added broadcast_35_piece0 in memory on ip--18-167.ec2.internal:39949 (size: 46.2 KiB, free: 2.3 GiB)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 15.0 in stage 13.0 (TID 282) (.internal, executor 1, partition 15, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 12.0 in stage 12.0 (TID 255) in 276 ms on ip--19-171.ec2.internal (executor 1) (13/24)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 16.0 in stage 13.0 (TID 283) (ip--18-33.ec2.internal, executor 3, partition 16, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 9.0 in stage 12.0 (TID 252) in 320 ms on ip--18-33.ec2.internal (executor 3) (14/24)
24/01/31 18:46:27 INFO BlockManagerInfo: Added broadcast_37_piece0 in memory on ip--18-33.ec2.internal:35711 (size: 44.1 KiB, free: 2.3 GiB)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 17.0 in stage 13.0 (TID 284) (ip--19-153.ec2.internal, executor 7, partition 17, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 19.0 in stage 12.0 (TID 262) in 245 ms on ip--19-153.ec2.internal (executor 7) (15/24)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 18.0 in stage 13.0 (TID 285) (ip--19-153.ec2.internal, executor 7, partition 18, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 20.0 in stage 12.0 (TID 263) in 239 ms on ip--19-153.ec2.internal (executor 7) (16/24)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 19.0 in stage 13.0 (TID 286) (ip--19-171.ec2.internal, executor 2, partition 19, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 14.0 in stage 12.0 (TID 257) in 302 ms on ip--19-171.ec2.internal (executor 2) (17/24)
24/01/31 18:46:27 INFO BlockManagerInfo: Added broadcast_37_piece0 in memory on ip--19-153.ec2.internal:37591 (size: 44.1 KiB, free: 2.3 GiB)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 20.0 in stage 13.0 (TID 287) (ip--19-171.ec2.internal, executor 8, partition 20, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 22.0 in stage 12.0 (TID 265) in 189 ms on ip--19-171.ec2.internal (executor 8) (18/24)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 21.0 in stage 13.0 (TID 288) (ip--18-33.ec2.internal, executor 3, partition 21, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 8.0 in stage 12.0 (TID 251) in 353 ms on ip--18-33.ec2.internal (executor 3) (19/24)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 22.0 in stage 13.0 (TID 289) (ip--18-33.ec2.internal, executor 3, partition 22, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 15.0 in stage 12.0 (TID 258) in 305 ms on ip--18-33.ec2.internal (executor 3) (20/24)
24/01/31 18:46:27 INFO BlockManagerInfo: Added broadcast_37_piece0 in memory on ip--19-171.ec2.internal:43677 (size: 44.1 KiB, free: 2.3 GiB)
24/01/31 18:46:27 INFO BlockManagerInfo: Added broadcast_35_piece0 in memory on ip--18-33.ec2.internal:35711 (size: 46.2 KiB, free: 2.3 GiB)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 23.0 in stage 13.0 (TID 290) (ip--19-153.ec2.internal, executor 7, partition 23, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 23.0 in stage 12.0 (TID 266) in 227 ms on ip--19-153.ec2.internal (executor 7) (21/24)
24/01/31 18:46:27 INFO BlockManagerInfo: Added broadcast_35_piece0 in memory on ip--19-171.ec2.internal:43677 (size: 46.2 KiB, free: 2.3 GiB)
24/01/31 18:46:27 INFO BlockManagerInfo: Added broadcast_35_piece0 in memory on ip--19-153.ec2.internal:37591 (size: 46.2 KiB, free: 2.3 GiB)
24/01/31 18:46:27 INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID 291) (ip--19-171.ec2.internal, executor 1, partition 0, RACK_LOCAL, 8203 bytes)
24/01/31 18:46:27 INFO TaskSetManager: Finished task 21.0 in stage 12.0 (TID 264) in 277 ms on ip-*-19-171.ec2.internal (executor 1) (22/24)
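
One way to check whether tasks in a log like the one above are really completing is to tally the `TaskSetManager: Finished task` lines per stage. The sketch below is only an illustration: the function name `summarize_finished_tasks` is made up, and the regular expression assumes the exact message layout shown in this excerpt. It does not depend on line breaks, so it also works on logs pasted as one long run of text.

```python
import re
from collections import defaultdict

# Matches e.g. "Finished task 11.0 in stage 12.0 (TID 254) in 202 ms on <host> (executor 2) (5/24)"
FINISHED = re.compile(
    r"Finished task \d+\.\d+ in stage (\d+)\.\d+ \(TID \d+\) "
    r"in (\d+) ms on \S+ \(executor (\d+)\) \((\d+)/(\d+)\)"
)


def summarize_finished_tasks(log_text):
    """Return per-stage progress, slowest task time, and the executors that ran tasks."""
    stages = defaultdict(lambda: {"done": 0, "total": 0, "max_ms": 0, "executors": set()})
    for stage, ms, executor, done, total in FINISHED.findall(log_text):
        entry = stages[int(stage)]
        entry["done"] = max(entry["done"], int(done))
        entry["total"] = int(total)
        entry["max_ms"] = max(entry["max_ms"], int(ms))
        entry["executors"].add(int(executor))
    return dict(stages)


# Three entries copied from the log above, joined the way they were pasted.
sample = (
    "24/01/31 18:46:27 INFO TaskSetManager: Finished task 11.0 in stage 12.0 (TID 254) "
    "in 202 ms on ip--19-171.ec2.internal (executor 2) (5/24) "
    "24/01/31 18:46:27 INFO TaskSetManager: Finished task 22.0 in stage 11.0 (TID 241) "
    "in 350 ms on ip--18-33.ec2.internal (executor 4) (23/24) "
    "24/01/31 18:46:27 INFO TaskSetManager: Finished task 7.0 in stage 11.0 (TID 226) "
    "in 638 ms on ip--18-167.ec2.internal (executor 5) (24/24)"
)

summary = summarize_finished_tasks(sample)
for stage in sorted(summary):
    info = summary[stage]
    print(f"stage {stage}: {info['done']}/{info['total']} tasks, "
          f"slowest {info['max_ms']} ms, executors {sorted(info['executors'])}")
```

In this excerpt the tasks finish in a few hundred milliseconds on several different executors, so at this point in the run the executors are doing work even if the Spark UI later shows them idle.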

ad1happy2go commented 8 months ago

@SudhirSaxena The logs above are the driver logs. Can you check what it prints during that one-hour period? We can also sync up on Slack and get on a call to understand this better.

SudhirSaxena commented 8 months ago

@ad1happy2go It keeps printing the same output I posted above, even after 2-3 hours: tasks are created and finished, but no actual execution happens, and no executor is active in the Spark UI. Sure, we can connect over a call. Can you please schedule a time, or let me know how to connect, so that I can do a working session with you or your team? Thanks.

SudhirSaxena commented 7 months ago

Hi @ad1happy2go, @soumilshah1995, @nsivabalan, following up on the Hudi upsert issue: did you get a chance to look at this? I am still facing the same problem. The insert runs fine, but when I run the same job a second time to perform the upsert, it runs forever and never completes. The driver appears to be running, creating and finishing tasks, but on the executor side no tasks are being created, so the executors look idle in the Spark UI. Would you be able to help with this? I appreciate your help.

SudhirSaxena commented 7 months ago

@ad1happy2go As discussed for this issue, please find the driver logs below:

24/02/06 05:04:07 INFO MemoryStore: 2 blocks selected for dropping (468.0 KiB bytes)
24/02/06 05:04:07 INFO BlockManager: Dropping block broadcast_12999_piece0 from memory
24/02/06 05:04:07 INFO BlockManager: Writing block broadcast_12999_piece0 to disk
24/02/06 05:04:07 INFO BlockManagerInfo: Updated broadcast_12999_piece0 on disk on ip-10-156-17-116.ec2.internal:35559 (current size: 47.4 KiB, original size: 0.0 B)
24/02/06 05:04:07 INFO BlockManager: Dropping block broadcast_13000 from memory
24/02/06 05:04:07 INFO BlockManager: Writing block broadcast_13000 to disk
24/02/06 05:04:07 INFO MemoryStore: After dropping 2 blocks, free memory is 1342.0 KiB
24/02/06 05:04:07 INFO MemoryStore: Block broadcast_48457 stored as values in memory (estimated size 420.5 KiB, free 921.5 KiB)
24/02/06 05:04:07 INFO MemoryStore: Block broadcast_48457_piece0 stored as bytes in memory (estimated size 47.4 KiB, free 874.0 KiB)
24/02/06 05:04:07 INFO BlockManagerInfo: Added broadcast_48457_piece0 in memory on ip-10-156-17-116.ec2.internal:35559 (size: 47.4 KiB, free: 14.2 GiB)
24/02/06 05:04:07 INFO SparkContext: Created broadcast 48457 from take at /mnt/tmp/spark-7f0d7e5b-8199-405d-aadf-6da6fd1c5cd0/RES_PNR_prod_HRLI.py:1191
24/02/06 05:04:07 INFO MemoryStore: 2 blocks selected for dropping (468.0 KiB bytes)
24/02/06 05:04:07 INFO BlockManager: Dropping block broadcast_13000_piece0 from memory
24/02/06 05:04:07 INFO BlockManager: Writing block broadcast_13000_piece0 to disk
24/02/06 05:04:07 INFO BlockManagerInfo: Updated broadcast_13000_piece0 on disk on ip-10-156-17-116.ec2.internal:35559 (current size: 47.4 KiB, original size: 0.0 B)
24/02/06 05:04:07 INFO BlockManager: Dropping block broadcast_13001 from memory
24/02/06 05:04:07 INFO BlockManager: Writing block broadcast_13001 to disk
24/02/06 05:04:07 INFO MemoryStore: After dropping 2 blocks, free memory is 1342.0 KiB
24/02/06 05:04:07 INFO MemoryStore: Block broadcast_48458 stored as values in memory (estimated size 420.5 KiB, free 921.5 KiB)
24/02/06 05:04:07 INFO MemoryStore: Block broadcast_48458_piece0 stored as bytes in memory (estimated size 47.4 KiB, free 874.0 KiB)
24/02/06 05:04:07 INFO BlockManagerInfo: Added broadcast_48458_piece0 in memory on ip-10-156-17-116.ec2.internal:35559 (size: 47.4 KiB, free: 14.2 GiB)
24/02/06 05:04:07 INFO SparkContext: Created broadcast 48458 from take at /mnt/tmp/spark-7f0d7e5b-8199-405d-aadf-6da6fd1c5cd0/RES_PNR_prod_HRLI.py:1191
24/02/06 05:04:07 INFO MemoryStore: 2 blocks selected for dropping (468.0 KiB bytes)
24/02/06 05:04:07 INFO BlockManager: Dropping block broadcast_13001_piece0 from memory
24/02/06 05:04:07 INFO BlockManager: Writing block broadcast_13001_piece0 to disk
24/02/06 05:04:07 INFO BlockManagerInfo: Updated broadcast_13001_piece0 on disk on ip-10-156-17-116.ec2.internal:35559 (current size: 47.4 KiB, original size: 0.0 B)
24/02/06 05:04:07 INFO BlockManager: Dropping block broadcast_13002 from memory
24/02/06 05:04:07 INFO BlockManager: Writing block broadcast_13002 to disk
24/02/06 05:04:07 INFO MemoryStore: After dropping 2 blocks, free memory is 1342.0 KiB
24/02/06 05:04:07 INFO MemoryStore: Block broadcast_48459 stored as values in memory (estimated size 420.5 KiB, free 921.5 KiB)
24/02/06 05:04:07 INFO MemoryStore: Block broadcast_48459_piece0 stored as bytes in memory (estimated size 47.4 KiB, free 874.0 KiB)
24/02/06 05:04:07 INFO BlockManagerInfo: Added broadcast_48459_piece0 in memory on ip-10-156-17-116.ec2.internal:35559 (size: 47.4 KiB, free: 14.2 GiB)
24/02/06 05:04:07 INFO SparkContext: Created broadcast 48459 from take at /mnt/tmp/spark-7f0d7e5b-8199-405d-aadf-6da6fd1c5cd0/RES_PNR_prod_HRLI.py:1191
24/02/06 05:04:07 INFO MemoryStore: 2 blocks selected for dropping (468.0 KiB bytes)
24/02/06 05:04:07 INFO BlockManager: Dropping block broadcast_13002_piece0 from memory
24/02/06 05:04:07 INFO BlockManager: Writing block broadcast_13002_piece0 to disk
24/02/06 05:04:07 INFO BlockManagerInfo: Updated broadcast_13002_piece0 on disk on ip-10-156-17-116.ec2.internal:35559 (current size: 47.4 KiB, original size: 0.0 B)
24/02/06 05:04:07 INFO BlockManager: Dropping block broadcast_13003 from memory
24/02/06 05:04:07 INFO BlockManager: Writing block broadcast_13003 to disk
24/02/06 05:04:07 INFO MemoryStore: After dropping 2 blocks, free memory is 1342.0 KiB
24/02/06 05:04:07 INFO MemoryStore: Block broadcast_48460 stored as values in memory (estimated size 420.5 KiB, free 921.5 KiB)
24/02/06 05:04:07 INFO MemoryStore: Block broadcast_48460_piece0 stored as bytes in memory (estimated size 47.4 KiB, free 874.0 KiB)
24/02/06 05:04:07 INFO BlockManagerInfo: Added broadcast_48460_piece0 in memory on ip-10-156-17-116.ec2.internal:35559 (size: 47.4 KiB, free: 14.2 GiB)
24/02/06 05:04:07 INFO SparkContext: Created broadcast 48460 from take at /mnt/tmp/spark-7f0d7e5b-8199-405d-aadf-6da6fd1c5cd0/RES_PNR_prod_HRLI.py:1191
24/02/06 05:04:07 INFO MemoryStore: 2 blocks selected for dropping (468.0 KiB bytes)
24/02/06 05:04:07 INFO BlockManager: Dropping block broadcast_13003_piece0 from memory
24/02/06 05:04:07 INFO BlockManager: Writing block broadcast_13003_piece0 to disk
24/02/06 05:04:07 INFO BlockManagerInfo: Updated broadcast_13003_piece0 on disk on ip-10-156-17-116.ec2.internal:35559 (current size: 47.4 KiB, original size: 0.0 B)
24/02/06 05:04:07 INFO BlockManager: Dropping block broadcast_13004 from memory
24/02/06 05:04:07 INFO BlockManager: Writing block broadcast_13004 to disk
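
The driver log above repeats one pattern: a new broadcast is created from the same `take` call site while older broadcast blocks are spilled to disk. A small tally of `Created broadcast` lines by call site makes that easier to see in a longer log. This is a sketch only: `broadcasts_by_call_site` is a hypothetical helper and the regular expression assumes the message layout shown here. Broadcast IDs above 48,000 all coming from a single line of the job script might point to a driver-side loop around that `take` call, but the script is not shown in this thread, so that reading is a guess.

```python
import re
from collections import Counter

# Matches e.g. "Created broadcast 48457 from take at /path/to/script.py:1191"
CREATED = re.compile(r"Created broadcast (\d+) from (\w+) at (\S+):(\d+)")


def broadcasts_by_call_site(log_text):
    """Count 'Created broadcast' entries per (action, file name, line) and track the highest ID."""
    sites = Counter()
    highest_id = -1
    for broadcast_id, action, path, line in CREATED.findall(log_text):
        file_name = path.rsplit("/", 1)[-1]  # keep only the script name
        sites[(action, file_name, int(line))] += 1
        highest_id = max(highest_id, int(broadcast_id))
    return sites, highest_id


# Two entries copied from the driver log above.
sample = (
    "24/02/06 05:04:07 INFO SparkContext: Created broadcast 48457 from take at "
    "/mnt/tmp/spark-7f0d7e5b-8199-405d-aadf-6da6fd1c5cd0/RES_PNR_prod_HRLI.py:1191 "
    "24/02/06 05:04:07 INFO SparkContext: Created broadcast 48458 from take at "
    "/mnt/tmp/spark-7f0d7e5b-8199-405d-aadf-6da6fd1c5cd0/RES_PNR_prod_HRLI.py:1191"
)

sites, highest_id = broadcasts_by_call_site(sample)
for (action, file_name, line), count in sites.most_common():
    print(f"{count} broadcast(s) from {action} at {file_name}:{line}")
print("highest broadcast id seen:", highest_id)
```

Running the same tally over the full driver log would show whether any call site other than line 1191 is creating broadcasts during the stalled period.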

ad1happy2go commented 7 months ago

Had a conversation with @SudhirSaxena on this and looked at his setup. He is using emr-6.15 with OSS hudi 0.14.0.

  1. With RLI enabled, the upsert job gets stuck for hours with no progress, no useful logs, and no running stage in the Spark UI. The driver logs are in the comment above.
  2. We tried with RLI disabled, keeping everything else the same, and saw similar behaviour. So RLI may not be the cause.

    Next steps -

    • Create a test script that does a bulk insert and an insert, based on the quickstart, and see if it works.
    • Try the same setup with version 0.14.1.
    • Try downgrading to an EMR version that supports Spark 3.3, run Hudi 0.14.0 there, and check whether we see the same behaviour.
    • Then downgrade Hudi to 0.12.3, which was used before, and confirm that it works fine.
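
For the first step, a quickstart-style test script mostly comes down to writing a small DataFrame with a minimal set of Hudi options and switching the operation between runs. The sketch below only builds the option dictionaries. The helper `build_hudi_options` and the table name `rli_smoke_test` are made up for illustration; the field names `id`, `eff_fm_cent_tz`, and `year,month` come from the configuration posted in this issue. The commented-out write call at the end assumes a Spark session with the Hudi bundle on the classpath.

```python
VALID_OPERATIONS = {"bulk_insert", "insert", "upsert"}


def build_hudi_options(table, record_key, precombine, operation,
                       partition_fields=None, use_rli=True):
    """Build a minimal Hudi write config for one operation (hypothetical helper)."""
    if operation not in VALID_OPERATIONS:
        raise ValueError(f"unsupported operation: {operation}")
    options = {
        "hoodie.table.name": table,
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.precombine.field": precombine,
        "hoodie.datasource.write.operation": operation,
    }
    if partition_fields:
        options["hoodie.datasource.write.partitionpath.field"] = partition_fields
    if use_rli:
        # Same record level index settings as in the issue description.
        options["hoodie.index.type"] = "RECORD_INDEX"
        options["hoodie.metadata.record.index.enable"] = "true"
    return options


# One config per step of the test: bulk insert first, then insert.
bulk_insert_opts = build_hudi_options(
    "rli_smoke_test", "id", "eff_fm_cent_tz", "bulk_insert", partition_fields="year,month"
)
insert_opts = build_hudi_options(
    "rli_smoke_test", "id", "eff_fm_cent_tz", "insert", partition_fields="year,month"
)
# The same run with RLI switched off, to compare behaviour.
insert_no_rli_opts = build_hudi_options(
    "rli_smoke_test", "id", "eff_fm_cent_tz", "insert",
    partition_fields="year,month", use_rli=False,
)

# Assumed usage with an existing DataFrame `df` and a test path `base_path`:
# df.write.format("hudi").options(**bulk_insert_opts).mode("overwrite").save(base_path)
# df.write.format("hudi").options(**insert_opts).mode("append").save(base_path)
```

Keeping the options in one helper makes it easy to repeat the identical test across the Hudi and EMR versions listed in the next steps, changing only the cluster and the bundle jar.
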
SudhirSaxena commented 7 months ago

Thanks @ad1happy2go, I will follow these steps and let you know.

ad1happy2go commented 7 months ago

@SudhirSaxena Did you get a chance to try this out? Thanks in advance for all your contributions on this.

nsivabalan commented 5 months ago

hey @ad1happy2go : do let me know if we find any data consistency issues w/ MDT or RLI. thanks.