apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] upgrade from 0.11.1 to 0.14.0 #11126

Open ghrahul opened 4 months ago

ghrahul commented 4 months ago

Problem

We were running Spark 3.2.1 along with HUDI 0.11.1. The jar link is: https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.2-bundle_2.12/0.11.1/hudi-spark3.2-bundle_2.12-0.11.1.jar.

We want to upgrade Spark to Spark 3.4.1 and HUDI to HUDI 0.14.0. The jar link is: https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.4-bundle_2.12/0.14.0/hudi-spark3.4-bundle_2.12-0.14.0.jar

The HUDI configuration that we are using for both the above cases for a sample table is:

hudi_config = {
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': 'transact_id',
    'hoodie.datasource.write.recordkey.field': 'id,op',
    'hoodie.table.name': 'users_masteruser_spark_test_v9',
    'hoodie.consistency.check.enabled': 'false',
    'hoodie.datasource.hive_sync.table': 'users_masteruser_spark_test_v9',
    'hoodie.datasource.hive_sync.database': 'lake_luna_for_payments',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.support_timestamp': 'true',
    'hoodie.datasource.write.reconcile.schema': 'true',
    'path': 's3_target_path',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.write.partitionpath.field': '',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.partition_fields': '',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.upsert.shuffle.parallelism': 40,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': 1
}

Step 1: Loading the table using Spark 3.2.1 along with HUDI 0.11.1. This is how all of our pipelines work.

Step 2: We want to append data in the same table using Spark 3.4.1 along with HUDI 0.14.0. Here is where we are getting an error:

ArrayIndexOutOfBoundsException            Traceback (most recent call last)
Cell In[26], line 1
----> 1 df.write.format("org.apache.hudi").mode('append').options(**hudi_config).save('s3a://trusted-luna-sbox/luna_for_payments/public/users_masteruser_spark_test_v9')

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py:1398, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
   1396     self._jwrite.save()
   1397 else:
-> 1398     self._jwrite.save(path)

File /opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:175, in capture_sql_exception.<locals>.deco(*a, **kw)
    171 converted = convert_exception(e.java_exception)
    172 if not isinstance(converted, UnknownException):
    173     # Hide where the exception came from that shows a non-Pythonic
    174     # JVM exception message.
--> 175     raise converted from None
    176 else:
    177     raise

ArrayIndexOutOfBoundsException: Index 0 out of bounds for length 0

Other info: for Spark 3.4.1

We are using this as the base Spark image: public.ecr.aws/ocean-spark/spark:platform-3.4.1-hadoop-3.3.4-java-11-scala-2.12-python-3.10-gen21, and these jars:

RUN wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.6.0/postgresql-42.6.0.jar && \
    wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.4-bundle_2.12/0.14.0/hudi-spark3.4-bundle_2.12-0.14.0.jar && \
    wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.3.0/mysql-connector-j-8.3.0.jar && \
    mv postgresql-42.6.0.jar /opt/spark/jars && \
    mv hudi-spark3.4-bundle_2.12-0.14.0.jar /opt/spark/jars && \
    mv mysql-connector-j-8.3.0.jar /opt/spark/jars

For Spark 3.2.1

Base Image: linux/amd64 gcr.io/datamechanics/spark:platform-3.2.1-hadoop-3.3.1-java-11-scala-2.12-python-3.8-dm18 Jars:

RUN wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.7/postgresql-42.3.7.jar && \
    wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.2-bundle_2.12/0.11.1/hudi-spark3.2-bundle_2.12-0.11.1.jar && \
    wget https://repo1.maven.org/maven2/org/apache/hive/hcatalog/hive-hcatalog-core/3.1.3/hive-hcatalog-core-3.1.3.jar && \
    wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar && \
         mv postgresql-42.3.7.jar /opt/spark/jars && \
         mv hudi-spark3.2-bundle_2.12-0.11.1.jar /opt/spark/jars && \
         mv hive-hcatalog-core-3.1.3.jar /opt/spark/jars && \
         mv mysql-connector-java-8.0.30.jar /opt/spark/jars
ad1happy2go commented 4 months ago

@ghrahul If this is a test setup, can you try removing the 'hoodie.datasource.write.reconcile.schema' config to check whether it is the cause? Did you try running the upgrade tool? Can you try upgrading to 0.13.1 first?

ghrahul commented 4 months ago

Hi @ad1happy2go , I turned off hoodie.datasource.write.reconcile.schema and reran the process, but I am still facing the same error.

ad1happy2go commented 4 months ago

@ghrahul Can you try upgrading to 0.12.3 or 0.13.1 first?

soumilshah1995 commented 4 months ago

Did you try the Hudi CLI?

ad1happy2go commented 4 months ago

@ghrahul Any updates here? Let us know if you are still stuck.

soumilshah1995 commented 4 months ago

The CLI should make it easy; let us know if you need any help.

soumilshah1995 commented 4 months ago

See: https://hudi.apache.org/docs/cli/#upgrade-and-downgrade-table
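
From that page, the upgrade/downgrade flow looks roughly like the sketch below. The table path is a placeholder, and the exact version codes to pass depend on your target release, so verify the syntax against the CLI docs for your bundle:

```sh
# Start the Hudi CLI, connect to the table, then move the table version.
hudi-cli
connect --path s3://your-bucket/path/to/table
downgrade table --toVersion 4   # e.g. back toward an older-writer-compatible version
upgrade table --toVersion 5     # or forward, as needed
```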

ghrahul commented 4 months ago

Hi @ad1happy2go ,

I created an Athena table using Spark 3.2 and HUDI 0.11.1. When I append data to that same table using Spark 3.2 and HUDI 0.13.1, I am facing this error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/opt/spark/work-dir/launch_ipykernel.py in <module>
----> 1 df.write.format("org.apache.hudi").mode('append').options(**hudi_config).save({s3_path})

/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    738             self._jwrite.save()
    739         else:
--> 740             self._jwrite.save(path)
    741 
    742     @since(1.4)

/opt/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1319 
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   1323 

/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

/opt/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o186.save.
: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback {s3 path}
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:826)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:730)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:714)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:709)
    at org.apache.hudi.client.BaseHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(BaseHoodieWriteClient.java:837)
    at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:156)
    at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:836)
    at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:829)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:371)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hudi.exception.HoodieException: Unknown versionCode:6
    at org.apache.hudi.common.table.HoodieTableVersion.lambda$versionFromCode$1(HoodieTableVersion.java:60)
    at java.base/java.util.Optional.orElseThrow(Optional.java:408)
    at org.apache.hudi.common.table.HoodieTableVersion.versionFromCode(HoodieTableVersion.java:60)
    at org.apache.hudi.common.table.HoodieTableConfig.getTableVersion(HoodieTableConfig.java:501)
    at org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowngrade(UpgradeDowngrade.java:66)
    at org.apache.hudi.client.BaseHoodieWriteClient.tryUpgrade(BaseHoodieWriteClient.java:1257)
    at org.apache.hudi.client.BaseHoodieWriteClient.doInitTable(BaseHoodieWriteClient.java:1124)
    at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1162)
    at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1191)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:151)
    at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:185)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:962)
    at org.apache.hudi.table.action.BaseActionExecutor.lambda$writeTableMetadata$2(BaseActionExecutor.java:76)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:76)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.finishRollback(BaseRollbackActionExecutor.java:246)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:114)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:135)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.rollback(HoodieSparkCopyOnWriteTable.java:270)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:809)
    ... 50 more

ad1happy2go commented 4 months ago

@ghrahul Can you post hoodie.properties? It looks like the table was already written using 0.14, which is why it has table version 6. For 0.13.1, which uses table version 5, version 6 is unknown.

Try to write using a 0.14.x version.
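
A quick way to confirm which writer version a table is on is to read hoodie.table.version out of the table's .hoodie/hoodie.properties file. A minimal sketch, assuming the version-to-release mapping stated in this thread (5 for 0.12.x/0.13.x writers, 6 for 0.14.x writers):

```python
# Map Hudi table version codes to writer releases (assumption per this thread).
VERSION_TO_RELEASE = {
    4: "0.11.x",
    5: "0.12.x / 0.13.x",
    6: "0.14.x",
}

def table_version(properties_text: str) -> int:
    """Parse the hoodie.table.version key from a Java-style properties file."""
    for line in properties_text.splitlines():
        line = line.strip()
        if line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        if key.strip() == "hoodie.table.version":
            return int(value.strip())
    raise KeyError("hoodie.table.version not found")

# Abridged example of a hoodie.properties file
sample = """#Properties saved on ...
hoodie.table.name=users_masteruser_spark_test_v9
hoodie.table.version=6
"""

print(table_version(sample))                      # 6
print(VERSION_TO_RELEASE[table_version(sample)])  # 0.14.x
```

If the printed version is 6, the table has already been touched by a 0.14.x writer and older bundles will refuse it with the "Unknown versionCode" error above.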

ghrahul commented 1 month ago

Hi @ad1happy2go ,

Thank you for sharing the above details. We followed the process and tested it with multiple tables; it worked.

We rolled out the upgrade to Spark 3.2.1 and HUDI 0.13.1 for around 500+ tables in a stepwise migration, then rolled out the upgrade again for Spark 3.4.1 and HUDI 0.14.0. This process worked for 95% of the tables.

But we are seeing issues for some tables where a daily bulk_insert happens. These pipelines are taking 10-20x the time and 10-20x the cost after the upgrade. We have observed one specific Spark job that takes a long time to complete after the upgrade: save at DatasetBulkInsertCommitActionExecutor.java:81

Could you please tell me if we are missing something in the HUDI configuration?

Spark HUDI Bundle Used

https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.4-bundle_2.12/0.14.0/hudi-spark3.4-bundle_2.12-0.14.0.jar

https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.2-bundle_2.12/0.13.1/hudi-spark3.2-bundle_2.12-0.13.1.jar

HUDI CONFIG for a Table with this issue

{'className': 'org.apache.hudi',
 'hoodie.datasource.hive_sync.use_jdbc': 'false',
 'hoodie.datasource.write.precombine.field': 'created_at',
 'hoodie.datasource.write.recordkey.field': 'master_user_id,created_at',
 'hoodie.table.name': 'table_name',
 'hoodie.consistency.check.enabled': 'false',
 'hoodie.datasource.hive_sync.table': 'table_name',
 'hoodie.datasource.hive_sync.database': 'lake_useractivity',
 'hoodie.datasource.hive_sync.enable': 'true',
 'hoodie.datasource.hive_sync.mode': 'hms',
 'path': 's3a://bucket_name/table_name/public/table_name/',
 'hoodie.index.type': 'BLOOM',
 'hoodie.bloom.index.update.partition.path': 'true',
 'hoodie.bloom.index.filter.type': 'DYNAMIC_V0',
 'hoodie.datasource.hive_sync.support_timestamp': 'true',
 'hoodie.datasource.write.reconcile.schema': 'true',
 'hoodie.write.markers.type': 'direct',
 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
 'hoodie.datasource.write.partitionpath.field': 'master_user_id_partition',
 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
 'hoodie.datasource.hive_sync.partition_fields': 'master_user_id_partition',
 'hoodie.datasource.write.hive_style_partitioning': 'true',
 'hoodie.upsert.shuffle.parallelism': 40,
 'hoodie.datasource.write.operation': 'bulk_insert',
 'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
 'hoodie.cleaner.commits.retained': 1}

With the above config, the same job took 5 minutes and $0.36 with HUDI 0.11.1, but 1 hour 44 minutes and $36 with HUDI 0.13.1, and we saw the same kind of cost and time increase with HUDI 0.14.0.

We compared the jobs created in the Spark UI and found one job that is taking most of the time after the upgrade. It is also failing for some tables.

(Spark UI screenshot attached.)
ghrahul commented 1 month ago

Hi @ad1happy2go , can you please share your inputs regarding this.

SudhirSaxena commented 1 month ago

I am looking for a further solution on this if one has been found. I have to do a similar migration from Hudi 0.10 to Hudi 0.14 for all of our existing Hudi tables running in production, upgrading every existing table from 0.10 to 0.14, and I am looking for suggestions on what changes are required at the config level or in any hoodie properties. I appreciate your help.

ghrahul commented 1 month ago

Hi @SudhirSaxena ,

As discussed in this thread, we first have to upgrade from HUDI 0.10 to HUDI 0.13.1 by running one incremental job, and then run one more incremental job using HUDI 0.14.

I did not have to change any config in the HUDI upsert configuration.

And these are the bundles that I have used:

Spark HUDI Bundle Used: https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.4-bundle_2.12/0.14.0/hudi-spark3.4-bundle_2.12-0.14.0.jar

https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.2-bundle_2.12/0.13.1/hudi-spark3.2-bundle_2.12-0.13.1.jar
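
As a sketch, that two-step bundle swap would look like this; the job script name is hypothetical, the jars are the bundles linked above, and each bundle must run on its matching Spark runtime (3.2 for the 0.13.1 bundle, 3.4 for the 0.14.0 bundle):

```sh
# Step 1: one incremental run on the table with the 0.13.1 bundle (on Spark 3.2)
spark-submit --jars /opt/spark/jars/hudi-spark3.2-bundle_2.12-0.13.1.jar incremental_job.py

# Step 2: one incremental run with the 0.14.0 bundle (on Spark 3.4),
# which upgrades the table version again
spark-submit --jars /opt/spark/jars/hudi-spark3.4-bundle_2.12-0.14.0.jar incremental_job.py
```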

SudhirSaxena commented 1 month ago

Thanks @ghrahul for the reply; I have some follow-up questions on your answer.

"here we have to first upgrade from HUDI 0.10 to HUDI 0.13.1 by running an incremental job" - so I have to run an incremental job with the new Hudi (version 0.13), compare the data against the existing Hudi (old version 0.10) table, and load the data into the existing Hudi table (now on version 0.13) as the target. Do I need to make any changes to the hoodie properties or Hudi config that I should know about? Can you please elaborate a bit more if my understanding is not correct?

"and then we have to again run 1 incremental job using HUDI 0.14." - then I have to run an incremental job with the new Hudi (version 0.14), compare the data against the existing Hudi (now version 0.13) table, and load the data into the existing Hudi table (now on version 0.14) as the target.

Also, can you please share which EMR version is compatible with Hudi 0.13, and which with Hudi 0.14, to migrate the data from Hudi 0.10 to Hudi 0.14?

Thanks for your quick response.

thanks for your quick response.

ghrahul commented 4 weeks ago

Hi @SudhirSaxena ,

If you are running your data ingestion pipeline in an incremental way, you just have to change the HUDI Spark bundle and run one incremental job on the same table without changing any HUDI properties. The HUDI upsert will take care of updating the table properties.

For EMR compatibility you have to check with AWS support. I am using emr-6.15.0 along with hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar for some use cases.

ghrahul commented 4 weeks ago

Hi @ad1happy2go ,

can you please help me with this?

https://github.com/apache/hudi/issues/11126#issuecomment-2264735034

ad1happy2go commented 4 weeks ago

@ghrahul Ideally it shouldn't take that much time. To triage, you may try disabling the metadata table to see if that is impacting performance.

Also, can you share the Spark event logs and the Hoodie timeline so we can look into this further?
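
For example, the triage overrides might look like the sketch below. Both config keys are assumptions to verify against the configuration reference for your 0.14.x bundle:

```python
# Overrides to merge into the existing hudi_config before rerunning the
# bulk_insert job. Verify both keys against the Hudi 0.14.x config docs.
triage_overrides = {
    # Turn off the metadata table to see if it is the bottleneck
    'hoodie.metadata.enable': 'false',
    # Fall back from the row-writer bulk_insert path (the one surfacing
    # as DatasetBulkInsertCommitActionExecutor in the Spark UI)
    'hoodie.datasource.write.row.writer.enable': 'false',
}

# Existing config, abridged for illustration
hudi_config = {'hoodie.datasource.write.operation': 'bulk_insert'}
hudi_config.update(triage_overrides)
print(hudi_config['hoodie.metadata.enable'])  # false
```

Rerunning with one override at a time should isolate which path is responsible for the regression.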

ghrahul commented 4 weeks ago

Okay, thanks @ad1happy2go .

ghrahul commented 3 weeks ago

Hi @ad1happy2go , here is the spark driver log file. spark_34_hudi_14_logs.zip