databricks / spark-xml

XML data source for Spark SQL and DataFrames

Cannot write dataframe with custom timestampFormat #663

Closed dolfinus closed 10 months ago

dolfinus commented 10 months ago

Hi.

I've created a simple DataFrame:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, TimestampType
from datetime import datetime, timezone

spark = SparkSession.builder.config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.17.0").getOrCreate()
schema = StructType([StructField("created-at", TimestampType())])

df = spark.createDataFrame([{"created-at": datetime.now(tz=timezone.utc)}], schema=schema)
df.show(10, False)
+--------------------------+
|created-at                |
+--------------------------+
|2023-10-09 09:05:24.269352|
+--------------------------+

Then I try to save it as XML:

df.repartition(1).write \
  .format("xml") \
  .mode("overwrite") \
  .option("compression", None) \
  .option("rowTag", "item") \
  .save("2.xml")

Resulting XML:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ROWS>
    <item>
        <created-at>2023-10-09T09:05:24.269352Z</created-at>
    </item>
</ROWS>

Then I try to change the timestamp format:

df.repartition(1).write \
  .format("xml") \
  .mode("overwrite") \
  .option("compression", None) \
  .option("rowTag", "item") \
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSXXX") \
  .save("2.xml")

I get an exception:

23/10/09 09:13:31 ERROR Utils: Aborting task
java.time.temporal.UnsupportedTemporalTypeException: Unsupported field: YearOfEra
        at java.time.Instant.getLong(Instant.java:603)
        at java.time.format.DateTimePrintContext.getValue(DateTimePrintContext.java:298)
        at java.time.format.DateTimeFormatterBuilder$NumberPrinterParser.format(DateTimeFormatterBuilder.java:2551)
        at java.time.format.DateTimeFormatterBuilder$CompositePrinterParser.format(DateTimeFormatterBuilder.java:2190)
        at java.time.format.DateTimeFormatter.formatTo(DateTimeFormatter.java:1746)
        at java.time.format.DateTimeFormatter.format(DateTimeFormatter.java:1720)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.writeElement$1(StaxXmlGenerator.scala:89)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.writeChildElement$1(StaxXmlGenerator.scala:57)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.writeChild$1(StaxXmlGenerator.scala:79)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.$anonfun$apply$12(StaxXmlGenerator.scala:130)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.$anonfun$apply$12$adapted(StaxXmlGenerator.scala:128)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.writeElement$1(StaxXmlGenerator.scala:128)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.apply(StaxXmlGenerator.scala:155)
        at com.databricks.spark.xml.util.XmlFile$$anon$1.next(XmlFile.scala:134)
        at com.databricks.spark.xml.util.XmlFile$$anon$1.next(XmlFile.scala:111)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:137)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563)
        at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:135)
        at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
23/10/09 09:13:31 ERROR SparkHadoopWriter: Task attempt_20231009091331224191220077987097_0471_m_000000_0 aborted.
23/10/09 09:13:31 ERROR Executor: Exception in task 0.0 in stage 79.0 (TID 131)
org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:163)
        at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

This looks like https://stackoverflow.com/a/27483371 and is caused by this line: https://github.com/databricks/spark-xml/blob/b2611bd20e917a75b7e96f5eb5cbc78f5ab21740/src/main/scala/com/databricks/spark/xml/parsers/StaxXmlGenerator.scala#L87

There is no such error if I pass a custom timestampFormat during reading; the read path probably handles this here: https://github.com/databricks/spark-xml/blob/b2611bd20e917a75b7e96f5eb5cbc78f5ab21740/src/main/scala/com/databricks/spark/xml/util/TypeCast.scala#L155
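
For reference, a minimal sketch of the read path that accepts the same option without error, reusing the spark session and schema from the snippet above (the pattern is illustrative, and the timestamps in the file would need to match it to parse):

# Read-path sketch: the same timestampFormat option is accepted here
# without raising, unlike on the write path above.
parsed = spark.read.format("xml") \
    .option("rowTag", "item") \
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSXXX") \
    .schema(schema) \
    .load("2.xml")
parsed.show(10, False)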

dolfinus commented 10 months ago

Passing a custom dateFormat does not raise an exception:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, TimestampType, DateType
from datetime import date, datetime, timezone

spark = SparkSession.builder.config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.17.0").getOrCreate()
schema = StructType(
    [
        StructField("created_date", DateType()),
        StructField("created_datetime", TimestampType())
    ],
)

df = spark.createDataFrame(
    [
        {"created_date": date.today()},
        {"created_datetime": datetime.now(tz=timezone.utc)},
    ],
    schema=schema,
)
df.show(10, False)

df.repartition(1).write \
  .format("xml") \
  .mode("overwrite") \
  .option("compression", None) \
  .option("rowTag", "item") \
  .option("dateFormat", "yyyy/MM/dd") \
  .save("2.xml")

Resulting XML:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ROWS>
    <item>
        <created_date>2023/10/09</created_date>
    </item>
    <item>
        <created_datetime>2023-10-09T09:34:22.238027Z</created_datetime>
    </item>
</ROWS>

srowen commented 10 months ago

It's not related to this library, but to the Java timestamp formatter rules. I believe the issue is that you haven't set a timezone, and in later Java versions yyyy depends on the timezone? You can try the format pattern "uuuu". I don't think that's quite right, but it's close.

dolfinus commented 10 months ago

Why does this option work fine while reading, but fail while writing?

dolfinus commented 10 months ago

With yyyy -> uuuu:

df.write.format("xml").option("timestampFormat", "uuuu-MM-dd HH:mm:ss.SSSXXX").mode("overwrite").save("2.xml")
Caused by: java.time.temporal.UnsupportedTemporalTypeException: Unsupported field: Year
        at java.time.Instant.getLong(Instant.java:603)
        at java.time.format.DateTimePrintContext.getValue(DateTimePrintContext.java:298)
        at java.time.format.DateTimeFormatterBuilder$NumberPrinterParser.format(DateTimeFormatterBuilder.java:2551)
        at java.time.format.DateTimeFormatterBuilder$CompositePrinterParser.format(DateTimeFormatterBuilder.java:2190)
        at java.time.format.DateTimeFormatter.formatTo(DateTimeFormatter.java:1746)
        at java.time.format.DateTimeFormatter.format(DateTimeFormatter.java:1720)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.writeElement$1(StaxXmlGenerator.scala:89)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.writeChildElement$1(StaxXmlGenerator.scala:57)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.writeChild$1(StaxXmlGenerator.scala:79)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.$anonfun$apply$12(StaxXmlGenerator.scala:130)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.$anonfun$apply$12$adapted(StaxXmlGenerator.scala:128)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.writeElement$1(StaxXmlGenerator.scala:128)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.apply(StaxXmlGenerator.scala:155)
        at com.databricks.spark.xml.util.XmlFile$$anon$1.next(XmlFile.scala:134)
        at com.databricks.spark.xml.util.XmlFile$$anon$1.next(XmlFile.scala:111)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:137)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563)
        at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:135)
        ... 10 more

With an explicit timezone:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, TimestampType
from datetime import datetime, timezone

spark = SparkSession.builder.config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.17.0").config("spark.sql.session.timeZone", "UTC").getOrCreate()
schema = StructType([StructField("created-at", TimestampType())])

df = spark.createDataFrame([{"created-at": datetime.now(tz=timezone.utc)}], schema=schema)
df.show(10, False)

df.write.format("xml").option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSXXX").mode("overwrite").save("2.xml")

Same exception as in the issue description:

Caused by: java.time.temporal.UnsupportedTemporalTypeException: Unsupported field: YearOfEra
        at java.time.Instant.getLong(Instant.java:603)
        at java.time.format.DateTimePrintContext.getValue(DateTimePrintContext.java:298)
        at java.time.format.DateTimeFormatterBuilder$NumberPrinterParser.format(DateTimeFormatterBuilder.java:2551)
        at java.time.format.DateTimeFormatterBuilder$CompositePrinterParser.format(DateTimeFormatterBuilder.java:2190)
        at java.time.format.DateTimeFormatter.formatTo(DateTimeFormatter.java:1746)
        at java.time.format.DateTimeFormatter.format(DateTimeFormatter.java:1720)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.writeElement$1(StaxXmlGenerator.scala:89)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.writeChildElement$1(StaxXmlGenerator.scala:57)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.writeChild$1(StaxXmlGenerator.scala:79)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.$anonfun$apply$12(StaxXmlGenerator.scala:130)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.$anonfun$apply$12$adapted(StaxXmlGenerator.scala:128)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.writeElement$1(StaxXmlGenerator.scala:128)
        at com.databricks.spark.xml.parsers.StaxXmlGenerator$.apply(StaxXmlGenerator.scala:155)
        at com.databricks.spark.xml.util.XmlFile$$anon$1.next(XmlFile.scala:134)
        at com.databricks.spark.xml.util.XmlFile$$anon$1.next(XmlFile.scala:111)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:137)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563)
        at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:135)
        ... 10 more
srowen commented 10 months ago

I see it: the write path does not set a timezone on the formatter used to write the timestamp. That is the problem here, I'm pretty sure. That's easy to fix in this code, but I'm not sure there will be another release here, as this functionality is in Spark now.

For now, you could work around this by manually formatting the timestamp as a string in the desired format before writing.
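
A rough sketch of that workaround, assuming Spark's date_format supports the desired pattern (the exact pattern string is illustrative):

from pyspark.sql.functions import col, date_format

# Workaround sketch: pre-format the timestamp as a plain string column,
# so spark-xml writes the value verbatim instead of running it through
# the failing java.time formatter.
formatted = df.withColumn(
    "created-at",
    date_format(col("created-at"), "yyyy-MM-dd HH:mm:ss.SSSXXX"),
)

formatted.repartition(1).write \
  .format("xml") \
  .mode("overwrite") \
  .option("rowTag", "item") \
  .save("2.xml")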

dolfinus commented 10 months ago

> but not sure if there will be another release here, as it's in Spark now.

XML support will be added in Spark 4.x, which will be released in June 2024. Migration from Spark 3.x to 4.x will take quite a long time.

Do you mean that users will not receive any bug fixes for the spark-xml package for at least a year?

srowen commented 10 months ago

Not necessarily; it's possible to create more releases if needed, but I'd set the expectation that there won't be releases unless it's pretty important.