apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[ISSUE] Hudi 0.13.0. Spark 3.3.2 Deltastreamed table read failure #9282

Open rmnlchh opened 1 year ago

rmnlchh commented 1 year ago

As part of our pipelines, we use tables that are being deltastreamed. While trying to upgrade to EMR 6.11 (which brings Hudi 0.13.0 / Spark 3.3.2), we started facing the issue discussed in https://github.com/apache/hudi/issues/8061#issuecomment-1447657892. The fix of setting

    sc.set("spark.sql.legacy.parquet.nanosAsLong", "false");
    sc.set("spark.sql.parquet.binaryAsString", "false");
    sc.set("spark.sql.parquet.int96AsTimestamp", "true");
    sc.set("spark.sql.caseSensitive", "false");

worked for all cases except those where we call an action on a DataFrame created by reading deltastreamed tables.
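(For readers following along: the same four configs can equally be applied when the session is built. A minimal sketch, assuming a Scala test setup with a local master; this is not part of the original report.)

    import org.apache.spark.sql.SparkSession

    // Sketch only: applying the workaround configs at SparkSession build time,
    // equivalent to the sc.set(...) calls above (values mirror the report).
    val spark = SparkSession.builder()
      .appName("hudi-read-workaround") // hypothetical app name
      .master("local[*]")              // assumption: local testing
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.legacy.parquet.nanosAsLong", "false")
      .config("spark.sql.parquet.binaryAsString", "false")
      .config("spark.sql.parquet.int96AsTimestamp", "true")
      .config("spark.sql.caseSensitive", "false")
      .getOrCreate()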

Steps to reproduce the behavior:

  1. Use hudi 0.13.0, spark 3.3.2
  2. Spark configs used:
     spark.shuffle.spill.compress -> true
     spark.serializer -> org.apache.spark.serializer.KryoSerializer
     spark.sql.warehouse.dir -> file:/XXX/cdp-datapipeline-curation/datalake-deltastreamer/spark-warehouse
     spark.sql.parquet.int96AsTimestamp -> true
     spark.io.compression.lz4.blockSize -> 64k
     spark.executor.extraJavaOptions -> -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
     spark.driver.host -> 127.0.0.1
     spark.sql.hive.convertMetastoreParquet -> false
     spark.broadcast.compress -> true
     spark.io.compression.codec -> snappy
     spark.sql.adaptive.skewJoin.enabled -> true
     spark.sql.parquet.binaryAsString -> false
     spark.driver.port -> 36083
     spark.rdd.compress -> true
     spark.io.compression.zstd.level -> 1
     spark.sql.caseSensitive -> false
     spark.shuffle.compress -> true
     spark.io.compression.zstd.bufferSize -> 64k
     spark.sql.catalog -> org.apache.spark.sql.hudi.catalog.HoodieCatalog
     spark.sql.parquet.int96RebaseModeInRead -> LEGACY
     spark.memory.storageFraction -> 0.20
     spark.app.name -> CreativeDeltaStreamerTest-creative-deltastreamer-1689954313
     spark.sql.parquet.datetimeRebaseModeInWrite -> LEGACY
     spark.sql.parquet.outputTimestampType -> TIMESTAMP_MICROS
     spark.sql.avro.datetimeRebaseModeInWrite -> LEGACY
     spark.sql.avro.compression.codec -> snappy
     spark.sql.legacy.parquet.nanosAsLong -> false
     spark.sql.extension -> org.apache.spark.sql.hudi.HoodieSparkSessionExtension
     spark.app.startTime -> 1689968713919
     spark.executor.id -> driver
     spark.sql.parquet.enableVectorizedReader -> true
     spark.sql.legacy.timeParserPolicy -> LEGACY
     spark.driver.extraJavaOptions -> -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
     spark.sql.parquet.datetimeRebaseModeInRead -> LEGACY
     spark.driver.memoryOverheadFactor -> 0.15
     spark.master -> local[*]
     spark.sql.parquet.filterPushdown -> true
     spark.executor.cores -> 1
     spark.memory.fraction -> 0.50
     spark.sql.avro.datetimeRebaseModeInRead -> LEGACY
     spark.executor.memoryOverheadFactor -> 0.20
     spark.sql.parquet.compression.codec -> snappy
     spark.sql.parquet.recordLevelFilter.enabled -> true
     spark.app.id -> local-1689968714613
  3. Deltastreamer configs used:
     hoodie.datasource.hive_sync.database -> datalake_ods_local
     hoodie.datasource.hive_sync.support_timestamp -> true
     hoodie.datasource.write.precombine.field -> StartDateUtc
     hoodie.datasource.hive_sync.partition_fields -> CampaignId
     hoodie.metadata.index.column.stats.enable -> true
     hoodie.cleaner.fileversions.retained -> 2
     hoodie.parquet.max.file.size -> 6291456
     hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled -> true
     hoodie.bloom.index.prune.by.ranges -> true
     hoodie.parquet.block.size -> 6291456
     hoodie.metadata.enable -> true
     hoodie.datasource.hive_sync.table -> published_ad
     hoodie.index.type -> BLOOM
     hoodie.parquet.compression.codec -> snappy
     hoodie.datasource.write.recordkey.field -> AdId
     hoodie.table.name -> published_ad
     hoodie.datasource.write.hive_style_partitioning -> true
     hoodie.datasource.meta.sync.base.path -> /XXXX/cdp-datapipeline-curation/datalake-deltastreamer/./tmp/AdDeltaStreamerTest/Domain=CampaignBuild/Table=published_ad/
     hoodie.metadata.insert.parallelism -> 1
     hoodie.enable.data.skipping -> false
     hoodie.metadata.index.column.stats.parallelism -> 1
     hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.SimpleKeyGenerator
     hoodie.meta.sync.client.tool.class -> org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool
     hoodie.datasource.write.partitionpath.field -> CampaignId
     hoodie.index.bloom.num_entries -> 60000
  4. delta stream a table locally
  5. Try reading it with something like the following (a fuller self-contained sketch follows after this list):

     println(s"READ CONFIG: ${readConfigurations.mkString("\n")}")
     val df = spark.read.format("hudi")
       .options(readConfigurations)
       .load(basePath)

     println(s"Querying hudi table: ${query}")
     df.createOrReplaceTempView(tempViewName)
     val selectedDF = spark.sql(query)

     The readConfigurations are:

     READ CONFIG:
     hoodie.datasource.hive_sync.database -> datalake_ods_local
     hoodie.datasource.hive_sync.support_timestamp -> true
     hoodie.datasource.write.precombine.field -> StartDateUtc
     hoodie.datasource.hive_sync.partition_fields -> CampaignId
     hoodie.metadata.index.column.stats.enable -> true
     hoodie.cleaner.fileversions.retained -> 2
     hoodie.parquet.max.file.size -> 6291456
     hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled -> true
     hoodie.bloom.index.prune.by.ranges -> true
     hoodie.parquet.block.size -> 6291456
     hoodie.metadata.enable -> true
     hoodie.datasource.hive_sync.table -> published_ad
     hoodie.index.type -> BLOOM
     hoodie.parquet.compression.codec -> snappy
     hoodie.datasource.write.recordkey.field -> AdId
     hoodie.table.name -> published_ad
     hoodie.datasource.write.hive_style_partitioning -> true
     hoodie.datasource.meta.sync.base.path -> /XXXX/cdp-datapipeline-curation/cdp-datapipeline-curation/datalake-deltastreamer/./tmp/AdDeltaStreamerTest/Domain=CampaignBuild/Table=published_ad/
     hoodie.metadata.insert.parallelism -> 1
     hoodie.enable.data.skipping -> false
     hoodie.metadata.index.column.stats.parallelism -> 1
     hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.SimpleKeyGenerator
     hoodie.meta.sync.client.tool.class -> org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool
     hoodie.datasource.write.partitionpath.field -> CampaignId
     hoodie.index.bloom.num_entries -> 60000

The spark.sql call is where it fails; the same behaviour occurs with .show.

I expected the table to be read successfully and to be able to execute actions on the DataFrame.

Environment Description

Stacktrace:

    Driver stacktrace:
      at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
      at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
      at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
      at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
      at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
      at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
      at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
      at scala.Option.foreach(Option.scala:407)
      ...
    Cause: java.lang.IllegalArgumentException: For input string: "null"
      at scala.collection.immutable.StringLike.parseBoolean(StringLike.scala:330)
      at scala.collection.immutable.StringLike.toBoolean(StringLike.scala:289)
      at scala.collection.immutable.StringLike.toBoolean$(StringLike.scala:289)
      at scala.collection.immutable.StringOps.toBoolean(StringOps.scala:33)
      at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.<init>(ParquetSchemaConverter.scala:70)
      at org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormatHelper$.buildImplicitSchemaChangeInfo(HoodieParquetFileFormatHelper.scala:30)
      at org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark32PlusHoodieParquetFileFormat.scala:231)
      at org.apache.hudi.HoodieDataSourceHelper$.$anonfun$buildHoodieParquetReader$1(HoodieDataSourceHelper.scala:71)
      at org.apache.hudi.HoodieBaseRelation.$anonfun$createBaseFileReader$1(HoodieBaseRelation.scala:554)
      at org.apache.hudi.HoodieBaseRelation$BaseFileReader.apply(HoodieBaseRelation.scala:613)
      ...
    [Executor task launch worker for task 1.0 in stage 85.0 (TID 123)] ERROR org.apache.spark.executor.Executor - Exception in task 1.0 in stage 85.0 (TID 123)
    java.lang.IllegalArgumentException: For input string: "null"
      at scala.collection.immutable.StringLike.parseBoolean(StringLike.scala:330) ~[scala-library-2.12.17.jar:?]
      at scala.collection.immutable.StringLike.toBoolean(StringLike.scala:289) ~[scala-library-2.12.17.jar:?]
      at scala.collection.immutable.StringLike.toBoolean$(StringLike.scala:289) ~[scala-library-2.12.17.jar:?]
      at scala.collection.immutable.StringOps.toBoolean(StringOps.scala:33) ~[scala-library-2.12.17.jar:?]
      at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.<init>(ParquetSchemaConverter.scala:70) ~[spark-sql_2.12-3.3.2.jar:3.3.2]
      at org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormatHelper$.buildImplicitSchemaChangeInfo(HoodieParquetFileFormatHelper.scala:30) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:3.3.2]
      at org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark32PlusHoodieParquetFileFormat.scala:231) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:3.3.2]
      at org.apache.hudi.HoodieDataSourceHelper$.$anonfun$buildHoodieParquetReader$1(HoodieDataSourceHelper.scala:71) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
      at org.apache.hudi.HoodieBaseRelation.$anonfun$createBaseFileReader$1(HoodieBaseRelation.scala:554) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
      at org.apache.hudi.HoodieBaseRelation$BaseFileReader.apply(HoodieBaseRelation.scala:613) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
      at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:87) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.2.jar:3.3.2]
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.2.jar:3.3.2]
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.2.jar:3.3.2]
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.2.jar:3.3.2]
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.2.jar:3.3.2]
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.2.jar:3.3.2]
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.2.jar:3.3.2]
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.2.jar:3.3.2]
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.2.jar:3.3.2]
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.2.jar:3.3.2]
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.2.jar:3.3.2]
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.2.jar:3.3.2]
      at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.2.jar:3.3.2]
      at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.2.jar:3.3.2]
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504) ~[spark-core_2.12-3.3.2.jar:3.3.2]
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.2.jar:3.3.2]
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
      at java.lang.Thread.run(Thread.java:1589) ~[?:?]

bhasudha commented 1 year ago

@yihua @ad1happy2go could you help reproduce and triage this further?

ad1happy2go commented 1 year ago

@rmnlchh Just curious, did you set these configs

sc.set("spark.sql.legacy.parquet.nanosAsLong", "false");
sc.set("spark.sql.parquet.binaryAsString", "false");
sc.set("spark.sql.parquet.int96AsTimestamp", "true");
sc.set("spark.sql.caseSensitive", "false");

with your deltastreamer as well? I will try to reproduce this issue.

rmnlchh commented 1 year ago

@rmnlchh Just curious, did you set these configs

sc.set("spark.sql.legacy.parquet.nanosAsLong", "false");
sc.set("spark.sql.parquet.binaryAsString", "false");
sc.set("spark.sql.parquet.int96AsTimestamp", "true");
sc.set("spark.sql.caseSensitive", "false");

with your deltastreamer as well? I will try to reproduce this issue.

Yes. Adding all the DS configs here:

    println(s"hoodieDeltaStreamerConfig=$hoodieDeltaStreamerConfig")
    println(s"typedProperties=$typedProperties")
    println("HERE JSC" + jsc.getConf.getAll.mkString)
    val hoodieDeltaStreamer = new HoodieDeltaStreamer(
      hoodieDeltaStreamerConfig,
      jsc,
      FSUtils.getFs(hoodieDeltaStreamerConfig.targetBasePath, conf),
      jsc.hadoopConfiguration,
      org.apache.hudi.common.util.Option.of(typedProperties)
    )

The printed output:

    hoodieDeltaStreamerConfig=Config{targetBasePath='/XXX/cdp-datapipeline-curation/cdp-datapipeline-curation/datalake-deltastreamer/./tmp/CreativeDeltaStreamerTest/Domain=CampaignBuild/Table=published_creative/', targetTableName='published_creative', tableType='MERGE_ON_READ', baseFileFormat='PARQUET', propsFilePath='file://XXX/cdp-datapipeline-curation/cdp-datapipeline-curation/datalake-deltastreamer/src/test/resources/delta-streamer-config/dfs-source.properties', configs=[], sourceClassName='org.apache.hudi.utilities.sources.AvroKafkaSource', sourceOrderingField='AssetValue', payloadClassName='org.apache.hudi.common.model.OverwriteWithLatestAvroPayload', schemaProviderClassName='com.cardlytics.datapipeline.deltastreamer.schema.ResourceBasedSchemaProvider', transformerClassNames=[org.apache.hudi.utilities.transform.SqlQueryBasedTransformer], sourceLimit=9223372036854775807, operation=UPSERT, filterDupes=false, enableHiveSync=false, enableMetaSync=false, forceEmptyMetaSync=false, syncClientToolClassNames=org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool, maxPendingCompactions=5, maxPendingClustering=5, continuousMode=false, minSyncIntervalSeconds=0, sparkMaster='', commitOnErrors=false, deltaSyncSchedulingWeight=1, compactSchedulingWeight=1, clusterSchedulingWeight=1, deltaSyncSchedulingMinShare=0, compactSchedulingMinShare=0, clusterSchedulingMinShare=0, forceDisableCompaction=true, checkpoint='null', initialCheckpointProvider='null', help=false}

    typedProperties={spark.sql.avro.compression.codec=snappy, hoodie.datasource.hive_sync.table=published_creative, hoodie.datasource.hive_sync.partition_fields=Entity, hoodie.metadata.index.column.stats.enable=false, hoodie.index.type=BLOOM, hoodie.datasource.write.reconcile.schema=true, hoodie.deltastreamer.schemaprovider.source.schema.file=domain/campaignbuild/schema/creative.avsc, bootstrap.servers=PLAINTEXT://localhost:34873, hoodie.compact.inline=false, hoodie.deltastreamer.transformer.sql= SELECT 'Creative' Entity ,o.CreativeId ,o.PreMessageImpression ,o.PostMessageImpression ,o.Assets.Type AssetType ,o.Assets.Slot AssetSlot ,o.Assets.Label AssetLabel ,o.Assets.Value AssetValue FROM (SELECT a.CreativeId, a.PreMessageImpression, a.PostMessageImpression, explode(a.Assets) Assets FROM <SRC> a) o , hoodie.parquet.max.file.size=6291456, hoodie.datasource.write.recordkey.field=CreativeId,AssetSlot, hoodie.index.bloom.num_entries=60000, hoodie.datasource.hive_sync.support_timestamp=true, hoodie.metadata.enable=false, schema.registry.url=http://localhost:34874, hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator, hoodie.datasource.write.table.type=MERGE_ON_READ, hoodie.deltastreamer.source.kafka.topic=CMPN-CmpnPub-AdServer-Creative, hoodie.datasource.write.hive_style_partitioning=true, hoodie.metadata.insert.parallelism=1, hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false, hoodie.parquet.compression.codec=snappy, spark.io.compression.codec=snappy, hoodie.deltastreamer.schemaprovider.target.schema.file=domain/campaignbuild/schema/published_creative_table.json, hoodie.bloom.index.prune.by.ranges=true, hoodie.datasource.write.partitionpath.field=Entity, hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled=true, hoodie.parquet.block.size=6291456, hoodie.cleaner.fileversions.retained=2, hoodie.table.name=published_creative, hoodie.upsert.shuffle.parallelism=4, hoodie.meta.sync.client.tool.class=org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool, spark.sql.parquet.compression.codec=snappy, hoodie.datasource.write.precombine.field=AssetValue, hoodie.datasource.write.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload, hoodie.datasource.meta.sync.base.path=/XXX/cdp-datapipeline-curation/cdp-datapipeline-curation/datalake-deltastreamer/./tmp/CreativeDeltaStreamerTest/Domain=CampaignBuild/Table=published_creative/, hoodie.datasource.hive_sync.database=datalake_ods_local, hoodie.datasource.write.operation=upsert, auto.offset.reset=earliest, hoodie.metadata.index.column.stats.parallelism=1, hoodie.compact.inline.max.delta.commits=5}

    HERE JSC(spark.sql.hive.convertMetastoreParquet,false)(spark.driver.extraJavaOptions,-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED)(spark.sql.parquet.datetimeRebaseModeInRead,LEGACY)(spark.sql.legacy.timeParserPolicy,LEGACY)(spark.memory.fraction,0.50)(spark.sql.parquet.outputTimestampType,TIMESTAMP_MICROS)(spark.driver.port,44679)(spark.sql.parquet.int96AsTimestamp,true)(spark.sql.parquet.binaryAsString,false)(spark.memory.storageFraction,0.20)(spark.serializer,org.apache.spark.serializer.KryoSerializer)(spark.sql.avro.datetimeRebaseModeInWrite,LEGACY)(spark.executor.memoryOverheadFactor,0.20)(spark.sql.parquet.compression.codec,snappy)(spark.master,local[*])(spark.sql.caseSensitive,false)(spark.app.name,CreativeDeltaStreamerTest-creative-deltastreamer-1690266341)(spark.app.id,local-1690280742465)(spark.sql.extension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension)(spark.sql.legacy.parquet.nanosAsLong,false)(spark.io.compression.codec,snappy)(spark.driver.host,127.0.0.1)(spark.io.compression.zstd.bufferSize,64k)(spark.shuffle.compress,true)(spark.sql.avro.datetimeRebaseModeInRead,LEGACY)(spark.sql.avro.compression.codec,snappy)(spark.app.startTime,1690280741849)(spark.executor.id,driver)(spark.sql.parquet.int96RebaseModeInRead,LEGACY)(spark.shuffle.spill.compress,true)(spark.rdd.compress,true)(spark.sql.adaptive.skewJoin.enabled,true)(spark.broadcast.compress,true)(spark.sql.parquet.datetimeRebaseModeInWrite,LEGACY)(spark.io.compression.zstd.level,1)(spark.driver.memoryOverheadFactor,0.15)(spark.sql.catalog,org.apache.spark.sql.hudi.catalog.HoodieCatalog)(spark.executor.extraJavaOptions,-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED)(spark.executor.cores,1)(spark.io.compression.lz4.blockSize,64k)
ad1happy2go commented 1 year ago

@rmnlchh I couldn't reproduce this issue.

Code I tried - https://gist.github.com/ad1happy2go/1391a679de49efa1872563062f04e29b

Can you let us know the schema of the table? I can try with all the data type combinations you have in your dataset. Can you also check the code once in case I missed anything?

rmnlchh commented 1 year ago

@ad1happy2go Kafka topic record schema:

    {
      "type": "record",
      "name": "Creative",
      "namespace": "Cardlytics.Ops.Messages.Portal",
      "fields": [
        { "name": "CreativeId", "type": "string" },
        { "name": "PreMessageImpression", "type": "string", "default": "" },
        { "name": "PostMessageImpression", "type": "string", "default": "" },
        {
          "name": "Assets",
          "type": {
            "type": "array",
            "items": {
              "type": "record",
              "name": "AdAsset",
              "fields": [
                {
                  "name": "Type",
                  "type": {
                    "type": "enum",
                    "name": "AssetType",
                    "symbols": [ "Other", "Text", "Image", "Video", "Link" ],
                    "default": "Other"
                  }
                },
                { "name": "Slot", "type": "string" },
                { "name": "Label", "type": [ "null", "string" ], "default": null },
                { "name": "Value", "type": "string" }
              ]
            }
          }
        }
      ]
    }

DS transform query:

    SELECT 'Creative' Entity
         , o.CreativeId
         , o.PreMessageImpression
         , o.PostMessageImpression
         , o.Assets.Type AssetType
         , o.Assets.Slot AssetSlot
         , o.Assets.Label AssetLabel
         , o.Assets.Value AssetValue
    FROM (SELECT a.CreativeId, a.PreMessageImpression, a.PostMessageImpression, explode(a.Assets) Assets
          FROM <SRC> a) o

Expected result table schema:

    {
      "type": "record",
      "name": "Creative",
      "namespace": "Cardlytics.Ops.Messages.Portal",
      "fields": [
        { "name": "Entity", "type": "string" },
        { "name": "CreativeId", "type": "string" },
        { "name": "PreMessageImpression", "type": "string", "default": "" },
        { "name": "PostMessageImpression", "type": "string", "default": "" },
        { "name": "AssetType", "type": "string" },
        { "name": "AssetSlot", "type": "string" },
        { "name": "AssetLabel", "type": [ "null", "string" ], "default": null },
        { "name": "AssetValue", "type": "string" }
      ]
    }
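For comparing against the expected schema above, one quick way to dump what actually landed in the table's base files is to read them with Spark's plain parquet source, which bypasses Hudi's overridden file format. A sketch only; the glob below is a placeholder and assumes hive-style partitioning on Entity:

    // Sketch: print the schema of the Hudi table's parquet base files (placeholder path)
    // so it can be compared against the expected Avro schema above.
    val baseFiles = "./tmp/CreativeDeltaStreamerTest/Domain=CampaignBuild/Table=published_creative/Entity=Creative/*.parquet"
    spark.read.parquet(baseFiles).printSchema()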
amrishlal commented 1 year ago

@rmnlchh @ad1happy2go I am looking at the following part of the stack trace:

Cause: java.lang.IllegalArgumentException: For input string: "null"
at scala.collection.immutable.StringLike.parseBoolean(StringLike.scala:330)
at scala.collection.immutable.StringLike.toBoolean(StringLike.scala:289)
at scala.collection.immutable.StringLike.toBoolean$(StringLike.scala:289)
at scala.collection.immutable.StringOps.toBoolean(StringOps.scala:33)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.<init>(ParquetSchemaConverter.scala:70)
at org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormatHelper$.buildImplicitSchemaChangeInfo(HoodieParquetFileFormatHelper.scala:30)

The stack trace seems to indicate that there was a problem while trying to convert a string value into a boolean (see the code at Spark v3.3.2 ParquetSchemaConverter.scala:70, which I have pasted below):

conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)

This line seems to indicate that you need to set spark.sql.legacy.parquet.nanosAsLong to either 'true' or 'false' to avoid this exception (see the definition of LEGACY_PARQUET_NANOS_AS_LONG in SQLConf).
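As a side note, the exception message is exactly what plain Scala produces when the value being parsed is the literal string "null" rather than a real boolean. A minimal, self-contained illustration (plain Scala, not Hudi code):

    // Scala's StringOps.toBoolean accepts only "true"/"false" (case-insensitive);
    // any other value, including the literal string "null", throws this exact error.
    try {
      "null".toBoolean
    } catch {
      case e: IllegalArgumentException =>
        println(e.getMessage) // prints: For input string: "null"
    }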

The following configs would need to be set to avoid similar errors:

  def this(conf: Configuration) = this(
    assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
    assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
    caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean,
    conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)

@nsivabalan I am wondering if we need to set default values for these parameters in HoodieParquetFileFormatHelper.buildImplicitSchemaChangeInfo to avoid the Spark code throwing an exception, or at least print a friendlier error message?
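To make that idea concrete, here is a hedged sketch of what such defensive defaulting could look like; this is an illustration only, not the actual Hudi code, and the helper name and fallback values are assumptions:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.internal.SQLConf

    // Hypothetical helper (sketch): ensure every key that the
    // ParquetToSparkSchemaConverter(conf) constructor reads holds a parseable
    // boolean, so a missing or literal "null" entry can no longer break .toBoolean.
    def withConverterDefaults(conf: Configuration): Configuration = {
      val fallbacks = Seq(
        SQLConf.PARQUET_BINARY_AS_STRING.key   -> "false",
        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key -> "true",
        SQLConf.CASE_SENSITIVE.key             -> "false",
        "spark.sql.legacy.parquet.nanosAsLong" -> "false" // assumed fallback value
      )
      fallbacks.foreach { case (key, fallback) =>
        val current = conf.get(key)
        if (current == null || current == "null") conf.set(key, fallback)
      }
      conf
    }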

nsivabalan commented 1 year ago

@amrishlal: the author already confirmed that they are setting the config of interest (spark.sql.legacy.parquet.nanosAsLong). So somewhere Hudi is failing to wire it through properly, but Aditya could not reproduce the issue. Also, the "null" parsing is very strange: even if we don't set any value for spark.sql.legacy.parquet.nanosAsLong for the DS job, we should have gotten the exception by default, right? But that's not the case. We might need to dig in more.

nsivabalan commented 1 year ago

I have a hunch about where the issue could be. The SQL conf defaults are actually set in ParquetFileFormat: https://github.com/apache/spark/blob/v3.3.2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala (referenced directly from the Spark repo). The code snippet of interest:

    conf.set(
      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
      sparkSession.sessionState.conf.legacyParquetNanosAsLong.toString)

But in Hudi, we have overridden the file format: https://github.com/apache/hudi/blob/2d779fb5aa1ebfd33676ebf29217f25c60e17d12/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33HoodieParquetFileFormat.scala

    // Using string value of this conf to preserve compatibility across spark versions.
    hadoopConf.setBoolean(
      "spark.sql.legacy.parquet.nanosAsLong",
      sparkSession.sessionState.conf.getConfString("spark.sql.legacy.parquet.nanosAsLong", "false").toBoolean
    )

This is slightly different from how the other, similar configs are set:

    hadoopConf.setBoolean(
      SQLConf.PARQUET_BINARY_AS_STRING.key,
      sparkSession.sessionState.conf.isParquetBinaryAsString)
    hadoopConf.setBoolean(
      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)

@amrishlal: can you dig further in this direction?
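For anyone debugging this further, a quick hedged check (assuming an already-built SparkSession named spark) of whether the key actually reaches both the session conf and the Hadoop conf handed to the reader:

    // Sketch: inspect where spark.sql.legacy.parquet.nanosAsLong is (or is not) set.
    // A missing or literal "null" value at the Hadoop-conf level would explain the failure.
    val key = "spark.sql.legacy.parquet.nanosAsLong"
    println(s"session conf: ${spark.conf.getOption(key)}")
    println(s"hadoop conf : ${Option(spark.sparkContext.hadoopConfiguration.get(key))}")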

nsivabalan commented 1 year ago

But first, we need to reproduce the issue on our end.

nsivabalan commented 1 year ago

Aditya: one issue could be that you are testing with local Spark. Can we try in a cluster setup? CC @amrishlal

rmnlchh commented 1 year ago

@nsivabalan we see this issue when running tests on GitHub Actions workers, so that's probably not the case here.

ad1happy2go commented 1 year ago

@rmnlchh I tried reproducing this issue both locally and on EMR, with multiple data type combinations, but couldn't reproduce it. I will keep trying.

Meanwhile, can you share any more insights about your environment that you think are relevant and could help me reproduce this error on our end?