apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Multi Writer Jobs with OCC (U1 and U2) with Async Cleaner #11378

Open soumilshah1995 opened 4 months ago

soumilshah1995 commented 4 months ago

Hello,

We have been experimenting with a multi-writer setup and have confirmed that it works perfectly with two writers. The image below shows our sample setup:

[image: sample two-writer setup]

To further enhance our setup, we wanted to test running the cleaner asynchronously, in parallel with the writers. The first run of the cleaner succeeded, but subsequent runs have been failing.

In our setup, we have two jobs: u1 and u2.

u1 writes only to CA partitions and u2 writes only to NY partitions (per the attached scripts), so the two jobs never touch the same partition. Both jobs share the following configuration:

# "hoodie.clean.automatic": "false",
# "hoodie.clean.async": "true",
"hoodie.write.concurrency.mode": "optimistic_concurrency_control",
"hoodie.cleaner.policy.failed.writes": "LAZY",
"hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",

Note: we also tried "hoodie.clean.automatic" set to both true and false.
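
For reference, here is roughly how those shared settings look when passed as a single writer-options dict in PySpark. This is just a consolidated view of the configs above; the commented-out lock-path entry is purely an assumption for illustration (we have not set an explicit lock directory in our runs):

# Shared multi-writer (OCC) settings used by both u1 and u2
occ_options = {
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    "hoodie.cleaner.policy.failed.writes": "LAZY",
    "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
    # illustrative only: an explicit lock directory could also be pinned, e.g.
    # "hoodie.write.lock.filesystem.path": "file:///path/to/locks/",
}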

We tested the setup both with and without the following flags:

# "hoodie.clean.automatic": "false",
# "hoodie.clean.async": "true",

Here is our async cleaner configuration:

spark-submit \
    --class 'org.apache.hudi.utilities.HoodieCleaner' \
    --packages 'org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0' \
    --properties-file spark-config.properties \
    --master 'local[*]' \
    --executor-memory 3g \
    /Users/soumilshah/IdeaProjects/SparkProject/deltastreamerBroadcastJoins/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
    --target-base-path 'file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/' \
    --hoodie-conf 'hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS' \
    --hoodie-conf 'hoodie.cleaner.fileversions.retained=1' \
    --hoodie-conf 'hoodie.cleaner.parallelism=100' \
    --hoodie-conf 'hoodie.clean.automatic=true' \
    --hoodie-conf 'hoodie.clean.async=true' \
    --hoodie-conf 'hoodie.write.concurrency.mode=optimistic_concurrency_control' \
    --hoodie-conf 'hoodie.cleaner.policy.failed.writes=LAZY' \
    --hoodie-conf 'hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider'

The cleaner fails when it runs concurrently with the u1 and u2 jobs.

Logs


retrieving :: org.apache.spark#spark-submit-parent-bd84a89e-a4f1-4b97-a1d5-225d51043b44
    confs: [default]
    0 artifacts copied, 1 already retrieved (0kB/3ms)
24/06/01 10:16:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/01 10:16:11 INFO SparkContext: Running Spark version 3.4.0
24/06/01 10:16:11 INFO ResourceUtils: ==============================================================
24/06/01 10:16:11 INFO ResourceUtils: No custom resources configured for spark.driver.
24/06/01 10:16:11 INFO ResourceUtils: ==============================================================
24/06/01 10:16:11 INFO SparkContext: Submitted application: hoodie-cleaner-table_name=orders
24/06/01 10:16:11 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
24/06/01 10:16:11 INFO ResourceProfile: Limiting resource is cpu
24/06/01 10:16:11 INFO ResourceProfileManager: Added ResourceProfile id: 0
24/06/01 10:16:11 INFO SecurityManager: Changing view acls to: soumilshah
24/06/01 10:16:11 INFO SecurityManager: Changing modify acls to: soumilshah
24/06/01 10:16:11 INFO SecurityManager: Changing view acls groups to: 
24/06/01 10:16:11 INFO SecurityManager: Changing modify acls groups to: 
24/06/01 10:16:11 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: soumilshah; groups with view permissions: EMPTY; users with modify permissions: soumilshah; groups with modify permissions: EMPTY
24/06/01 10:16:11 INFO deprecation: mapred.output.compression.codec is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec
24/06/01 10:16:11 INFO deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
24/06/01 10:16:11 INFO deprecation: mapred.output.compression.type is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.type
24/06/01 10:16:11 INFO Utils: Successfully started service 'sparkDriver' on port 65007.
24/06/01 10:16:11 INFO SparkEnv: Registering MapOutputTracker
24/06/01 10:16:11 INFO SparkEnv: Registering BlockManagerMaster
24/06/01 10:16:11 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
24/06/01 10:16:11 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
24/06/01 10:16:11 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/06/01 10:16:11 INFO DiskBlockManager: Created local directory at /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/blockmgr-2e982a52-ad15-457a-8856-7b9857ec303d
24/06/01 10:16:11 INFO MemoryStore: MemoryStore started with capacity 434.4 MiB
24/06/01 10:16:11 INFO SparkEnv: Registering OutputCommitCoordinator
24/06/01 10:16:11 INFO JettyUtils: Start Jetty 0.0.0.0:8090 for SparkUI
24/06/01 10:16:11 INFO Utils: Successfully started service 'SparkUI' on port 8090.
24/06/01 10:16:11 INFO SparkContext: Added JAR file:///Users/soumilshah/.ivy2/jars/org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar at spark://10.14.129.125:65007/jars/org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar with timestamp 1717251371606
24/06/01 10:16:11 INFO SparkContext: Added JAR file:/Users/soumilshah/IdeaProjects/SparkProject/deltastreamerBroadcastJoins/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar at spark://10.14.129.125:65007/jars/hudi-utilities-slim-bundle_2.12-0.14.0.jar with timestamp 1717251371606
24/06/01 10:16:11 INFO Executor: Starting executor ID driver on host 10.14.129.125
24/06/01 10:16:11 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): ''
24/06/01 10:16:11 INFO Executor: Fetching spark://10.14.129.125:65007/jars/hudi-utilities-slim-bundle_2.12-0.14.0.jar with timestamp 1717251371606
24/06/01 10:16:11 INFO TransportClientFactory: Successfully created connection to /10.14.129.125:65007 after 15 ms (0 ms spent in bootstraps)
24/06/01 10:16:11 INFO Utils: Fetching spark://10.14.129.125:65007/jars/hudi-utilities-slim-bundle_2.12-0.14.0.jar to /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-61fbec77-5617-4513-8b0d-a0b6101feee8/userFiles-ecf41982-1cd7-4af4-a911-e46b337eb2dc/fetchFileTemp10727026823756083903.tmp
24/06/01 10:16:12 INFO Executor: Adding file:/private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-61fbec77-5617-4513-8b0d-a0b6101feee8/userFiles-ecf41982-1cd7-4af4-a911-e46b337eb2dc/hudi-utilities-slim-bundle_2.12-0.14.0.jar to class loader
24/06/01 10:16:12 INFO Executor: Fetching spark://10.14.129.125:65007/jars/org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar with timestamp 1717251371606
24/06/01 10:16:12 INFO Utils: Fetching spark://10.14.129.125:65007/jars/org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar to /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-61fbec77-5617-4513-8b0d-a0b6101feee8/userFiles-ecf41982-1cd7-4af4-a911-e46b337eb2dc/fetchFileTemp13175198092609918093.tmp
24/06/01 10:16:12 INFO Executor: Adding file:/private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-61fbec77-5617-4513-8b0d-a0b6101feee8/userFiles-ecf41982-1cd7-4af4-a911-e46b337eb2dc/org.apache.hudi_hudi-spark3.4-bundle_2.12-0.14.0.jar to class loader
24/06/01 10:16:12 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 65010.
24/06/01 10:16:12 INFO NettyBlockTransferService: Server created on 10.14.129.125:65010
24/06/01 10:16:12 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
24/06/01 10:16:12 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.14.129.125, 65010, None)
24/06/01 10:16:12 INFO BlockManagerMasterEndpoint: Registering block manager 10.14.129.125:65010 with 434.4 MiB RAM, BlockManagerId(driver, 10.14.129.125, 65010, None)
24/06/01 10:16:12 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.14.129.125, 65010, None)
24/06/01 10:16:12 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.14.129.125, 65010, None)
24/06/01 10:16:12 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
24/06/01 10:16:12 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
24/06/01 10:16:12 INFO HoodieCleaner: Creating Cleaner with configs : {hoodie.cleaner.fileversions.retained=1, hoodie.clean.automatic=true, hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider, hoodie.cleaner.policy.failed.writes=LAZY, hoodie.write.concurrency.mode=optimistic_concurrency_control, hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS, hoodie.clean.async=true, hoodie.cleaner.parallelism=100}
24/06/01 10:16:12 INFO HoodieWriteConfig: Automatically set hoodie.cleaner.policy.failed.writes=LAZY since optimistic concurrency control is used
24/06/01 10:16:12 INFO EmbeddedTimelineService: Starting Timeline service !!
24/06/01 10:16:12 INFO EmbeddedTimelineService: Overriding hostIp to (10.14.129.125) found in spark-conf. It was null
24/06/01 10:16:12 INFO FileSystemViewManager: Creating View Manager with storage type :MEMORY
24/06/01 10:16:12 INFO FileSystemViewManager: Creating in-memory based Table View
24/06/01 10:16:12 INFO log: Logging initialized @2012ms to org.apache.hudi.org.eclipse.jetty.util.log.Slf4jLog
24/06/01 10:16:12 INFO Javalin: 
       __                      __ _            __ __
      / /____ _ _   __ ____ _ / /(_)____      / // /
 __  / // __ `/| | / // __ `// // // __ \    / // /_
/ /_/ // /_/ / | |/ // /_/ // // // / / /   /__  __/
\____/ \__,_/  |___/ \__,_//_//_//_/ /_/      /_/

          https://javalin.io/documentation

24/06/01 10:16:12 INFO Javalin: Starting Javalin ...
24/06/01 10:16:12 INFO Javalin: You are running Javalin 4.6.7 (released October 24, 2022. Your Javalin version is 586 days old. Consider checking for a newer version.).
24/06/01 10:16:12 INFO Server: jetty-9.4.48.v20220622; built: 2022-06-21T20:42:25.880Z; git: 6b67c5719d1f4371b33655ff2d047d24e171e49a; jvm 11.0.22+0
24/06/01 10:16:12 INFO Server: Started @2143ms
24/06/01 10:16:12 INFO Javalin: Listening on http://localhost:65011/
24/06/01 10:16:12 INFO Javalin: Javalin started in 68ms \o/
24/06/01 10:16:12 INFO TimelineService: Starting Timeline server on port :65011
24/06/01 10:16:12 INFO EmbeddedTimelineService: Started embedded timeline server at 10.14.129.125:65011
24/06/01 10:16:12 INFO BaseHoodieClient: Timeline Server already running. Not restarting the service
24/06/01 10:16:12 INFO CleanerUtils: Cleaned failed attempts if any
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading Active commit timeline for file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20240601101606185__commit__INFLIGHT__20240601101606821]}
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=HFILE) from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20240601101606185__deltacommit__INFLIGHT__20240601101612494]}
24/06/01 10:16:12 INFO AbstractTableFileSystemView: Took 1 ms to read  0 instants, 0 replaced file groups
24/06/01 10:16:12 INFO ClusteringUtils: Found 0 files in pending clustering operations
24/06/01 10:16:12 INFO FileSystemViewManager: Creating View Manager with storage type :REMOTE_FIRST
24/06/01 10:16:12 INFO FileSystemViewManager: Creating remote first table view
24/06/01 10:16:12 INFO HoodieHeartbeatClient: Heartbeat not found in internal map, falling back to reading from DFS
24/06/01 10:16:12 INFO HoodieHeartbeatClient: Heartbeat not found in internal map, falling back to reading from DFS
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading Active commit timeline for file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20240601101606185__commit__INFLIGHT__20240601101606821]}
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=HFILE) from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20240601101606185__deltacommit__INFLIGHT__20240601101612494]}
24/06/01 10:16:12 INFO AbstractTableFileSystemView: Took 0 ms to read  0 instants, 0 replaced file groups
24/06/01 10:16:12 INFO ClusteringUtils: Found 0 files in pending clustering operations
24/06/01 10:16:12 INFO FileSystemViewManager: Creating View Manager with storage type :REMOTE_FIRST
24/06/01 10:16:12 INFO FileSystemViewManager: Creating remote first table view
24/06/01 10:16:12 INFO BaseHoodieWriteClient: Cleaner started
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading Active commit timeline for file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20240601101606185__commit__INFLIGHT__20240601101606821]}
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
24/06/01 10:16:12 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
24/06/01 10:16:12 INFO HoodieTableConfig: Loading table properties from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata/.hoodie/hoodie.properties
24/06/01 10:16:12 INFO HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=HFILE) from file:/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/.hoodie/metadata
24/06/01 10:16:12 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20240601101606185__deltacommit__INFLIGHT__20240601101612494]}
24/06/01 10:16:12 INFO AbstractTableFileSystemView: Took 0 ms to read  0 instants, 0 replaced file groups
24/06/01 10:16:12 INFO ClusteringUtils: Found 0 files in pending clustering operations
24/06/01 10:16:12 INFO FileSystemViewManager: Creating View Manager with storage type :REMOTE_FIRST
24/06/01 10:16:12 INFO FileSystemViewManager: Creating remote first table view
24/06/01 10:16:12 INFO BaseHoodieWriteClient: Scheduling cleaning at instant time :20240601101612647
24/06/01 10:16:12 INFO BaseHoodieClient: Stopping Timeline service !!
24/06/01 10:16:12 INFO EmbeddedTimelineService: Closing Timeline server
24/06/01 10:16:12 INFO TimelineService: Closing Timeline Service
24/06/01 10:16:12 INFO Javalin: Stopping Javalin ...
24/06/01 10:16:12 INFO Javalin: Javalin has stopped
24/06/01 10:16:12 INFO TimelineService: Closed Timeline Service
24/06/01 10:16:12 INFO EmbeddedTimelineService: Closed Timeline server
24/06/01 10:16:12 INFO TransactionManager: Transaction manager closed
24/06/01 10:16:12 INFO TransactionManager: Transaction manager closed
24/06/01 10:16:12 ERROR HoodieCleaner: Failed to run cleaning for file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=orders/
java.lang.ExceptionInInitializerError
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at org.apache.avro.util.ClassUtils.forName(ClassUtils.java:95)
    at org.apache.avro.util.ClassUtils.forName(ClassUtils.java:75)
    at org.apache.avro.specific.SpecificData.lambda$getClass$2(SpecificData.java:257)
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
    at org.apache.avro.util.MapUtil.computeIfAbsent(MapUtil.java:42)
    at org.apache.avro.specific.SpecificData.getClass(SpecificData.java:255)
    at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:488)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
    at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:355)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:186)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
    at org.apache.avro.file.DataFileStream.next(DataFileStream.java:263)
    at org.apache.avro.file.DataFileStream.next(DataFileStream.java:248)
    at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata(TimelineMetadataUtils.java:209)
    at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeHoodieCleanMetadata(TimelineMetadataUtils.java:173)
    at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.getCommitsSinceLastCleaning(CleanPlanActionExecutor.java:74)
    at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.needsCleaning(CleanPlanActionExecutor.java:89)
    at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.execute(CleanPlanActionExecutor.java:173)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.scheduleCleaning(HoodieSparkCopyOnWriteTable.java:217)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableServiceInternal(BaseHoodieTableServiceClient.java:628)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.clean(BaseHoodieTableServiceClient.java:751)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:861)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:834)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:865)
    at org.apache.hudi.utilities.HoodieCleaner.run(HoodieCleaner.java:70)
    at org.apache.hudi.utilities.HoodieCleaner.main(HoodieCleaner.java:114)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.IllegalStateException: Recursive update
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1760)
    at org.apache.avro.util.MapUtil.computeIfAbsent(MapUtil.java:42)
    at org.apache.avro.specific.SpecificData.getClass(SpecificData.java:255)
    at org.apache.avro.specific.SpecificData.getForSchema(SpecificData.java:162)
    at org.apache.avro.specific.SpecificDatumWriter.<init>(SpecificDatumWriter.java:47)
    at org.apache.hudi.avro.model.HoodieCleanPartitionMetadata.<clinit>(HoodieCleanPartitionMetadata.java:532)
    ... 47 more
24/06/01 10:16:12 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/06/01 10:16:12 INFO SparkUI: Stopped Spark web UI at http://10.14.129.125:8090
24/06/01 10:16:12 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/06/01 10:16:12 INFO MemoryStore: MemoryStore cleared
24/06/01 10:16:12 INFO BlockManager: BlockManager stopped
24/06/01 10:16:12 INFO BlockManagerMaster: BlockManagerMaster stopped
24/06/01 10:16:12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/06/01 10:16:12 INFO SparkContext: Successfully stopped SparkContext
24/06/01 10:16:12 INFO ShutdownHookManager: Shutdown hook called
24/06/01 10:16:12 INFO ShutdownHookManager: Deleting directory /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-9d9ed177-add3-4aea-bd51-af54389afe1e
24/06/01 10:16:12 INFO ShutdownHookManager: Deleting directory /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-61fbec77-5617-4513-8b0d-a0b6101feee8
(base) soumilshah@Soumils-MacBook-Pro conflictdetection % 

U1.py

try:
    import os
    import sys
    import uuid
    import pyspark
    import datetime
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    from datetime import datetime
    import random
    import pandas as pd
    from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType
    from pyspark.sql.functions import col

    print("Imports loaded ")

except Exception as e:
    print("error", e)

HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

schema = StructType([
    StructField("orderID", StringType(), True),
    StructField("productSKU", StringType(), True),
    StructField("customerID", StringType(), True),
    StructField("orderDate", StringType(), True),
    StructField("orderAmount", FloatType(), True),
    StructField("state", StringType(), True)
])

def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM'
                  ):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,

        # "hoodie.clean.automatic":"false",
        # " hoodie.clean.async":"true",

        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.cleaner.policy.failed.writes": "LAZY",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",

    }
    print(hudi_options)

    print("\n")
    print(path)
    print("\n")

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)

# Initial data
data = [
    ("order1", "prod001##", "cust001", "2024-01-15", 150.00, "CA"),
    ("order006", "prod006##", "cust006", "2024-01-20", 350.00, "CA"),
]

# Loop to update productSKU and write to Hudi
for i in range(1, 100):  # Number of iterations
    # Update productSKU
    updated_data = [(orderID, f"{productSKU[:-1]}update{i}", customerID, orderDate, orderAmount, state)
                    for (orderID, productSKU, customerID, orderDate, orderAmount, state) in data]

    # Create the DataFrame with updated data
    df = spark.createDataFrame(updated_data, schema)

    # Show the DataFrame with the updated "productSKU" column
    df.show()

    # Write to Hudi
    write_to_hudi(
        spark_df=df,
        method="upsert",
        db_name="default",
        table_name="orders",
        recordkey="orderID",
        precombine="orderDate",
        partition_fields="state",
        index_type="RECORD_INDEX"
    )
    import time
    time.sleep(3)

spark.stop()

u2.py

try:
    import os
    import sys
    import uuid
    import pyspark
    import datetime
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    from datetime import datetime
    import random
    import pandas as pd
    from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType
    from pyspark.sql.functions import col

    print("Imports loaded ")

except Exception as e:
    print("error", e)

HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

schema = StructType([
    StructField("orderID", StringType(), True),
    StructField("productSKU", StringType(), True),
    StructField("customerID", StringType(), True),
    StructField("orderDate", StringType(), True),
    StructField("orderAmount", FloatType(), True),
    StructField("state", StringType(), True)
])

def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM'
                  ):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,

        # "hoodie.clean.automatic":"false",
        # " hoodie.clean.async":"true",

        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.cleaner.policy.failed.writes": "LAZY",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",

    }
    print(hudi_options)

    print("\n")
    print(path)
    print("\n")

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)

# Initial data
data = [
    ("order002", "prod002", "cust002", "2024-01-16", 200.00, "NY"),
    ("order007", "prod007", "cust007", "2024-01-21", 400.00, "NY")
]

# Loop to update productSKU and write to Hudi
for i in range(1, 100):  # Number of iterations
    # Update productSKU
    updated_data = [(orderID, f"{productSKU[:-1]}update{i}", customerID, orderDate, orderAmount, state)
                    for (orderID, productSKU, customerID, orderDate, orderAmount, state) in data]

    # Create the DataFrame with updated data
    df = spark.createDataFrame(updated_data, schema)

    # Show the DataFrame with the updated "productSKU" column
    df.show()

    # Write to Hudi
    write_to_hudi(
        spark_df=df,
        method="upsert",
        db_name="default",
        table_name="orders",
        recordkey="orderID",
        precombine="orderDate",
        partition_fields="state",
        index_type="RECORD_INDEX"
    )
    import time

    time.sleep(2)

spark.stop()

Any insights or suggestions on resolving this issue would be greatly appreciated.

nsivabalan commented 1 month ago

Did you try disabling cleaning in one writer while enabling it only in the other? Essentially, in a multi-writer setup, it is good to have all table services run from just one writer, while all other writers only do ingestion. Can you try that?
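
For example, a minimal sketch of that split (illustrative values only; the shared OCC/lock settings from the issue stay unchanged, and hudi_options refers to the common options built in write_to_hudi above):

# Writers that only ingest (e.g. u1): table services disabled in this process
ingest_only_options = {
    **hudi_options,
    "hoodie.clean.automatic": "false",    # never schedule/run cleaning from this writer
    "hoodie.clean.async": "false",
    "hoodie.archive.automatic": "false",  # assumption: archival is also left to the service-owning writer
}

# The single writer (e.g. u2) that owns table services and cleans asynchronously
service_owner_options = {
    **hudi_options,
    "hoodie.clean.automatic": "true",
    "hoodie.clean.async": "true",
}

With this split only one process ever schedules or runs cleaning, so there is no separate HoodieCleaner job racing with the writers.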