apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Exception when write null value to table with timestamp partitioning #11900

Open stream2000 opened 2 months ago

stream2000 commented 2 months ago


Describe the problem you faced

Exception when writing a null value to a table with timestamp partitioning.

To Reproduce

  test("Test various data types as partition fields") {
    withRecordType()(withTempDir { tmp =>
      val tableName = generateTableName
      spark.sql(
        s"""
           |CREATE TABLE $tableName (
           |  id INT,
           |  boolean_field BOOLEAN,
           |  float_field FLOAT,
           |  byte_field BYTE,
           |  short_field SHORT,
           |  decimal_field DECIMAL(10, 5),
           |  date_field DATE,
           |  string_field STRING,
           |  timestamp_field TIMESTAMP
           |) USING hudi
           | TBLPROPERTIES (primaryKey = 'id')
           | PARTITIONED BY (boolean_field, float_field, byte_field, short_field, decimal_field, date_field, string_field, timestamp_field)
           |LOCATION '${tmp.getCanonicalPath}'
     """.stripMargin)

      // Insert data into partitioned table
      spark.sql(
        s"""
           |INSERT INTO $tableName VALUES
           |(1, TRUE, CAST(1.0 as FLOAT), 1, 1, 1234.56789, DATE '2021-01-05', 'partition1', null),
           |(2, FALSE, CAST(2.0 as FLOAT), 2, 2, 6789.12345, DATE '2021-01-06', 'partition2', TIMESTAMP '2021-01-06 11:00:00')
     """.stripMargin)

      checkAnswer(s"SELECT id, boolean_field FROM $tableName ORDER BY id")(
        Seq(1, true),
        Seq(2, false)
      )
    })
  }

Expected behavior

No exception encountered.

Environment Description

Hudi version : 0.15.x

Spark version : 3.5

Storage (HDFS/S3/GCS..) : OSS

Running on Docker? (yes/no) : No


Stacktrace


Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 8) (30.221.116.216 executor driver): java.lang.NumberFormatException: For input string: "__HIVE_DEFAULT_PARTITION__"
  at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Long.parseLong(Long.java:589)
  at java.lang.Long.parseLong(Long.java:631)
  at scala.collection.immutable.StringLike.toLong(StringLike.scala:315)
  at scala.collection.immutable.StringLike.toLong$(StringLike.scala:315)
  at scala.collection.immutable.StringOps.toLong(StringOps.scala:33)
  at org.apache.spark.sql.hudi.command.SqlKeyGenerator.$anonfun$convertPartitionPathToSqlType$1(SqlKeyGenerator.scala:170)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.sql.hudi.command.SqlKeyGenerator.convertPartitionPathToSqlType(SqlKeyGenerator.scala:156)
  at org.apache.spark.sql.hudi.command.SqlKeyGenerator.getPartitionPath(SqlKeyGenerator.scala:106)
  at org.apache.hudi.HoodieCreateRecordUtils$.getHoodieKeyAndMaybeLocationFromAvroRecord(HoodieCreateRecordUtils.scala:212)
  at org.apache.hudi.HoodieCreateRecordUtils$.$anonfun$createHoodieRecordRdd$5(HoodieCreateRecordUtils.scala:133)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
  at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
  at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
  at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
  at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
  at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
  at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
  at org.apache.spark.scheduler.Task.run(Task.scala:141)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
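
The frames above point at SqlKeyGenerator.convertPartitionPathToSqlType (SqlKeyGenerator.scala:170), where the timestamp partition-path segment is parsed via StringOps.toLong. Below is a minimal sketch of that failure mode, assuming the parse is an unconditional .toLong on the path segment; the object and method names here are illustrative, not Hudi's actual code:

object NullTimestampPartitionRepro extends App {
  // Sentinel that Spark/Hive write for a NULL partition value.
  val HiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__"

  // What the stack trace shows: the timestamp segment is parsed as a Long
  // with no guard, so the sentinel throws NumberFormatException.
  def parseUnguarded(segment: String): Long = segment.toLong

  // A guarded variant (hypothetical fix sketch): map the sentinel to None.
  def parseGuarded(segment: String): Option[Long] =
    if (segment == HiveDefaultPartition) None
    else scala.util.Try(segment.toLong).toOption

  println(parseGuarded("1609930800000000"))   // Some(1609930800000000)
  println(parseGuarded(HiveDefaultPartition)) // None
  parseUnguarded(HiveDefaultPartition)        // java.lang.NumberFormatException
}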

rangareddy commented 1 month ago

Hi @stream2000

I am able to reproduce this issue by running the following Spark code; the Hudi test case itself still needs to be run.

spark-shell \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
    --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
    --conf "spark.sql.hive.convertMetastoreParquet=false"
spark.sql(
        s"""
           |CREATE TABLE hudi_test_null_partition (
           |  id INT,
           |  boolean_field BOOLEAN,
           |  float_field FLOAT,
           |  byte_field BYTE,
           |  short_field SHORT,
           |  decimal_field DECIMAL(10, 5),
           |  date_field DATE,
           |  string_field STRING,
           |  timestamp_field TIMESTAMP
           |) USING hudi
           | TBLPROPERTIES (primaryKey = 'id')
           | PARTITIONED BY (boolean_field, float_field, byte_field, short_field, decimal_field, date_field, string_field, timestamp_field)
     """.stripMargin)

spark.sql(
s"""
   |INSERT INTO hudi_test_null_partition VALUES
   |(1, TRUE, CAST(1.0 as FLOAT), 1, 1, 1234.56789, DATE '2021-01-05', 'partition1', null),
   |(2, FALSE, CAST(2.0 as FLOAT), 2, 2, 6789.12345, DATE '2021-01-06', 'partition2', TIMESTAMP '2021-01-06 11:00:00')
""".stripMargin)

ERROR:

Caused by: java.lang.NumberFormatException: For input string: "__HIVE_DEFAULT_PARTITION__"
  at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.base/java.lang.Long.parseLong(Long.java:692)
  at java.base/java.lang.Long.parseLong(Long.java:817)
  at scala.collection.immutable.StringLike.toLong(StringLike.scala:315)
  at scala.collection.immutable.StringLike.toLong$(StringLike.scala:315)
  at scala.collection.immutable.StringOps.toLong(StringOps.scala:33)
  at org.apache.spark.sql.hudi.command.SqlKeyGenerator.$anonfun$convertPartitionPathToSqlType$1(SqlKeyGenerator.scala:170)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.sql.hudi.command.SqlKeyGenerator.convertPartitionPathToSqlType(SqlKeyGenerator.scala:156)
  at org.apache.spark.sql.hudi.command.SqlKeyGenerator.getPartitionPath(SqlKeyGenerator.scala:106)
  at org.apache.hudi.HoodieCreateRecordUtils$.getHoodieKeyAndMaybeLocationFromAvroRecord(HoodieCreateRecordUtils.scala:209)
  at org.apache.hudi.HoodieCreateRecordUtils$.$anonfun$createHoodieRecordRdd$5(HoodieCreateRecordUtils.scala:130)
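
Until this is fixed, one possible workaround (a suggestion, not something confirmed in this thread, and assuming the NumberFormatException only fires for the null-partition sentinel) is to avoid writing SQL NULL into the timestamp partition column by coalescing to a sentinel timestamp at insert time:

// Hypothetical workaround sketch: substitute a sentinel timestamp for NULL
// so no __HIVE_DEFAULT_PARTITION__ path is ever generated.
spark.sql(
  s"""
     |INSERT INTO hudi_test_null_partition
     |SELECT 1, TRUE, CAST(1.0 AS FLOAT), 1, 1, 1234.56789, DATE '2021-01-05', 'partition1',
     |       COALESCE(CAST(NULL AS TIMESTAMP), TIMESTAMP '1970-01-01 00:00:00')
  """.stripMargin)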
rangareddy commented 1 month ago

Spark with Hive (for comparison): the same insert into a plain Hive table succeeds, and Hive writes the null timestamp partition as __HIVE_DEFAULT_PARTITION__.

spark.sql(
        s"""
           |CREATE TABLE hive_test_null_partition (
           |  id INT,
           |  boolean_field BOOLEAN,
           |  float_field FLOAT,
           |  byte_field BYTE,
           |  short_field SHORT,
           |  decimal_field DECIMAL(10, 5),
           |  date_field DATE,
           |  string_field STRING,
           |  timestamp_field TIMESTAMP
           |) 
           | PARTITIONED BY (boolean_field, float_field, byte_field, short_field, decimal_field, date_field, string_field, timestamp_field)
     """.stripMargin)

spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

spark.sql(
s"""
   |INSERT INTO hive_test_null_partition VALUES
   |(1, TRUE, CAST(1.0 as FLOAT), 1, 1, 1234.56789, DATE '2021-01-05', 'partition1', null),
   |(2, FALSE, CAST(2.0 as FLOAT), 2, 2, 6789.12345, DATE '2021-01-06', 'partition2', TIMESTAMP '2021-01-06 11:00:00')
""".stripMargin)
# tree hive_test_null_partition
hive_test_null_partition
├── boolean_field=false
│   └── float_field=2.0
│       └── byte_field=2
│           └── short_field=2
│               └── decimal_field=6789.12345
│                   └── date_field=2021-01-06
│                       └── string_field=partition2
│                           └── timestamp_field=2021-01-06 11%3A00%3A00
│                               └── part-00001-106ff079-b48f-4428-9bc6-c55ea4b8caec.c000
└── boolean_field=true
    └── float_field=1.0
        └── byte_field=1
            └── short_field=1
                └── decimal_field=1234.56789
                    └── date_field=2021-01-05
                        └── string_field=partition1
                            └── timestamp_field=__HIVE_DEFAULT_PARTITION__
                                └── part-00000-106ff079-b48f-4428-9bc6-c55ea4b8caec.c000
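
The layout above shows the two encodings at play: a NULL partition value becomes the __HIVE_DEFAULT_PARTITION__ sentinel, and special characters such as ':' are percent-encoded in the directory name (timestamp_field=2021-01-06 11%3A00%3A00). A small sketch of how such a segment could be decoded, for illustration only (decodeSegment is a made-up helper, not Hudi's path-handling code):

import java.net.URLDecoder

// Decode one Hive-style partition directory segment, e.g.
// "timestamp_field=2021-01-06 11%3A00%3A00" or
// "timestamp_field=__HIVE_DEFAULT_PARTITION__".
def decodeSegment(segment: String): (String, Option[String]) = {
  val Array(column, raw) = segment.split("=", 2)
  val value =
    if (raw == "__HIVE_DEFAULT_PARTITION__") None // NULL partition value
    else Some(URLDecoder.decode(raw, "UTF-8"))    // undo %3A -> ':' etc.
  (column, value)
}

println(decodeSegment("timestamp_field=2021-01-06 11%3A00%3A00"))
// (timestamp_field,Some(2021-01-06 11:00:00))
println(decodeSegment("timestamp_field=__HIVE_DEFAULT_PARTITION__"))
// (timestamp_field,None)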
rangareddy commented 1 month ago

Created an upstream JIRA for this issue: https://issues.apache.org/jira/browse/HUDI-8315