databricks / spark-xml

XML data source for Spark SQL and DataFrames
Apache License 2.0

decimalCannotGreaterThanPrecisionError with DecimalType() and value = 0.0X #626

Closed. marcuskw closed this issue 1 year ago.

marcuskw commented 1 year ago

Environment: Apache Spark 3.3.0, Scala 2.12, spark-xml 0.14

When parsing XML in PySpark with the ext_from_xml helper below, we get a DecimalType() error. If we change the XML value to 1.01 instead of 0.01, the error no longer occurs.

Repro steps:

from pyspark.sql.types import StructType, StructField, DecimalType, StringType
import pyspark.sql.functions as F
from pyspark.sql.column import Column, _to_java_column

# Target schema: a single decimal element, precision 21 and scale 6.
schema = StructType([StructField('Element', DecimalType(21, 6), True)])

# Expose spark-xml's from_xml to PySpark through the py4j gateway.
# Assumes an active SparkSession named `spark` (as in a Databricks notebook).
def ext_from_xml(xml_column, schema, options={}):
    java_column = _to_java_column(xml_column.cast("string"))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map
    )
    return Column(jc)

# One-row DataFrame whose "value" column holds the XML payload.
df = spark.createDataFrame(["<Test><Element>0.01</Element></Test>"], StringType())

# FAILFAST surfaces the conversion error instead of nulling the row.
parsedDF = df.select(
    ext_from_xml(F.col("value"), schema, {"mode": "FAILFAST"}).alias("value")
)

display(parsedDF)

Error message:

AnalysisException: Decimal scale (2) cannot be greater than precision (1).

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 16.0 failed 4 times, most recent failure: Lost task 2.3 in stage 16.0 (TID 58) (10.139.64.12 executor 0): org.apache.spark.sql.AnalysisException: Decimal scale (2) cannot be greater than precision (1).
    at org.apache.spark.sql.errors.QueryCompilationErrors$.decimalCannotGreaterThanPrecisionError(QueryCompilationErrors.scala:1942)
    at org.apache.spark.sql.types.DecimalType.<init>(DecimalType.scala:49)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertToCatalyst(CatalystTypeConverters.scala:511)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$.$anonfun$convertToCatalyst$2(CatalystTypeConverters.scala:513)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertToCatalyst(CatalystTypeConverters.scala:513)
    at com.databricks.spark.xml.XmlDataToCatalyst.nullSafeEval(XmlDataToCatalyst.scala:45)
    at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:670)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:81)
    at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:186)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:169)
    at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:36)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:58)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:63)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:62)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:96)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1696)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3312)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3244)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3235)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3235)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1424)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1424)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1424)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3524)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3462)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3450)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1169)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1157)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2713)
    at org.apache.spark.sql.execution.collect.Collector.$anonfun$runSparkJobs$1(Collector.scala:312)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:271)
    at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:322)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:105)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:112)
    at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:115)
    at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:104)
    at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:88)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:527)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:519)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:539)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:396)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:390)
    at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:292)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:431)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:428)
    at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3424)
    at org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:3415)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4290)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:777)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4288)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:243)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:392)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:188)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:985)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:142)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:342)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4288)
    at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3414)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:267)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:101)
    at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:720)
    at com.databricks.backend.daemon.driver.JupyterDriverLocal.computeListResultsItem(JupyterDriverLocal.scala:1332)
    at com.databricks.backend.daemon.driver.JupyterDriverLocal$JupyterEntryPoint.addCustomDisplayData(JupyterDriverLocal.scala:489)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.sql.AnalysisException: Decimal scale (2) cannot be greater than precision (1).
    at org.apache.spark.sql.errors.QueryCompilationErrors$.decimalCannotGreaterThanPrecisionError(QueryCompilationErrors.scala:1942)
    at org.apache.spark.sql.types.DecimalType.<init>(DecimalType.scala:49)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertToCatalyst(CatalystTypeConverters.scala:511)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$.$anonfun$convertToCatalyst$2(CatalystTypeConverters.scala:513)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertToCatalyst(CatalystTypeConverters.scala:513)
    at com.databricks.spark.xml.XmlDataToCatalyst.nullSafeEval(XmlDataToCatalyst.scala:45)
    at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:670)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:81)
    at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:186)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:169)
    at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:36)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:58)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:63)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:62)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:96)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1696)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
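One possible interim workaround (a sketch added here for illustration, not from the original report; str_schema and workaroundDF are illustrative names) is to parse Element as a string and cast it to the target decimal type afterwards, which should sidestep the failing conversion inside from_xml:

# Sketch: parse Element as a string, then cast in Spark SQL.
str_schema = StructType([StructField('Element', StringType(), True)])

workaroundDF = df.select(
    ext_from_xml(F.col("value"), str_schema, {"mode": "FAILFAST"}).alias("value")
).select(
    F.col("value.Element").cast(DecimalType(21, 6)).alias("Element")
)

Since the schema passed to from_xml contains no decimals, the CatalystTypeConverters path shown in the trace is never asked to build a decimal from the raw value.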

srowen commented 1 year ago

Duplicate of https://github.com/databricks/spark-xml/issues/622; update to 0.16.0.
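For context on why 0.01 in particular trips this: per the trace, the failure comes from constructing a DecimalType from the value's own precision and scale, and under java.math.BigDecimal semantics "0.01" has one significant digit (precision 1) but scale 2, which is invalid because scale cannot exceed precision. Python's decimal module follows the same convention, so a quick standalone illustration (not from the thread):

from decimal import Decimal

# precision = number of significant digits; scale = digits after the point
for s in ["0.01", "1.01"]:
    t = Decimal(s).as_tuple()
    print(f"{s}: precision={len(t.digits)}, scale={-t.exponent}")

# 0.01: precision=1, scale=2  -> scale > precision, the error above
# 1.01: precision=3, scale=2  -> valid, which is why 1.01 parses fine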