Closed lavakerreddy closed 2 years ago
@minihippo @XuQianJin-Stars @alexeykudinkin any idea how to solve this?
could you try set spark.sql.legacy.parquet.datetimeRebaseModeInRead to 'LEGACY'?
@fengjian428 This issue is still existing with Hudi 0.11. Please note when a Hudi table is created from a spark data frame , with the spark confs there is no issue. However the same spark confs used in delta streamer spark submit throws error for date formats older than 1900-01-01
Please note setting spark.sql.legacy.parquet.datetimeRebaseModeInRead to 'LEGACY’ or any conf in delta streamer spark submit doesn’t work to mitigate the date issue
Can you please re-run your command re-ordering following properties: in your original command it was
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--conf spark.shuffle.service.enabled=true \
--conf spark.default.parallelism=500 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.initialExecutors=3 \
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=90s \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.app.name=SPKNG_TST_SCTN_ITM \
But instead, please first specify all Spark properties only after that specify --class ...HoodieDeltaStreamer like following:
--conf spark.shuffle.service.enabled=true \
--conf spark.default.parallelism=500 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.initialExecutors=3 \
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=90s \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.app.name=SPKNG_TST_SCTN_ITM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
@alexeykudinkin this is followup from your messages on slack channel. Please find the command I am using. The spark configs are above the --class. but it still has the same issue.
_spark-submit --jars "/usr/lib/spark/external/lib/spark-avro.jar,/usr/lib/hudi/hudi-spark-bundle_2.12-0.11.1.jar" --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=corrected --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=legacy --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED --conf spark.sql.parquet.mergeSchema=true --class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer --master yarn --deploy-mode client /home/hadoop/hudi/hudi-utilities-bundle_2.12-0.11.1.jar --table-type COPY_ON_WRITE --props "file:///home/hadoop/hudi/config-source.properties" --config-folder "file:///home/hadoop/hudi/" --base-path-prefix "s3://hudi_s3_bucketname/hudi/" --source-class org.apache.hudi.utilities.sources.ParquetDFSSource --source-ordering-field ts \
@brskiran1 can you please try https://github.com/apache/hudi/pull/6352 and see if it resolves the issue
Was able to reproduce by adding the following line in my source: newDataSet = newDataSet.withColumn("invalidDates", functions.lit("1000-01-11").cast(DataTypes.DateType));
Full stacktrace here: https://gist.github.com/rmahindra123/4ab3614ef6ce30ee2c72499f2633de57
Confirmed that #6352 resolves the issue after adding the following config: --conf spark.sql.avro.datetimeRebaseModeInWrite=LEGACY
https://issues.apache.org/jira/browse/HUDI-4584 thanks folks. going ahead and closing out the issue. if you still have some issues, feel free to re-open or open a new one.
@alexeykudinkin @nsivabalan @rmahindra123 in deltastreamer this issue still exists.. --conf spark.sql.avro.datetimeRebaseModeInWrite=LEGACY spark.sql.avro.datetimeRebaseModeInRead=LEGACY does not work. Will have to reopen this issue
@brskiran1 have you tried on the latest master? Can you please paste stacktrace you're observing.
Hi, in deltastreamer this issue still exists :(
@lucabem can you please share more details regarding your environment (Hudi, Spark versions)?
Hi @alexeykudinkin, Im using hudi 0.12.1 and spark 3.1.2. Im trying to execute this command:
spark-submit \
--conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
--conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
--conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
--conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
--conf spark.driver.memory=12g \
--conf spark.driver.maxResultSize=12g \
--driver-cores 8 \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer jars/hudi-utilities-bundle_2.12-0.12.1.jar \
--table-type COPY_ON_WRITE \
--op INSERT \
--source-ordering-field dms_timestamp \
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
--target-base-path /home/luis/parquet/test_table \
--target-table gccom_demand_cond \
--transformer-class org.apache.hudi.utilities.transform.AWSDmsTransformer \
--payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
--hoodie-conf hoodie.datasource.write.recordkey.field=id_key \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
--hoodie-conf hoodie.datasource.write.partitionpath.field= \
--hoodie-conf hoodie.deltastreamer.source.dfs.root=/home/luis/parquet/data \
--hoodie-conf hoodie.datasource.write.drop.partition.columns=true \
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
--hoodie-conf hoodie.cleaner.commits.retained=1800 \
--hoodie-conf clean.retain_commits=1800 \
--hoodie-conf archive.min_commits=2000 \
--hoodie-conf archive.max_commits=2010 \
--hoodie-conf hoodie.keep.min.commits=2000 \
--hoodie-conf hoodie.keep.max.commits=2010 \
--enable-sync \
--sync-tool-classes org.apache.hudi.hive.HiveSyncTool \
--hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://localhost:10000 \
--hoodie-conf hoodie.datasource.hive_sync.enable=true \
--hoodie-conf hoodie.datasource.hive_sync.database=database \
--hoodie-conf hoodie.datasource.hive_sync.table=test_table \
--hoodie-conf hoodie.datasource.hive_sync.mode=hms \
--hoodie-conf hoodie.datasource.hive_sync.auto_create_database=true
and when I open spark-ui Environment tab, conf vars appear setted but then when Hudi (Javalin) is executed it throws the exception
Caused by: org.apache.spark.SparkUpgradeException:
You may get a different result due to the upgrading of Spark 3.0:
reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from Parquet files can be ambiguous, as the files may be written by Spark 2.x or legacy versions of Hive, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic Gregorian calendar.
See more details in SPARK-31404.
You can set spark.sql.legacy.parquet.datetimeRebaseModeInRead to 'LEGACY' to rebase the datetime values w.r.t. the calendar difference during reading. Or set spark.sql.legacy.parquet.datetimeRebaseModeInRead to 'CORRECTED' to read the datetime values as it is.
Rigth now, the only solution I have found is reading-writing source parquet using spark 3.1.2 (just read / write) with spark.legacy conf, and then use this parquet output as input of DeltaStreamer
@alexeykudinkin
Hey Alexey,
I'm also still getting the same error after updating to 0.12.1.
Hudi: 0.12.1-amzn-0-SNAPSHOT Spark: 3.3.0 EMR: 6.9.0
`spark-submit --master yarn
--deploy-mode cluster
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer,spark.sql.parquet.datetimeRebaseModeInRead=CORRECTED,spark.sql.parquet.datetimeRebaseModeInWrite=CORRECTED,spark.sql.avro.datetimeRebaseModeInWrite=CORRECTED,spark.sql.avro.datetimeRebaseModeInRead=CORRECTED,spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED,spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED,spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED,spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hudi/hudi-utilities-bundle.jar
--table-type COPY_ON_WRITE
--source-ordering-field replicadmstimestamp
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource
--target-base-path s3://bucket/folder/folder/table
--target-table table
--payload-class org.apache.hudi.common.model.AWSDmsAvroPayload
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
--hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING
--hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy-MM
--hoodie-conf "hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.SSSSSS"
--hoodie-conf hoodie.datasource.write.recordkey.field=_id
--hoodie-conf hoodie.datasource.write.partitionpath.field=replicadmstimestamp
--hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://bucket/folder/folder/table`
I've tried about the every combination of the datetimeRebaseMode I've managed to think of, and the result is always the same.
stacktrace included, is there any possible workaround for this? I currently have a separate process to change the timestamp columns, which works, but adds a bunch of overhead to the process.
Never mind, I got it working.
I specified the --conf wrong, having the options comma separated instead of separate --conf statements and it needs to have both spark.sql.avro and spark.sql.parquet options set to work as such:
--conf spark.sql.avro.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.avro.datetimeRebaseModeInWrite=CORRECTED --conf spark.sql.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.parquet.datetimeRebaseModeInWrite=CORRECTED
Not in my case, Im still having this issue
Can you post your exact spark submit? Do you know why it's failing, what is the data type and value in the column?
Hi @Virmaline, it is quite strage. I have downloaded a full table on AWS that gives me 4 parquets (lets call them A, B, C, ,D). I have tested your configuration and works fine with all combinations unless read all of them at the same time.
Combination | Result |
---|---|
A | OK |
B | OK |
C | OK |
D | OK |
A, B | OK |
A, C | OK |
A, D | OK |
B, C | OK |
B, D | OK |
C, D | OK |
A, B, C | OK |
A, B, D | OK |
A, C, D | OK |
B, C, D | OK |
A, B, C, D | KO |
But if I read first three parquets (A, B, C) and then I readlast one (D), it works. It looks like is loosing spark-conf somewhere. This is my code of spark-submit
spark-submit \
--jars jars/hudi-ext-0.12.1.jar,jars/avro-1.11.1.jar \
--conf spark.driver.memory=12g \
--conf spark.driver.maxResultSize=12g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.parquet.datetimeRebaseModeInRead=CORRECTED \
--conf spark.sql.parquet.datetimeRebaseModeInWrite=CORRECTED \
--conf spark.sql.avro.datetimeRebaseModeInWrite=CORRECTED \
--conf spark.sql.avro.datetimeRebaseModeInRead=CORRECTED \
--conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
--conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
--conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
--conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
--conf spark.sql.legacy.avro.datetimeRebaseModeInWrite=CORRECTED \
--driver-cores 8 \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer jars/hudi-utilities-bundle_2.12-0.12.1.jar \
--table-type COPY_ON_WRITE \
--op BULK_INSERT \
--source-ordering-field dms_timestamp \
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
--target-base-path /home/luis/parquet/consolidation/gccc_demand_cond/ \
--target-table gccc_demand_cond \
--hoodie-conf hoodie.datasource.write.recordkey.field=id_demand_cond \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
--hoodie-conf hoodie.datasource.write.partitionpath.field= \
--hoodie-conf hoodie.deltastreamer.source.dfs.root=/home/luis/parquet/data/gccc_demand_cond \
--hoodie-conf hoodie.datasource.write.drop.partition.columns=true \
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
--hoodie-conf hoodie.cleaner.commits.retained=1800 \
--hoodie-conf clean.retain_commits=1800 \
--hoodie-conf archive.min_commits=2000 \
--hoodie-conf archive.max_commits=2010 \
--hoodie-conf hoodie.keep.min.commits=2000 \
--hoodie-conf hoodie.keep.max.commits=2010 \
--transformer-class org.apache.hudi.utilities.transform.AWSDmsTransformer \
--payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
--enable-sync \
--sync-tool-classes org.apache.hudi.hive.HiveSyncTool \
--hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://localhost:10000 \
--hoodie-conf hoodie.datasource.hive_sync.enable=true \
--hoodie-conf hoodie.datasource.hive_sync.database=consolidation \
--hoodie-conf hoodie.datasource.hive_sync.table=gccc_demand_cond \
--hoodie-conf hoodie.datasource.hive_sync.mode=hms \
--hoodie-conf hoodie.datasource.hive_sync.auto_create_database=true
And this is my parquet schema:
############ file meta data ############
created_by: AWS
num_columns: 11
num_rows: 1011052
num_row_groups: 2023
format_version: 1.0
serialized_size: 1897645
############ Columns ############
dms_timestamp
create_date
update_date
update_user
update_program
optimist_lock
id_demand_cond
ini_date
end_date
id_sector_supply
cod_demand_type
############ Column(dms_timestamp) ############
name: dms_timestamp
path: dms_timestamp
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
############ Column(create_date) ############
name: create_date
path: create_date
max_definition_level: 1
max_repetition_level: 0
physical_type: INT64
logical_type: Timestamp(isAdjustedToUTC=true, timeUnit=microseconds, is_from_converted_type=true, force_set_converted_type=false)
converted_type (legacy): TIMESTAMP_MICROS
############ Column(update_date) ############
name: update_date
path: update_date
max_definition_level: 1
max_repetition_level: 0
physical_type: INT64
logical_type: Timestamp(isAdjustedToUTC=true, timeUnit=microseconds, is_from_converted_type=true, force_set_converted_type=false)
converted_type (legacy): TIMESTAMP_MICROS
############ Column(update_user) ############
name: update_user
path: update_user
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
############ Column(update_program) ############
name: update_program
path: update_program
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
############ Column(optimist_lock) ############
name: optimist_lock
path: optimist_lock
max_definition_level: 1
max_repetition_level: 0
physical_type: INT32
logical_type: Int(bitWidth=8, isSigned=true)
converted_type (legacy): INT_8
############ Column(id_demand_cond) ############
name: id_demand_cond
path: id_demand_cond
max_definition_level: 1
max_repetition_level: 0
physical_type: FIXED_LEN_BYTE_ARRAY
logical_type: Decimal(precision=15, scale=0)
converted_type (legacy): DECIMAL
############ Column(ini_date) ############
name: ini_date
path: ini_date
max_definition_level: 1
max_repetition_level: 0
physical_type: INT64
logical_type: Timestamp(isAdjustedToUTC=true, timeUnit=microseconds, is_from_converted_type=true, force_set_converted_type=false)
converted_type (legacy): TIMESTAMP_MICROS
############ Column(end_date) ############
name: end_date
path: end_date
max_definition_level: 1
max_repetition_level: 0
physical_type: INT64
logical_type: Timestamp(isAdjustedToUTC=true, timeUnit=microseconds, is_from_converted_type=true, force_set_converted_type=false)
converted_type (legacy): TIMESTAMP_MICROS
############ Column(id_sector_supply) ############
name: id_sector_supply
path: id_sector_supply
max_definition_level: 1
max_repetition_level: 0
physical_type: FIXED_LEN_BYTE_ARRAY
logical_type: Decimal(precision=15, scale=0)
converted_type (legacy): DECIMAL
############ Column(cod_demand_type) ############
name: cod_demand_type
path: cod_demand_type
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
It is quite strange, because I have othe table with only two parquets with same total size (400 mb) that works fine with ur config. The final table doing two steps consolidation first(A, B, C) and second(D) gives me this value for columns ini_date and end_date
# | _hoodie_commit_time | min_ini_date | max_ini_date | min_end_date | max_end_date |
---|---|---|---|---|---|
1 | 20221218094506279 | 0017-09-19 00:00:00.000 | 5201-07-24 00:00:00.000 | 1900-01-01 00:00:00.000 | 2999-12-31 00:00:00.000 |
2 | 20221218095641796 | 2009-03-10 00:00:00.000 | 2022-02-22 00:00:00.000 | 0017-08-02 00:00:00.000 | 2999-12-31 00:00:00.000 |
Hi @Virmaline, I have checked other tables and looks like it cannot read more than four parquets. When I add four or more files, it shows my this error.
Is it a know bug?
Hi lucabem, I haven't run into that, I will have to test that out, maybe it'll get to that tomorrow and I can let you know my results, but I don't actually know how all of this works on a deeper level, I'm just trying to get it working as well.
Folks, if you believe the issue is still there in your use-case in 0.12.1, please feel free to reopen this one
Just as an FYI, I ran into something very similar to this and was unable to get past the error (below) with any of the datetimeRebase*
configurations.
The TL;DR is that Hudi 0.12.1
does not appear to have this issue, whereas 0.11.1
does.
The long version:
We are using Hudi 0.11.1
Deltastreamer ingesting from Kafka into S3.
Our workaround was to "freeze" the dataset that was causing this issue by running a second deltastreamer using 0.12.1
from the checkpoint.key
of the primary deltastreamer into a staging location, then manually move the checkpoint for the primary process ahead of the records that were causing the issue by editing the latest commit
instance file.
The error we were getting from the 0.11.1
Deltastreamer:
23/02/14 14:40:03 WARN TaskSetManager: Lost task 12.0 in stage 3.0 (TID 373) org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0:
writing dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z into Avro
files can be dangerous, as the files may be read by Spark 2.x or legacy versions of Hive
later, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic
Gregorian calendar. See more details in SPARK-31404. You can set spark.sql.avro.datetimeRebaseModeInWrite to 'LEGACY' to
rebase the datetime values w.r.t. the calendar difference during writing, to get maximum
interoperability. Or set spark.sql.avro.datetimeRebaseModeInWrite to 'CORRECTED' to write the datetime values as it is,
if you are 100% sure that the written files will only be read by Spark 3.0+ or other
systems that use Proleptic Gregorian calendar.
at org.apache.spark.sql.errors.QueryExecutionErrors$.sparkUpgradeInWritingDatesError(QueryExecutionErrors.scala:488)
at org.apache.spark.sql.execution.datasources.DataSourceUtils$.newRebaseExceptionInWrite(DataSourceUtils.scala:193)
at org.apache.hudi.org.apache.spark.sql.avro.AvroSerializer$.$anonfun$createTimestampRebaseFuncInWrite$1(AvroSerializer.scala:417)
at org.apache.hudi.org.apache.spark.sql.avro.AvroSerializer.$anonfun$newConverter$17(AvroSerializer.scala:182)
at org.apache.hudi.org.apache.spark.sql.avro.AvroSerializer.$anonfun$newConverter$17$adapted(AvroSerializer.scala:181)
at org.apache.hudi.org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$2(AvroSerializer.scala:289)
at org.apache.hudi.org.apache.spark.sql.avro.AvroSerializer.serialize(AvroSerializer.scala:65)
at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_2AvroSerializer.serialize(HoodieSpark3_2AvroSerializer.scala:28)
at org.apache.hudi.AvroConversionUtils$.$anonfun$createInternalRowToAvroConverter$1(AvroConversionUtils.scala:83)
at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$5(HoodieSparkUtils.scala:177)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:133)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
We are going to manually inspect the frozen dataset to try and determine what field(s) were causing the issue in 0.11.1
. We only have 3 datetime fields in our model so it should not be hard to locate an invalid date if it does exist (famous last words).
I get the same error with Hudi 0.13.1, but it works fine on Hudi 0.12.2.
We upgraded ourselves from running our Hudi spark-submits from EMR 5.33 to EMR 6.5 that has Spark 3x and then started running into below errors with date and timestamp. Please let us know if someone faced a similar issue and if there is a resolution. spark-submit \ --deploy-mode client \ --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ --conf spark.shuffle.service.enabled=true \ --conf spark.default.parallelism=500 \ --conf spark.dynamicAllocation.enabled=true \ --conf spark.dynamicAllocation.initialExecutors=3 \ --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=90s \ --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ --conf spark.app.name=ETS_CUST \ --jars /usr/lib/spark/external/lib/spark-avro.jar, /usr/lib/hudi/hudi-utilities-bundle.jar \ --table-type MERGE_ON_READ \ --op INSERT \ --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://localhost:10000 \ --source-ordering-field dms_seq_no \ --props s3://ets-aws-daas-prod-resource/config/TOEFL/DMEREG02/ETS_CUST/ets_cust_full.properties \ --hoodie-conf hoodie.datasource.hive_sync.database=ets_aws_daas_raw_toefl_dmereg02 \ --target-base-path s3://ets-aws-daas-prod-raw/TOEFL/DMEREG02/ETS_CUST \ --target-table ETS_CUST \ --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \ --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://ets-aws-daas-prod-landing/DMS/FULL/DMEREG02/ETS_CUST/ \ --source-class org.apache.hudi.utilities.sources.ParquetDFSSource --enable-sync 22/08/02 16:24:48 INFO DAGScheduler: ShuffleMapStage 3 (countByKey at BaseSparkCommitActionExecutor.java:175) failed in 27.903 s due to Job aborted due to stage failure: Task 53 in stage 3.0 failed 4 times, most recent failure: Lost task 53.3 in stage 3.0 (TID 105) (ip-172-31-26-128.ec2.internal executor 3): org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from Parquet files can be ambiguous, as the files may be written by Spark 2.x or legacy versions of Hive, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can set spark.sql.legacy.parquet.datetimeRebaseModeInRead to 'LEGACY' to rebase the datetime values w.r.t. the calendar difference during reading. Or set spark.sql.legacy.parquet.datetimeRebaseModeInRead to 'CORRECTED' to read the datetime values as it is. at org.apache.spark.sql.execution.datasources.DataSourceUtils$.newRebaseExceptionInRead(DataSourceUtils.scala:159) at org.apache.spark.sql.execution.datasources.DataSourceUtils.newRebaseExceptionInRead(DataSourceUtils.scala) at org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readLongsWithRebase(VectorizedPlainValuesReader.java:147) at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readLongsWithRebase(VectorizedRleValuesReader.java:399) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readLongBatch(VectorizedColumnReader.java:587) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:297) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:295) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:193) at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:159) at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:614) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:907) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349) at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384) at org.apache.spark.rdd.RDD.iterator(RDD.scala:335) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)