databricks / spark-xml

XML data source for Spark SQL and DataFrames
Apache License 2.0
505 stars 227 forks source link

Problem use Scala functions in python. Scala error on Spark 3.3.2. Scala Version 2.13.8. #688

Open hipp0gryph opened 2 months ago

hipp0gryph commented 2 months ago

Hello! I try use Scala functions in Pythob from documentations on big xml data. All work correct if I use Spark with Scala 2.12. But on production cluster installed Spark 3.3.2 with Scala 2.13.8. And on that cluster i take error. Thank you in advice! Python example code for 2.12 (for 2.13 I changed version driver into code from 2.12 on 2.13):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import StringType
from pyspark.sql.types import _parse_datatype_json_string

def ext_from_xml(xml_column, schema, options={}):
    java_column = _to_java_column(xml_column.cast('string'))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

def ext_schema_of_xml_df(df, options={}):
    assert len(df.columns) == 1

    scala_options = spark._jvm.PythonUtils.toScalaMap(options)
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    return _parse_datatype_json_string(java_schema.json())

spark = SparkSession.builder \
    .appName("Read XML from Archives") \
    .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.18.0") \
    .getOrCreate()
df = spark.createDataFrame([
    """<main xmlns:t1="2017-08-21" xmlns:t2="2022-09-22" 
    xmlns:t3="2017-10-23" xmlns:t4="2018-12-07" 
    xmlns="2022-09-22"></main>"""],
    StringType()).toDF("test")
schema = ext_schema_of_xml_df(df.select("test"))
df = df.select(ext_from_xml(F.col("test"), schema).alias("xml_content"))
df.printSchema()
df.show()

Error:

: org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 23.0 failed 4 times, most recent failure: Lost task 54.3 in stage 23.0 (TID 618) (i148-ADHP-SRV-21 executor 1): java.lang.ClassCastException: scala.collection.mutable.ArrayBuffer cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray(rows.scala:48)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray$(rows.scala:48)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getArray(rows.scala:195)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.fieldToString_33_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.fieldToString_35_3$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$9.next(Iterator.scala:577)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:365)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
    at scala.collection.immutable.List.foreach(List.scala:333)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
    at scala.Option.foreach(Option.scala:437)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassCastException: scala.collection.mutable.ArrayBuffer cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray(rows.scala:48)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray$(rows.scala:48)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getArray(rows.scala:195)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.fieldToString_33_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.fieldToString_35_3$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$9.next(Iterator.scala:577)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:365)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

24/08/09 10:58:59 ERROR __main__: KND: An error occurred while calling o212.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 23.0 failed 4 times, most recent failure: Lost task 54.3 in stage 23.0 (TID 618) (i148-ADHP-SRV-21 executor 1): java.lang.ClassCastException: scala.collection.mutable.ArrayBuffer cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray(rows.scala:48)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray$(rows.scala:48)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getArray(rows.scala:195)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.fieldToString_33_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.fieldToString_35_3$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$9.next(Iterator.scala:577)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:365)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
    at scala.collection.immutable.List.foreach(List.scala:333)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
    at scala.Option.foreach(Option.scala:437)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassCastException: scala.collection.mutable.ArrayBuffer cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray(rows.scala:48)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray$(rows.scala:48)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getArray(rows.scala:195)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.fieldToString_33_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.fieldToString_35_3$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$9.next(Iterator.scala:577)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:365)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
hipp0gryph commented 2 months ago

I resolved my old problems with Python Spark using, but that problem blocked me and I find resolve on Scala for that. Thanks for you help in all my questions @srowen :)