apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(Lscala/PartialFunction;)Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan; #8614

Closed · abdkumar closed this issue 3 months ago

abdkumar commented 1 year ago

Description: We are attempting to test DeltaStreamer on local files and apply some transformations before writing the data into the destination table. However, we keep encountering the SQL-related error mentioned in the title.

I am running HoodieDeltaStreamer with the SqlQueryBasedTransformer on Spark 3.3.2 and Apache Hudi 0.13.0. When I run the spark-submit command, I encounter the following error:

To Reproduce

Steps to reproduce the behavior:

  1. Download the sample data from https://drive.google.com/drive/folders/1BwNEK649hErbsWcYLZhqCWnaXFX3mIsg and the hudi-utilities-bundle jar from https://mvnrepository.com/artifact/org.apache.hudi/hudi-utilities-bundle_2.12/0.13.0
  2. Create the required folders (a sketch of the assumed layout follows this list)
  3. Run the spark-submit command:

     spark-submit \
       --conf spark.jars=/home/kumar/hudi-utilities-bundle_2.12-0.13.0.jar \
       --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
       --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
       --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
       --conf spark.sql.hive.convertMetastoreParquet=false \
       --conf mapreduce.fileoutputcommitter.marksuccessfuljobs=false \
       --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /home/kumar/hudi-utilities-bundle_2.12-0.13.0.jar \
       --enable-sync \
       --source-ordering-field replicadmstimestamp \
       --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
       --target-table invoice \
       --target-base-path /home/kumar/hudi_project/deltastreamer_file_transformer/transformed_data \
       --table-type COPY_ON_WRITE \
       --op UPSERT \
       --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
       --hoodie-conf hoodie.datasource.write.recordkey.field=invoiceid \
       --hoodie-conf hoodie.deltastreamer.source.dfs.root=file:////home/kumar/hudi_project/deltastreamer_file_transformer/sample_data_files \
       --hoodie-conf hoodie.datasource.write.precombine.field=replicadmstimestamp \
       --hoodie-conf hoodie.database.name=metastore \
       --hoodie-conf hoodie.datasource.hive_sync.enable=true \
       --hoodie-conf hoodie.datasource.hive_sync.table=invoice \
       --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
       --hoodie-conf hoodie.deltastreamer.transformer.sql="SELECT * ,extract(year from replicadmstimestamp) as year, extract(month from replicadmstimestamp) as month, extract(day from replicadmstimestamp) as day FROM <SRC> a;" \
       --hoodie-conf hoodie.datasource.write.partitionpath.field=year,month,day \
       --hoodie-conf hoodie.datasource.hive_sync.partition_fields=year,month,day
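
The exact folders for step 2 are not spelled out in the issue; the following is only a sketch, inferred from the source root and target base path used in the command above (paths are the reporter's local choices):

```bash
# Assumed layout: the DFS source root holds the downloaded parquet files,
# and the target base path is where the Hudi table is written.
mkdir -p /home/kumar/hudi_project/deltastreamer_file_transformer/sample_data_files
mkdir -p /home/kumar/hudi_project/deltastreamer_file_transformer/transformed_data

# copy the downloaded sample parquet files into the source root (path is illustrative)
# cp /path/to/downloaded/*.parquet /home/kumar/hudi_project/deltastreamer_file_transformer/sample_data_files/
```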

Environment Description

Hudi version: 0.13.0
Spark version: 3.3.2
Storage: local filesystem (file://)

Additional context

Added the required packages to the spark-defaults.conf file:

spark.jars.packages io.delta:delta-core_2.12:2.3.0,org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0,org.apache.spark:spark-avro_2.12:3.3.2

Stacktrace

```
:: loading settings :: url = jar:file:/home/kumar/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/kumar/.ivy2/cache
The jars for the packages stored in: /home/kumar/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
org.apache.hudi#hudi-spark3.3-bundle_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-91f79469-e32d-4f8c-8dc0-3be403ce410d;1.0
    confs: [default]
    found io.delta#delta-core_2.12;2.3.0 in central
    found io.delta#delta-storage;2.3.0 in central
    found org.antlr#antlr4-runtime;4.8 in central
    found org.apache.hudi#hudi-spark3.3-bundle_2.12;0.13.0 in central
    found org.apache.spark#spark-avro_2.12;3.3.2 in central
    found org.tukaani#xz;1.9 in central
    found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 616ms :: artifacts dl 29ms
    :: modules in use:
    io.delta#delta-core_2.12;2.3.0 from central in [default]
    io.delta#delta-storage;2.3.0 from central in [default]
    org.antlr#antlr4-runtime;4.8 from central in [default]
    org.apache.hudi#hudi-spark3.3-bundle_2.12;0.13.0 from central in [default]
    org.apache.spark#spark-avro_2.12;3.3.2 from central in [default]
    org.spark-project.spark#unused;1.0.0 from central in [default]
    org.tukaani#xz;1.9 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   7   |   0   |   0   |   0   ||   7   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-91f79469-e32d-4f8c-8dc0-3be403ce410d
    confs: [default]
    0 artifacts copied, 7 already retrieved (0kB/23ms)
23/05/01 13:23:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/01 13:23:10 WARN SchedulerConfGenerator: Job Scheduling Configs will not be in effect as spark.scheduler.mode is not set to FAIR at instantiation time. Continuing without scheduling configs
23/05/01 13:23:10 INFO SparkContext: Running Spark version 3.3.2
23/05/01 13:23:10 INFO ResourceUtils: ==============================================================
23/05/01 13:23:10 INFO ResourceUtils: No custom resources configured for spark.driver.
23/05/01 13:23:10 INFO ResourceUtils: ==============================================================
23/05/01 13:23:10 INFO SparkContext: Submitted application: delta-streamer-invoice
23/05/01 13:23:11 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/05/01 13:23:11 INFO ResourceProfile: Limiting resource is cpu
23/05/01 13:23:11 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/05/01 13:23:11 INFO SecurityManager: Changing view acls to: kumar
23/05/01 13:23:11 INFO SecurityManager: Changing modify acls to: kumar
23/05/01 13:23:11 INFO SecurityManager: Changing view acls groups to: 
23/05/01 13:23:11 INFO SecurityManager: Changing modify acls groups to: 
23/05/01 13:23:11 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(kumar); groups with view permissions: Set(); users  with modify permissions: Set(kumar); groups with modify permissions: Set()
23/05/01 13:23:11 INFO deprecation: mapred.output.compression.codec is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec
23/05/01 13:23:11 INFO deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
23/05/01 13:23:11 INFO deprecation: mapred.output.compression.type is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.type
23/05/01 13:23:11 INFO Utils: Successfully started service 'sparkDriver' on port 43729.
23/05/01 13:23:11 INFO SparkEnv: Registering MapOutputTracker
23/05/01 13:23:11 INFO SparkEnv: Registering BlockManagerMaster
23/05/01 13:23:12 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/05/01 13:23:12 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/05/01 13:23:12 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/05/01 13:23:12 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-227cf5db-3cef-4bff-9c4b-17f81aa8aa1b
23/05/01 13:23:12 INFO MemoryStore: MemoryStore started with capacity 366.3 MiB
23/05/01 13:23:12 INFO SparkEnv: Registering OutputCommitCoordinator
23/05/01 13:23:12 INFO Utils: Successfully started service 'SparkUI' on port 8090.
23/05/01 13:23:12 INFO SparkContext: Added JAR file:///home/kumar/hudi-utilities-bundle_2.12-0.13.0.jar at spark://hudi-vm.internal.cloudapp.net:43729/jars/hudi-utilities-bundle_2.12-0.13.0.jar with timestamp 1682947390924
23/05/01 13:23:12 INFO SparkContext: Added JAR file:///home/kumar/.ivy2/jars/io.delta_delta-core_2.12-2.3.0.jar at spark://hudi-vm.internal.cloudapp.net:43729/jars/io.delta_delta-core_2.12-2.3.0.jar with timestamp 1682947390924
23/05/01 13:23:12 INFO SparkContext: Added JAR file:///home/kumar/.ivy2/jars/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.0.jar at spark://hudi-vm.internal.cloudapp.net:43729/jars/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.0.jar with timestamp 1682947390924
23/05/01 13:23:12 INFO SparkContext: Added JAR file:///home/kumar/.ivy2/jars/org.apache.spark_spark-avro_2.12-3.3.2.jar at spark://hudi-vm.internal.cloudapp.net:43729/jars/org.apache.spark_spark-avro_2.12-3.3.2.jar with timestamp 1682947390924
23/05/01 13:23:12 INFO SparkContext: Added JAR file:///home/kumar/.ivy2/jars/io.delta_delta-storage-2.3.0.jar at spark://hudi-vm.internal.cloudapp.net:43729/jars/io.delta_delta-storage-2.3.0.jar with timestamp 1682947390924
23/05/01 13:23:12 INFO SparkContext: Added JAR file:///home/kumar/.ivy2/jars/org.antlr_antlr4-runtime-4.8.jar at spark://hudi-vm.internal.cloudapp.net:43729/jars/org.antlr_antlr4-runtime-4.8.jar with timestamp 1682947390924
23/05/01 13:23:12 INFO SparkContext: Added JAR file:///home/kumar/.ivy2/jars/org.tukaani_xz-1.9.jar at spark://hudi-vm.internal.cloudapp.net:43729/jars/org.tukaani_xz-1.9.jar with timestamp 1682947390924
23/05/01 13:23:12 INFO SparkContext: Added JAR file:///home/kumar/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar at spark://hudi-vm.internal.cloudapp.net:43729/jars/org.spark-project.spark_unused-1.0.0.jar with timestamp 1682947390924
23/05/01 13:23:12 INFO SparkContext: The JAR file:/home/kumar/hudi-utilities-bundle_2.12-0.13.0.jar at spark://hudi-vm.internal.cloudapp.net:43729/jars/hudi-utilities-bundle_2.12-0.13.0.jar has been added already. Overwriting of added jar is not supported in the current version.
23/05/01 13:23:12 INFO Executor: Starting executor ID driver on host hudi-vm.internal.cloudapp.net
23/05/01 13:23:12 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): ''
23/05/01 13:23:12 INFO Executor: Fetching spark://hudi-vm.internal.cloudapp.net:43729/jars/io.delta_delta-storage-2.3.0.jar with timestamp 1682947390924
23/05/01 13:23:12 INFO TransportClientFactory: Successfully created connection to hudi-vm.internal.cloudapp.net/10.1.0.4:43729 after 49 ms (0 ms spent in bootstraps)
23/05/01 13:23:12 INFO Utils: Fetching spark://hudi-vm.internal.cloudapp.net:43729/jars/io.delta_delta-storage-2.3.0.jar to /tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135/userFiles-84300702-dcb6-46f6-aba7-b2af5f66e946/fetchFileTemp6875321353190243857.tmp
23/05/01 13:23:12 INFO Executor: Adding file:/tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135/userFiles-84300702-dcb6-46f6-aba7-b2af5f66e946/io.delta_delta-storage-2.3.0.jar to class loader
23/05/01 13:23:12 INFO Executor: Fetching spark://hudi-vm.internal.cloudapp.net:43729/jars/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.0.jar with timestamp 1682947390924
23/05/01 13:23:12 INFO Utils: Fetching spark://hudi-vm.internal.cloudapp.net:43729/jars/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.0.jar to /tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135/userFiles-84300702-dcb6-46f6-aba7-b2af5f66e946/fetchFileTemp7911131019630374440.tmp
23/05/01 13:23:14 INFO Executor: Adding file:/tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135/userFiles-84300702-dcb6-46f6-aba7-b2af5f66e946/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.13.0.jar to class loader
23/05/01 13:23:14 INFO Executor: Fetching spark://hudi-vm.internal.cloudapp.net:43729/jars/hudi-utilities-bundle_2.12-0.13.0.jar with timestamp 1682947390924
23/05/01 13:23:14 INFO Utils: Fetching spark://hudi-vm.internal.cloudapp.net:43729/jars/hudi-utilities-bundle_2.12-0.13.0.jar to /tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135/userFiles-84300702-dcb6-46f6-aba7-b2af5f66e946/fetchFileTemp3321144361109611039.tmp
23/05/01 13:23:16 INFO Executor: Adding file:/tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135/userFiles-84300702-dcb6-46f6-aba7-b2af5f66e946/hudi-utilities-bundle_2.12-0.13.0.jar to class loader
23/05/01 13:23:16 INFO Executor: Fetching spark://hudi-vm.internal.cloudapp.net:43729/jars/io.delta_delta-core_2.12-2.3.0.jar with timestamp 1682947390924
23/05/01 13:23:16 INFO Utils: Fetching spark://hudi-vm.internal.cloudapp.net:43729/jars/io.delta_delta-core_2.12-2.3.0.jar to /tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135/userFiles-84300702-dcb6-46f6-aba7-b2af5f66e946/fetchFileTemp738084884534076528.tmp
23/05/01 13:23:17 INFO Executor: Adding file:/tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135/userFiles-84300702-dcb6-46f6-aba7-b2af5f66e946/io.delta_delta-core_2.12-2.3.0.jar to class loader
23/05/01 13:23:17 INFO Executor: Fetching spark://hudi-vm.internal.cloudapp.net:43729/jars/org.apache.spark_spark-avro_2.12-3.3.2.jar with timestamp 1682947390924
23/05/01 13:23:17 INFO Utils: Fetching spark://hudi-vm.internal.cloudapp.net:43729/jars/org.apache.spark_spark-avro_2.12-3.3.2.jar to /tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135/userFiles-84300702-dcb6-46f6-aba7-b2af5f66e946/fetchFileTemp6464200383284256273.tmp
23/05/01 13:23:17 INFO Executor: Adding file:/tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135/userFiles-84300702-dcb6-46f6-aba7-b2af5f66e946/org.apache.spark_spark-avro_2.12-3.3.2.jar to class loader
23/05/01 13:23:17 INFO Executor: Fetching spark://hudi-vm.internal.cloudapp.net:43729/jars/org.antlr_antlr4-runtime-4.8.jar with timestamp 1682947390924
23/05/01 13:23:17 INFO Utils: Fetching spark://hudi-vm.internal.cloudapp.net:43729/jars/org.antlr_antlr4-runtime-4.8.jar to /tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135/userFiles-84300702-dcb6-46f6-aba7-b2af5f66e946/fetchFileTemp5710292961021659934.tmp
23/05/01 13:23:17 INFO Executor: Adding file:/tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135/userFiles-84300702-dcb6-46f6-aba7-b2af5f66e946/org.antlr_antlr4-runtime-4.8.jar to class loader
23/05/01 13:23:17 INFO Executor: Fetching spark://hudi-vm.internal.cloudapp.net:43729/jars/org.tukaani_xz-1.9.jar with timestamp 1682947390924
23/05/01 13:23:17 INFO Utils: Fetching spark://hudi-vm.internal.cloudapp.net:43729/jars/org.tukaani_xz-1.9.jar to /tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135/userFiles-84300702-dcb6-46f6-aba7-b2af5f66e946/fetchFileTemp5943086360298064670.tmp
23/05/01 13:23:17 INFO Executor: Adding file:/tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135/userFiles-84300702-dcb6-46f6-aba7-b2af5f66e946/org.tukaani_xz-1.9.jar to class loader
23/05/01 13:23:17 INFO Executor: Fetching spark://hudi-vm.internal.cloudapp.net:43729/jars/org.spark-project.spark_unused-1.0.0.jar with timestamp 1682947390924
23/05/01 13:23:17 INFO Utils: Fetching spark://hudi-vm.internal.cloudapp.net:43729/jars/org.spark-project.spark_unused-1.0.0.jar to /tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135/userFiles-84300702-dcb6-46f6-aba7-b2af5f66e946/fetchFileTemp2308988093976298818.tmp
23/05/01 13:23:17 INFO Executor: Adding file:/tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135/userFiles-84300702-dcb6-46f6-aba7-b2af5f66e946/org.spark-project.spark_unused-1.0.0.jar to class loader
23/05/01 13:23:17 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40465.
23/05/01 13:23:17 INFO NettyBlockTransferService: Server created on hudi-vm.internal.cloudapp.net:40465
23/05/01 13:23:17 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/05/01 13:23:17 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, hudi-vm.internal.cloudapp.net, 40465, None)
23/05/01 13:23:17 INFO BlockManagerMasterEndpoint: Registering block manager hudi-vm.internal.cloudapp.net:40465 with 366.3 MiB RAM, BlockManagerId(driver, hudi-vm.internal.cloudapp.net, 40465, None)
23/05/01 13:23:17 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, hudi-vm.internal.cloudapp.net, 40465, None)
23/05/01 13:23:17 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, hudi-vm.internal.cloudapp.net, 40465, None)
23/05/01 13:23:18 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
23/05/01 13:23:18 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
23/05/01 13:23:18 INFO UtilHelpers: Adding overridden properties to file properties.
23/05/01 13:23:18 WARN SparkContext: Using an existing SparkContext; some configuration may not take effect.
23/05/01 13:23:18 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from /home/kumar/hudi_project/deltastreamer_file_transformer/transformed_data
23/05/01 13:23:18 INFO HoodieTableConfig: Loading table properties from /home/kumar/hudi_project/deltastreamer_file_transformer/transformed_data/.hoodie/hoodie.properties
23/05/01 13:23:18 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from /home/kumar/hudi_project/deltastreamer_file_transformer/transformed_data
23/05/01 13:23:18 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
23/05/01 13:23:18 INFO SharedState: Warehouse path is 'file:/home/kumar/hudi_project/deltastreamer_file_transformer/spark-warehouse'.
23/05/01 13:23:21 INFO HoodieDeltaStreamer: Creating delta streamer with configs:
hoodie.auto.adjust.lock.configs: true
hoodie.database.name: metastore
hoodie.datasource.hive_sync.enable: true
hoodie.datasource.hive_sync.partition_fields: year,month,day
hoodie.datasource.hive_sync.table: invoice
hoodie.datasource.write.keygenerator.class: org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.write.partitionpath.field: year,month,day
hoodie.datasource.write.precombine.field: replicadmstimestamp
hoodie.datasource.write.reconcile.schema: false
hoodie.datasource.write.recordkey.field: invoiceid
hoodie.deltastreamer.source.dfs.root: file:////home/kumar/hudi_project/deltastreamer_file_transformer/sample_data_files
hoodie.deltastreamer.transformer.sql: SELECT * ,extract(year from replicadmstimestamp) as year, extract(month from replicadmstimestamp) as month, extract(day from replicadmstimestamp) as day FROM <SRC> a;

23/05/01 13:23:21 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from /home/kumar/hudi_project/deltastreamer_file_transformer/transformed_data
23/05/01 13:23:21 INFO HoodieTableConfig: Loading table properties from /home/kumar/hudi_project/deltastreamer_file_transformer/transformed_data/.hoodie/hoodie.properties
23/05/01 13:23:21 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from /home/kumar/hudi_project/deltastreamer_file_transformer/transformed_data
23/05/01 13:23:21 INFO HoodieActiveTimeline: Loaded instants upto : Optional.empty
23/05/01 13:23:21 INFO DFSPathSelector: Using path selector org.apache.hudi.utilities.sources.helpers.DFSPathSelector
23/05/01 13:23:21 INFO HoodieDeltaStreamer: Delta Streamer running only single round
23/05/01 13:23:21 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from /home/kumar/hudi_project/deltastreamer_file_transformer/transformed_data
23/05/01 13:23:21 INFO HoodieTableConfig: Loading table properties from /home/kumar/hudi_project/deltastreamer_file_transformer/transformed_data/.hoodie/hoodie.properties
23/05/01 13:23:21 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from /home/kumar/hudi_project/deltastreamer_file_transformer/transformed_data
23/05/01 13:23:21 INFO HoodieActiveTimeline: Loaded instants upto : Optional.empty
23/05/01 13:23:21 INFO DeltaSync: Checkpoint to resume from : Optional.empty
23/05/01 13:23:21 INFO DFSPathSelector: Root path => file:////home/kumar/hudi_project/deltastreamer_file_transformer/sample_data_files source limit => 9223372036854775807
23/05/01 13:23:21 INFO InMemoryFileIndex: It took 64 ms to list leaf files for 2 paths.
23/05/01 13:23:23 INFO SparkContext: Starting job: parquet at ParquetDFSSource.java:55
23/05/01 13:23:23 INFO DAGScheduler: Got job 0 (parquet at ParquetDFSSource.java:55) with 1 output partitions
23/05/01 13:23:23 INFO DAGScheduler: Final stage: ResultStage 0 (parquet at ParquetDFSSource.java:55)
23/05/01 13:23:23 INFO DAGScheduler: Parents of final stage: List()
23/05/01 13:23:23 INFO DAGScheduler: Missing parents: List()
23/05/01 13:23:23 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at parquet at ParquetDFSSource.java:55), which has no missing parents
23/05/01 13:23:23 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 106.0 KiB, free 366.2 MiB)
23/05/01 13:23:23 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 38.2 KiB, free 366.2 MiB)
23/05/01 13:23:23 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on hudi-vm.internal.cloudapp.net:40465 (size: 38.2 KiB, free: 366.3 MiB)
23/05/01 13:23:23 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1513
23/05/01 13:23:24 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at parquet at ParquetDFSSource.java:55) (first 15 tasks are for partitions Vector(0))
23/05/01 13:23:24 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
23/05/01 13:23:24 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (hudi-vm.internal.cloudapp.net, executor driver, partition 0, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
23/05/01 13:23:24 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
23/05/01 13:23:25 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1606 bytes result sent to driver
23/05/01 13:23:25 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1696 ms on hudi-vm.internal.cloudapp.net (executor driver) (1/1)
23/05/01 13:23:25 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
23/05/01 13:23:25 INFO DAGScheduler: ResultStage 0 (parquet at ParquetDFSSource.java:55) finished in 2.676 s
23/05/01 13:23:25 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
23/05/01 13:23:25 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
23/05/01 13:23:25 INFO DAGScheduler: Job 0 finished: parquet at ParquetDFSSource.java:55, took 2.834363 s
23/05/01 13:23:26 INFO BlockManagerInfo: Removed broadcast_0_piece0 on hudi-vm.internal.cloudapp.net:40465 in memory (size: 38.2 KiB, free: 366.3 MiB)
23/05/01 13:23:30 INFO SqlQueryBasedTransformer: Registering tmp table : HOODIE_SRC_TMP_TABLE_a1962c9c_edde_45ce_88db_a73c519c000d
23/05/01 13:23:30 INFO DeltaSync: Shutting down embedded timeline server
23/05/01 13:23:30 INFO HoodieDeltaStreamer: Shut down delta streamer
23/05/01 13:23:30 INFO SparkUI: Stopped Spark web UI at http://hudi-vm.internal.cloudapp.net:8090/
23/05/01 13:23:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/05/01 13:23:31 INFO MemoryStore: MemoryStore cleared
23/05/01 13:23:31 INFO BlockManager: BlockManager stopped
23/05/01 13:23:31 INFO BlockManagerMaster: BlockManagerMaster stopped
23/05/01 13:23:31 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/05/01 13:23:31 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(Lscala/PartialFunction;)Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;
    at org.apache.spark.sql.hudi.analysis.HoodiePruneFileSourcePartitions.apply(HoodiePruneFileSourcePartitions.scala:42)
    at org.apache.spark.sql.hudi.analysis.HoodiePruneFileSourcePartitions.apply(HoodiePruneFileSourcePartitions.scala:40)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:126)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:118)
    at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:92)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
    at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3887)
    at org.apache.spark.sql.Dataset.createOrReplaceTempView(Dataset.scala:3455)
    at org.apache.hudi.utilities.transform.SqlQueryBasedTransformer.apply(SqlQueryBasedTransformer.java:63)
    at org.apache.hudi.utilities.transform.ChainedTransformer.apply(ChainedTransformer.java:50)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$fetchFromSource$0(DeltaSync.java:495)
    at org.apache.hudi.common.util.Option.map(Option.java:108)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:495)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:460)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:364)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:206)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:204)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:573)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
23/05/01 13:23:31 INFO ShutdownHookManager: Shutdown hook called
23/05/01 13:23:31 INFO ShutdownHookManager: Deleting directory /tmp/spark-5d07761f-de34-404e-ba00-73a56a32d135
23/05/01 13:23:31 INFO ShutdownHookManager: Deleting directory /tmp/spark-7c0ada38-e349-47fb-bf8d-9f7c288f4e24
```

danny0405 commented 1 year ago

There is a known incompatibility between Spark 3.3.2 and Hudi 0.13.0: https://github.com/apache/hudi/pull/8082. Can you try the patch to see if it resolves your problem?
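
For reference, a rough sketch of what trying that patch locally could look like; the release tag, Maven profile flag, and output path below are assumptions and may differ by version and environment:

```bash
# Sketch only: rebuild hudi-utilities-bundle with the fix from PR 8082 applied.
# The tag name, -Dspark3.3 profile flag, and artifact path are assumptions.
git clone https://github.com/apache/hudi.git
cd hudi
git checkout release-0.13.0
# apply the PR 8082 changes here, e.g. by cherry-picking its commit(s) and resolving conflicts

# build against Spark 3.3 (Scala 2.12 is the default for the spark3 profiles)
mvn clean package -DskipTests -Dspark3.3

# the rebuilt bundle is expected under packaging/hudi-utilities-bundle/target/
ls packaging/hudi-utilities-bundle/target/
```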

ad1happy2go commented 1 year ago

@abdkumar Were you able to test it out with this patch?

pushpavanthar commented 10 months ago

@danny0405 We are facing the same issue with Hudi 0.13.1 on Spark 3.2.1 and 3.3.2. Below is the command we run; the same command used to work fine with Hudi 0.11.1.

spark-submit --master yarn \
  --packages org.apache.spark:spark-avro_2.12:3.2.1,org.apache.hudi:hudi-utilities-bundle_2.12:0.13.1,org.apache.hudi:hudi-spark3.2-bundle_2.12:0.13.1,org.apache.hudi:hudi-aws-bundle:0.13.1 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --conf spark.executor.cores=5 \
  --conf spark.driver.memory=3200m \
  --conf spark.driver.memoryOverhead=800m \
  --conf spark.executor.memoryOverhead=1400m \
  --conf spark.executor.memory=14600m \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.initialExecutors=1 \
  --conf spark.dynamicAllocation.minExecutors=1 \
  --conf spark.dynamicAllocation.maxExecutors=21 \
  --conf spark.scheduler.mode=FAIR \
  --conf spark.task.maxFailures=5 \
  --conf spark.rdd.compress=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --conf spark.yarn.max.executor.failures=5 \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true \
  --conf spark.sql.catalogImplementation=hive \
  --deploy-mode client \
  s3://bucket_name/custom_jar-2.0.jar \
  --hoodie-conf hoodie.parquet.compression.codec=snappy \
  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.num_instants=100 \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.HoodieIncrSource \
  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.path=s3://bucket_name/ml_attributes/features \
  --hoodie-conf hoodie.metrics.on=true \
  --hoodie-conf hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY \
  --hoodie-conf hoodie.metrics.pushgateway.host=pushgateway.in \
  --hoodie-conf hoodie.metrics.pushgateway.port=443 \
  --hoodie-conf hoodie.metrics.pushgateway.delete.on.shutdown=false \
  --hoodie-conf hoodie.metrics.pushgateway.job.name=hudi_transformed_features_accounts_hudi \
  --hoodie-conf hoodie.metrics.pushgateway.random.job.name.suffix=false \
  --hoodie-conf hoodie.metadata.enable=true \
  --hoodie-conf hoodie.metrics.reporter.metricsname.prefix=hudi \
  --target-base-path s3://bucket_name_transformed/features_accounts \
  --target-table features_accounts \
  --enable-sync \
  --hoodie-conf hoodie.datasource.hive_sync.database=hudi_transformed \
  --hoodie-conf hoodie.datasource.hive_sync.table=features_accounts \
  --sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id,pos \
  --hoodie-conf hoodie.datasource.write.precombine.field=id \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=created_at_dt \
  --hoodie-conf hoodie.datasource.hive_sync.partition_fields=created_at_dt \
  --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor \
  --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING \
  --hoodie-conf "hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z',yyyy-MM-dd' 'HH:mm:ss.SSSSSS,yyyy-MM-dd' 'HH:mm:ss,yyyy-MM-dd'T'HH:mm:ss'Z'" \
  --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd \
  --source-ordering-field id \
  --hoodie-conf secret.key.name=some-secret \
  --hoodie-conf transformer.decrypt.cols=features_json \
  --hoodie-conf transformer.uncompress.cols=false \
  --hoodie-conf transformer.jsonToStruct.column=features_json \
  --hoodie-conf transformer.normalize.column=features_json.accounts \
  --hoodie-conf transformer.copy.fields=created_at,created_at_dt \
  --transformer-class com.custom.transform.DecryptTransformer,com.custom.transform.JsonToStructTypeTransformer,com.custom.transform.NormalizeArrayTransformer,com.custom.transform.FlatteningTransformer,com.custom.transform.CopyFieldTransformer

danny0405 commented 10 months ago

@ad1happy2go Would you mind reproducing with the command given by @pushpavanthar?

pushpavanthar commented 10 months ago

We tried running this on emr-6.7.0 and a few other higher EMR release labels.

ad1happy2go commented 10 months ago

@danny0405 I think the issue is org.apache.hudi:hudi-utilities-bundle_2.12:0.13.1. The utilities bundle jar can't carry a dependency for every specific Spark version, so don't use the Maven artifact as-is: either build your own jar and use that, OR use the slim-bundle package. We should not use the utilities bundle and the Spark bundle together, because the utilities bundle already contains the spark-bundle dependency. So ideally use the utilities slim bundle.
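
A minimal sketch of that combination, assuming Hudi 0.13.1 artifacts built for Spark 3.2; the jar paths and the trailing options are placeholders standing in for the original command's settings:

```bash
# Sketch only: the Spark-version-specific bundle goes on --jars, and the
# Spark-agnostic slim utilities bundle is the application jar. Paths are placeholders.
spark-submit --deploy-mode client \
  --jars /path/to/hudi-spark3.2-bundle_2.12-0.13.1.jar,/path/to/hudi-aws-bundle-0.13.1.jar \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /path/to/hudi-utilities-slim-bundle_2.12-0.13.1.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.HoodieIncrSource \
  --target-base-path s3://bucket_name_transformed/features_accounts \
  --target-table features_accounts
  # ...remaining --hoodie-conf options as in the original command
```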

@pushpavanthar I had asked you to try the same in this Slack thread: https://apache-hudi.slack.com/archives/C4D716NPQ/p1697802409713149. Were you able to try it out?

ad1happy2go commented 10 months ago

@pushpavanthar Were you able to resolve this issue?

haggy commented 4 months ago

> OR use the slim-bundle package. We should not use both utilities-bundle and spark bundle together. utilities-bundle already have spark-bundle dependency

@ad1happy2go Using the hudi-spark3.2-bundle + the hudi-utilities-slim bundle for version 0.13.1 resolved the issues for me!

EMR Version: emr-6.7.0
Spark Version: 3.2.1
Hudi Version: 0.13.1

Relevant spark-submit command for reference:

spark-submit --deploy-mode client \
    --jars ./lib/jars/hudi-spark3.2-bundle_2.12-0.13.1.jar,./lib/jars/hudi-aws-bundle-0.13.1.jar \
    --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    ...
    ./lib/jars/hudi-utilities-slim-bundle_2.12-0.13.1.jar
    ...

ad1happy2go commented 3 months ago

Thanks @haggy. Closing this issue.