apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] - Debezium PostgreSQL #8159

Closed lenhardtx closed 1 year ago

lenhardtx commented 1 year ago

I have an environment where Oracle replicates to PostgreSQL via Kafka (about 2,000 tables), and I intend to add another Debezium connector on PostgreSQL to use the approach described in this guide: https://hudi.apache.org/blog/2022/01/14/change-data-capture-with-debezium-and-apache-hudi/.
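The guide assumes a Debezium PostgreSQL connector that already publishes Avro change events to Kafka through Kafka Connect and a schema registry. A minimal registration sketch is shown below; the Connect endpoint, database host, credentials, and slot name are placeholders, while the server name pgprd and table public.sbr_ped_venda match the topic used in the job further down.

# placeholder Kafka Connect endpoint; adjust host, credentials and slot for your environment
curl -s -X POST http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "pgprd-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "plugin.name": "pgoutput",
      "database.hostname": "postgres-host",
      "database.port": "5432",
      "database.user": "debezium",
      "database.password": "XXXXXXX",
      "database.dbname": "pgprd",
      "database.server.name": "pgprd",
      "table.include.list": "public.sbr_ped_venda",
      "slot.name": "hudi_cdc_slot",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "key.converter.schema.registry.url": "http://0.0.0.0:8081",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://0.0.0.0:8081"
    }
  }'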

I set up replication for a single table to test the solution, but I am hitting the error shown below.

Caused by: java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.plans.logical.LogicalPlan org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(scala.PartialFunction)'

Also, are there any plans to provide a source class for the Debezium Oracle envelope?

Environment Description

Additional context

Job: [root@spark-hudi ~]# cat testeHudi_v3.sh

#!/bin/bash

spark-submit --name "hudi_sbr_ped_venda" --master spark://0.0.0.0:7077 --deploy-mode client --driver-memory 1G \
  --packages org.apache.hadoop:hadoop-aws:3.3.4 \
  --jars "/root/hudi/hudi-utilities-bundle_2.12-0.13.0.jar,/root/hudi/spark-avro_2.13-3.3.2.jar" \
  --conf spark.executor.memory=2g --conf spark.cores.max=100 \
  --conf 'spark.hadoop.fs.s3a.access.key=admin' \
  --conf 'spark.hadoop.fs.s3a.secret.key=XXXXXXX' \
  --conf 'spark.hadoop.fs.s3a.endpoint=http://0.0.0.0:9000' \
  --conf 'spark.hadoop.fs.s3a.path.style.access=true' \
  --conf 'fs.s3a.signing-algorithm=S3SignerType' \
  --conf 'spark.sql.catalogImplementation=hive' \
  --conf 'spark.debug.maxToStringFields=500' \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /root/hudi/hudi-utilities-bundle_2.12-0.13.0.jar \
  --table-type COPY_ON_WRITE --op UPSERT \
  --target-base-path s3a://hudi/sbr_ped_venda \
  --target-table sbr_ped_venda --continuous \
  --min-sync-interval-seconds 60 \
  --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
  --source-ordering-field _event_lsn \
  --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
  --hoodie-conf schema.registry.url=http://0.0.0.0:8081 \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://0.0.0.0:8081/subjects/pgprd.public.sbr_ped_venda-value/versions/latest \
  --hoodie-conf bootstrap.servers=0.0.0.0:9092 \
  --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=pgprd.public.sbr_ped_venda \
  --hoodie-conf hoodie.deltastreamer.source.kafka.group.id=datalake \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf group.id=datalake \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
  --hoodie-conf validate.non.null=false \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator \
  --hoodie-conf hoodie.datasource.write.hive_style_partitioning=false

I've tried several variations; this is the last test I did.


Stacktrace

[root@spark-hudi ~]# ./testeHudi_v3.sh
Warning: Ignoring non-Spark config property: fs.s3a.signing-algorithm
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5dcabf23-e567-4306-b4b1-acb277c8e07d;1.0
    confs: [default]
    found org.apache.hadoop#hadoop-aws;3.3.4 in central
    found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
    found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 466ms :: artifacts dl 25ms
    :: modules in use:
    com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
    org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
    org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]

    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
    ---------------------------------------------------------------------

:: retrieving :: org.apache.spark#spark-submit-parent-5dcabf23-e567-4306-b4b1-acb277c8e07d confs: [default] 0 artifacts copied, 3 already retrieved (0kB/14ms) 23/03/12 11:40:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 23/03/12 11:40:39 WARN SchedulerConfGenerator: Job Scheduling Configs will not be in effect as spark.scheduler.mode is not set to FAIR at instantiation time. Continuing without scheduling configs 23/03/12 11:40:39 INFO SparkContext: Running Spark version 3.3.2 23/03/12 11:40:39 INFO ResourceUtils: ============================================================== 23/03/12 11:40:39 INFO ResourceUtils: No custom resources configured for spark.driver. 23/03/12 11:40:39 INFO ResourceUtils: ============================================================== 23/03/12 11:40:39 INFO SparkContext: Submitted application: delta-streamer-sbr_ped_venda 23/03/12 11:40:39 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 2048, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0) 23/03/12 11:40:39 INFO ResourceProfile: Limiting resource is cpu 23/03/12 11:40:39 INFO ResourceProfileManager: Added ResourceProfile id: 0 23/03/12 11:40:39 INFO SecurityManager: Changing view acls to: root 23/03/12 11:40:39 INFO SecurityManager: Changing modify acls to: root 23/03/12 11:40:39 INFO SecurityManager: Changing view acls groups to: 23/03/12 11:40:39 INFO SecurityManager: Changing modify acls groups to: 23/03/12 11:40:39 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set() 23/03/12 11:40:39 INFO deprecation: mapred.output.compression.codec is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec 23/03/12 11:40:39 INFO deprecation: mapred.output.compression.type is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.type 23/03/12 11:40:39 INFO deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress 23/03/12 11:40:40 INFO Utils: Successfully started service 'sparkDriver' on port 45603. 23/03/12 11:40:40 INFO SparkEnv: Registering MapOutputTracker 23/03/12 11:40:40 INFO SparkEnv: Registering BlockManagerMaster 23/03/12 11:40:40 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 23/03/12 11:40:40 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 23/03/12 11:40:40 INFO SparkEnv: Registering BlockManagerMasterHeartbeat 23/03/12 11:40:40 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8cfb4a38-de92-4ea7-881f-e49d4d4750ea 23/03/12 11:40:40 INFO MemoryStore: MemoryStore started with capacity 434.4 MiB 23/03/12 11:40:40 INFO SparkEnv: Registering OutputCommitCoordinator 23/03/12 11:40:40 INFO Utils: Successfully started service 'SparkUI' on port 8090. 
23/03/12 11:40:40 INFO SparkContext: Added JAR file:///root/hudi/hudi-utilities-bundle_2.12-0.13.0.jar at spark://spark-hudi.smartbr.com:45603/jars/hudi-utilities-bundle_2.12-0.13.0.jar with timestamp 1678632039164 23/03/12 11:40:40 INFO SparkContext: Added JAR file:///root/hudi/spark-avro_2.13-3.3.2.jar at spark://spark-hudi.smartbr.com:45603/jars/spark-avro_2.13-3.3.2.jar with timestamp 1678632039164 23/03/12 11:40:40 INFO SparkContext: Added JAR file:///root/.ivy2/jars/org.apache.hadoop_hadoop-aws-3.3.4.jar at spark://spark-hudi.smartbr.com:45603/jars/org.apache.hadoop_hadoop-aws-3.3.4.jar with timestamp 1678632039164 23/03/12 11:40:40 INFO SparkContext: Added JAR file:///root/.ivy2/jars/com.amazonaws_aws-java-sdk-bundle-1.12.262.jar at spark://spark-hudi.smartbr.com:45603/jars/com.amazonaws_aws-java-sdk-bundle-1.12.262.jar with timestamp 1678632039164 23/03/12 11:40:40 INFO SparkContext: Added JAR file:///root/.ivy2/jars/org.wildfly.openssl_wildfly-openssl-1.0.7.Final.jar at spark://spark-hudi.smartbr.com:45603/jars/org.wildfly.openssl_wildfly-openssl-1.0.7.Final.jar with timestamp 1678632039164 23/03/12 11:40:40 INFO SparkContext: The JAR file:/root/hudi/hudi-utilities-bundle_2.12-0.13.0.jar at spark://spark-hudi.smartbr.com:45603/jars/hudi-utilities-bundle_2.12-0.13.0.jar has been added already. Overwriting of added jar is not supported in the current version. 23/03/12 11:40:40 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://0.0.0.0:7077... 23/03/12 11:40:41 INFO TransportClientFactory: Successfully created connection to /0.0.0.0.0:7077 after 50 ms (0 ms spent in bootstraps) 23/03/12 11:40:41 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20230312114041-0014 23/03/12 11:40:41 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20230312114041-0014/0 on worker-20230312102039-0.0.0.0-40923 (192.168.200.15:40923) with 2 core(s) 23/03/12 11:40:41 INFO StandaloneSchedulerBackend: Granted executor ID app-20230312114041-0014/0 on hostPort 0.0.0.0:40923 with 2 core(s), 2.0 GiB RAM 23/03/12 11:40:41 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44591. 23/03/12 11:40:41 INFO NettyBlockTransferService: Server created on spark-hudi.smartbr.com:44591 23/03/12 11:40:41 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 23/03/12 11:40:41 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, spark-hudi.smartbr.com, 44591, None) 23/03/12 11:40:41 INFO BlockManagerMasterEndpoint: Registering block manager spark-hudi:44591 with 434.4 MiB RAM, BlockManagerId(driver, spark-hudi, 44591, None) 23/03/12 11:40:41 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, spark-hudi, 44591, None) 23/03/12 11:40:41 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, spark-hudi, 44591, None) 23/03/12 11:40:41 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20230312114041-0014/0 is now RUNNING 23/03/12 11:40:41 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 23/03/12 11:40:42 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties 23/03/12 11:40:42 INFO MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s). 
23/03/12 11:40:42 INFO MetricsSystemImpl: s3a-file-system metrics system started 23/03/12 11:40:44 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf 23/03/12 11:40:44 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file 23/03/12 11:40:44 INFO UtilHelpers: Adding overridden properties to file properties. 23/03/12 11:40:44 WARN SparkContext: Using an existing SparkContext; some configuration may not take effect. 23/03/12 11:40:45 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from s3a://hudi/sbr_ped_venda 23/03/12 11:40:45 INFO HoodieTableConfig: Loading table properties from s3a://hudi/sbr_ped_venda/.hoodie/hoodie.properties 23/03/12 11:40:45 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from s3a://hudi/sbr_ped_venda 23/03/12 11:40:45 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir. 23/03/12 11:40:45 INFO SharedState: Warehouse path is 'file:/root/spark-warehouse'. 23/03/12 11:40:48 INFO HoodieDeltaStreamer: Creating delta streamer with configs: auto.offset.reset: earliest bootstrap.servers: 0.0.0.0:9092 group.id: datalake hoodie.auto.adjust.lock.configs: true hoodie.datasource.write.hive_style_partitioning: false hoodie.datasource.write.keygenerator.class: org.apache.hudi.keygen.NonpartitionedKeyGenerator hoodie.datasource.write.reconcile.schema: false hoodie.datasource.write.recordkey.field: id hoodie.deltastreamer.schemaprovider.registry.url: http://0.0.0.0:8081/subjects/pgprd.public.sbr_ped_venda-value/versions/latest hoodie.deltastreamer.source.kafka.group.id: datalake hoodie.deltastreamer.source.kafka.topic: pgprd.public.sbr_ped_venda hoodie.deltastreamer.source.kafka.value.deserializer.class: io.confluent.kafka.serializers.KafkaAvroDeserializer schema.registry.url: http://0.0.0.0.:8081 validate.non.null: false

23/03/12 11:40:48 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from s3a://hudi/sbr_ped_venda 23/03/12 11:40:48 INFO HoodieTableConfig: Loading table properties from s3a://hudi/sbr_ped_venda/.hoodie/hoodie.properties 23/03/12 11:40:48 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from s3a://hudi/sbr_ped_venda 23/03/12 11:40:48 INFO HoodieActiveTimeline: Loaded instants upto : Optional.empty 23/03/12 11:40:49 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from s3a://hudi/sbr_ped_venda 23/03/12 11:40:49 INFO HoodieTableConfig: Loading table properties from s3a://hudi/sbr_ped_venda/.hoodie/hoodie.properties 23/03/12 11:40:49 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from s3a://hudi/sbr_ped_venda 23/03/12 11:40:49 INFO HoodieActiveTimeline: Loaded instants upto : Optional.empty 23/03/12 11:40:49 INFO DeltaSync: Checkpoint to resume from : Optional.empty 23/03/12 11:40:49 INFO ConsumerConfig: ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = earliest bootstrap.servers = [0.0.0.0:9092] check.crcs = true client.dns.lookup = default client.id = client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = datalake group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer

23/03/12 11:40:49 INFO KafkaAvroDeserializerConfig: KafkaAvroDeserializerConfig values: bearer.auth.token = [hidden] schema.registry.url = [http://0.0.0.0:8081] basic.auth.user.info = [hidden] auto.register.schemas = true max.schemas.per.subject = 1000 basic.auth.credentials.source = URL schema.registry.basic.auth.user.info = [hidden] bearer.auth.credentials.source = STATIC_TOKEN specific.avro.reader = false value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

23/03/12 11:40:49 WARN ConsumerConfig: The configuration 'validate.non.null' was supplied but isn't a known config. 23/03/12 11:40:49 WARN ConsumerConfig: The configuration 'hoodie.deltastreamer.source.kafka.value.deserializer.class' was supplied but isn't a known config. 23/03/12 11:40:49 INFO AppInfoParser: Kafka version: 2.4.1 23/03/12 11:40:49 INFO AppInfoParser: Kafka commitId: c57222ae8cd7866b 23/03/12 11:40:49 INFO AppInfoParser: Kafka startTimeMs: 1678632049794 23/03/12 11:40:50 INFO Metadata: [Consumer clientId=consumer-datalake-1, groupId=datalake] Cluster ID: 0EpmDQKcTSK7kGDKCviAwQ 23/03/12 11:40:50 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.200.15:56696) with ID 0, ResourceProfileId 0 23/03/12 11:40:50 INFO KafkaOffsetGen: SourceLimit not configured, set numEvents to default value : 5000000 23/03/12 11:40:50 INFO DebeziumSource: About to read 1532804 from Kafka for topic :pgprd.public.sbr_ped_venda 23/03/12 11:40:51 WARN KafkaUtils: overriding enable.auto.commit to false for executor 23/03/12 11:40:51 WARN KafkaUtils: overriding auto.offset.reset to none for executor 23/03/12 11:40:51 WARN KafkaUtils: overriding executor group.id to spark-executor-datalake 23/03/12 11:40:51 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135 23/03/12 11:40:51 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.200.15:33183 with 1048.8 MiB RAM, BlockManagerId(0, 0.0.0.0, 33183, None) 23/03/12 11:40:52 INFO SparkContext: Starting job: isEmpty at AvroConversionUtils.scala:120 23/03/12 11:40:52 INFO DAGScheduler: Got job 0 (isEmpty at AvroConversionUtils.scala:120) with 1 output partitions 23/03/12 11:40:52 INFO DAGScheduler: Final stage: ResultStage 0 (isEmpty at AvroConversionUtils.scala:120) 23/03/12 11:40:52 INFO DAGScheduler: Parents of final stage: List() 23/03/12 11:40:52 INFO DAGScheduler: Missing parents: List() 23/03/12 11:40:52 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at DebeziumSource.java:159), which has no missing parents 23/03/12 11:40:52 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 6.0 KiB, free 434.4 MiB) 23/03/12 11:40:53 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 3.5 KiB, free 434.4 MiB) 23/03/12 11:40:53 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on spark-hudi.smartbr.com:44591 (size: 3.5 KiB, free: 434.4 MiB) 23/03/12 11:40:53 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1513 23/03/12 11:40:53 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at DebeziumSource.java:159) (first 15 tasks are for partitions Vector(0)) 23/03/12 11:40:53 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0 23/03/12 11:40:58 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (0.0.0.0, executor 0, partition 0, PROCESS_LOCAL, 4378 bytes) taskResourceAssignments Map() 23/03/12 11:40:59 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 0.0.0.0:33183 (size: 3.5 KiB, free: 1048.8 MiB) 23/03/12 11:41:02 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3868 ms on 0.0.0.0 (executor 0) (1/1) 23/03/12 11:41:02 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 23/03/12 11:41:02 INFO DAGScheduler: ResultStage 0 (isEmpty at AvroConversionUtils.scala:120) finished in 9.974 s 23/03/12 11:41:02 INFO DAGScheduler: Job 
0 is finished. Cancelling potential speculative or zombie tasks for this job 23/03/12 11:41:02 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished 23/03/12 11:41:02 INFO DAGScheduler: Job 0 finished: isEmpty at AvroConversionUtils.scala:120, took 10.185954 s 23/03/12 11:41:03 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 0.0.0.0:33183 in memory (size: 3.5 KiB, free: 1048.8 MiB) 23/03/12 11:41:03 INFO BlockManagerInfo: Removed broadcast_0_piece0 on spark-hudi.smartbr.com:44591 in memory (size: 3.5 KiB, free: 434.4 MiB) 23/03/12 11:41:05 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'. 23/03/12 11:41:07 INFO DebeziumSource: Date fields: [] 23/03/12 11:41:07 INFO HoodieDeltaStreamer: Delta Sync shutdown. Error ?false 23/03/12 11:41:07 INFO HoodieDeltaStreamer: DeltaSync shutdown. Closing write client. Error?true 23/03/12 11:41:07 ERROR HoodieAsyncService: Service shutdown with error java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.plans.logical.LogicalPlan org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(scala.PartialFunction)' at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103) at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:195) at org.apache.hudi.common.util.Option.ifPresent(Option.java:97) at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:192) at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:573) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.plans.logical.LogicalPlan org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(scala.PartialFunction)' at org.apache.spark.sql.hudi.analysis.HoodiePruneFileSourcePartitions.apply(HoodiePruneFileSourcePartitions.scala:42) at org.apache.spark.sql.hudi.analysis.HoodiePruneFileSourcePartitions.apply(HoodiePruneFileSourcePartitions.scala:40) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211) at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126) at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) at 
scala.collection.immutable.List.foldLeft(List.scala:91) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200) at scala.collection.immutable.List.foreach(List.scala:431) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179) at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:126) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:122) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:118) at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136) at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154) at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:173) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:172) at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3396) at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3394) at org.apache.hudi.utilities.sources.debezium.DebeziumSource.convertColumnToNullable(DebeziumSource.java:220) at org.apache.hudi.utilities.sources.debezium.DebeziumSource.toDataset(DebeziumSource.java:167) at org.apache.hudi.utilities.sources.debezium.DebeziumSource.fetchNextBatch(DebeziumSource.java:123) at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43) at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76) at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:71) at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:530) at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:460) at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:364) at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:716) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) 23/03/12 11:41:07 INFO DeltaSync: Shutting down embedded timeline server 23/03/12 11:41:07 INFO SparkUI: Stopped Spark web UI at http://spark-hudi:8090 
23/03/12 11:41:07 INFO StandaloneSchedulerBackend: Shutting down all executors 23/03/12 11:41:07 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down 23/03/12 11:41:08 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 23/03/12 11:41:08 INFO MemoryStore: MemoryStore cleared 23/03/12 11:41:08 INFO BlockManager: BlockManager stopped 23/03/12 11:41:08 INFO BlockManagerMaster: BlockManagerMaster stopped 23/03/12 11:41:08 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 23/03/12 11:41:08 INFO SparkContext: Successfully stopped SparkContext Exception in thread "main" org.apache.hudi.exception.HoodieException: java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.plans.logical.LogicalPlan org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(scala.PartialFunction)' at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:197) at org.apache.hudi.common.util.Option.ifPresent(Option.java:97) at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:192) at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:573) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.plans.logical.LogicalPlan org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(scala.PartialFunction)' at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103) at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:195) ... 
15 more Caused by: java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.plans.logical.LogicalPlan org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(scala.PartialFunction)' at org.apache.spark.sql.hudi.analysis.HoodiePruneFileSourcePartitions.apply(HoodiePruneFileSourcePartitions.scala:42) at org.apache.spark.sql.hudi.analysis.HoodiePruneFileSourcePartitions.apply(HoodiePruneFileSourcePartitions.scala:40) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211) at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126) at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) at scala.collection.immutable.List.foldLeft(List.scala:91) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200) at scala.collection.immutable.List.foreach(List.scala:431) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179) at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:126) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:122) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:118) at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136) at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154) at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:173) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:172) at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3396) at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3394) at org.apache.hudi.utilities.sources.debezium.DebeziumSource.convertColumnToNullable(DebeziumSource.java:220) at org.apache.hudi.utilities.sources.debezium.DebeziumSource.toDataset(DebeziumSource.java:167) at org.apache.hudi.utilities.sources.debezium.DebeziumSource.fetchNextBatch(DebeziumSource.java:123) at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43) at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76) at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:71) at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:530) at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:460) at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:364) at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:716) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) 23/03/12 11:41:08 INFO ShutdownHookManager: Shutdown hook called 23/03/12 11:41:08 INFO ShutdownHookManager: Deleting directory /tmp/spark-5dccedc5-681f-446c-987b-0e989840983e 23/03/12 11:41:08 INFO ShutdownHookManager: Deleting directory /tmp/spark-c05b2c1a-a206-4c9d-9e2b-c4f414f7f03b 23/03/12 11:41:08 INFO MetricsSystemImpl: Stopping s3a-file-system metrics system... 23/03/12 11:41:08 INFO MetricsSystemImpl: s3a-file-system metrics system stopped. 23/03/12 11:41:08 INFO MetricsSystemImpl: s3a-file-system metrics system shutdown complete.

ad1happy2go commented 1 year ago

@lenhardtx There is a known incompatibility between Spark 3.3.2 and Hudi 0.13.0: https://github.com/apache/hudi/pull/8082. Can you try the patch to see if it resolves your problem?
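In case it helps, one way to try the patch is to build the utilities bundle from a checkout that includes it. A rough sketch, assuming the default Scala 2.12 build and the spark3.3 Maven profile from the Hudi build docs (verify the exact modules and profiles against the PR):

git clone https://github.com/apache/hudi.git && cd hudi
# fetch the PR branch locally (PR 8082 referenced above)
git fetch origin pull/8082/head:pr-8082 && git checkout pr-8082
# build only the utilities bundle and its dependencies for Spark 3.3 / Scala 2.12
mvn clean package -DskipTests -Dspark3.3 -Dscala-2.12 -pl packaging/hudi-utilities-bundle -am
# pass the resulting jar to spark-submit in place of hudi-utilities-bundle_2.12-0.13.0.jar
ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-*.jar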

ad1happy2go commented 1 year ago

@lenhardtx Did you get a chance to test with the patch?

ad1happy2go commented 1 year ago

@lenhardtx Gentle ping.

lenhardtx commented 1 year ago

Sorry for the late reply. I was passing the wrong jars; it works now.
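For anyone hitting the same NoSuchMethodError: the original --jars line mixed a Scala 2.12 Hudi bundle (hudi-utilities-bundle_2.12-0.13.0.jar) with a Scala 2.13 spark-avro build (spark-avro_2.13-3.3.2.jar). Assuming a stock Spark 3.3.2 distribution built for Scala 2.12, a consistent invocation would look roughly like the sketch below (the exact jars lenhardtx ended up using are not shown in the thread):

# all extra jars built for the same Spark (3.3.x) and Scala (2.12) versions as the cluster
spark-submit \
  --packages org.apache.hadoop:hadoop-aws:3.3.4 \
  --jars "/root/hudi/hudi-utilities-bundle_2.12-0.13.0.jar,/root/hudi/spark-avro_2.12-3.3.2.jar" \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /root/hudi/hudi-utilities-bundle_2.12-0.13.0.jar \
  ... # remaining options unchanged from the job above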

ad1happy2go commented 1 year ago

@lenhardtx Thanks. Closing out the issue then.