I'm not clear that it is related to the XML processing. What happens if you show the result of just loading the XML, with no other code?
That works, but something about how the nested structures are created under the hood seems to leave me unable to get them out separately. I can persist df19 itself, but not the nested structure that represents one of the complex columns. I cannot figure out how to write that nested structure out as a separate parquet file, and I am hoping for at least some guidance, since the schema reports what looks like a "normal" structure yet I can't even do a simple show() on it or save it to a parquet file.
I don't know what the data looks like, so I'm not sure there is any problem with the parsing result. XML is nested by nature, and nested types are normal and expected. I think there is a problem with the subsequent transforms. If the structure is normal, why not just select the columns you want?
I actually put the data samples in the zip attachment to reproduce the issue. I am trying to select the columns I want into a new DF, and therein lies the problem: I can't treat it as a normal dataframe when I do that; the ClassCastException occurs. It isn't necessarily a problem with the results, at least in terms of how the structure appears to be represented, but I can't get the nested structures exploded/flattened/exported.
I see it now, OK. There shouldn't be a need to infer the schema, then pass it in. Just let it infer when it reads. The result looks OK to me when I parse the file, as you say. I don't think it's the select here, it's some transformation downstream. I am not sure which part causes it or why, but maybe you can eliminate parts of it until the error goes away?
Exploding a struct is just .select("mystruct.*")
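Roughly, a minimal sketch with made-up column names (not your actual schema):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, explode, lit, struct}

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// Made-up data: one struct column and one array-of-structs column
val df = Seq((1, "a")).toDF("id", "name")
  .withColumn("mystruct", struct(lit(10).as("x"), lit(20).as("y")))
  .withColumn("records", array(struct(lit(1).as("n")), struct(lit(2).as("n"))))

df.select("mystruct.*").show()            // x and y become top-level columns
df.select(explode($"records").as("rec"))  // one row per array element
  .select("rec.*")                        // then flatten the struct
  .show()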
Hi Sean,
I found that if I don't pass in the template, where multiple records are defined for all of the nested arrays, those nested arrays and structs don't get created; they are instead left as XML strings. Since many of the individual bill files that are processed may not have them, I found it needed to be done that way. I had tried using the XSD at one point (this was quite some time ago), but unfortunately that didn't work either.
I'm not too familiar with Spark, but I have spent a lot of time and tried many different ways to export just the nested structure, along with the requisite ids from the main dataframe, all to no avail. If you print the schema of the df that represents the nested struct, it appears fine, but trying to use it as a standard df or export it just doesn't work. I'm at a loss given my limited knowledge of Spark. I've tried using the underlying RDD, extracting row by row, etc., all with no luck. Any other ideas you can provide would be welcome.
Regards, Chris
Hm, there should be no difference between inferring X's schema and then applying it, vs just letting it infer and parse in one shot. I see you aren't inferring on the same data that is parsed, or it's not obvious that it's the same. Is that possibly an issue? It is worth ruling out, especially as you are saying 'the schema isn't right'. As I say, the parsing of the input, after inferring on that input, is fine. I'm not familiar with what else you are trying to do - is 'exporting a struct' not just selecting the fields? What do you mean the schema is fine but using it or exporting it doesn't work?
Thanks a lot for investigating this Sean. The schema inference is problematic for those nested fields that represent 0...x size arrays of nested structs (that also contain 0...x size arrays). The template fills out those arrays so that the arrays and nested structures can be properly inferred. In the actual file some of those array fields are empty so the inference cannot properly deduce the structure of those fields, hence the need to provide the template so the inference can correctly deduce the array fields and the nested structs/arrays within. You should be able to see the problem where the nested fields cannot be properly inferred simply by running without using the template for schema inference. Both the template file used for inference and the actual file whose data I want to utilize have the same schema, but as I mentioned the "real" data may have zero size arrays for various elements making the inference inaccurate.
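For reference, the load itself is roughly like this (the file names and rowTag here are placeholders, not the exact values from the attachment):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// Infer the schema from the fully-populated template bill (placeholder path and rowTag)
val templateSchema = spark.read
  .format("xml")
  .option("rowTag", "Bill")
  .load("billing_template.xml")
  .schema

// Apply that schema when reading the real bill, whose nested arrays may be empty
val df19 = spark.read
  .format("xml")
  .option("rowTag", "Bill")
  .schema(templateSchema)
  .load("billing_actual.xml")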
When I say the schema isn't right, I mean that the straightforward dataframe schema shown by printSchema doesn't seem to match what is reflected in the error when I try to do a show() or export the dataframe to a parquet file. If you look at the schema below, where is the unsafe array data referenced in the exception when show() is called? Where is there even an array at all? You should be able to reproduce this using the info in the issue. Again, I am not suggesting it is a problem with the XML parsing into the df; I am merely trying to determine how to extract the nested data into their own dataframes for export into separate parquet files, for use as external tables in an Oracle database.
java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData cannot be cast to class org.apache.spark.sql.catalyst.InternalRow
scala> dfDetailBlockExp.printSchema
root
 |-- BlockElem: struct (nullable = true)
 |    |-- DetailAmt: double (nullable = true)
 |    |-- DetailRate: double (nullable = true)
 |    |-- DetailTierNumber: long (nullable = true)
 |    |-- DetailTierType: long (nullable = true)
 |    |-- DetailTierUse: long (nullable = true)
 |    |-- DetailUsage: double (nullable = true)
 |    |-- DetailUsageXRate: double (nullable = true)
scala> dfDetailBlock.show()
22/04/30 09:55:36 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData cannot be cast to class org.apache.spark.sql.catalyst.InternalRow (org.apache.spark.sql.catalyst.expressions.UnsafeArrayData and org.apache.spark.sql.catalyst.InternalRow are in unnamed module of loader 'app')
at org.apache.spark.sql.catalyst.util.GenericArrayData.getStruct(GenericArrayData.scala:76)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
It seems that no matter how I try to get at the underlying dataframe data, I am foiled by the same error.
scala> dfDetailBlock.write.saveAsTable("DetailBlock")
22/04/30 10:28:50 ERROR Utils: Aborting task
java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData cannot be cast to class org.apache.spark.sql.catalyst.InternalRow (org.apache.spark.sql.catalyst.expressions.UnsafeArrayData and org.apache.spark.sql.catalyst.InternalRow are in unnamed module of loader 'app')
at org.apache.spark.sql.catalyst.util.GenericArrayData.getStruct(GenericArrayData.scala:76)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:304)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:311)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
I get it, you infer schema on a template representing all possible elements. That's fine.
There are certainly arrays in play here - any repeated element in XML will be an array (of structs). But the "UnsafeArrayData" stuff is Spark internals.
I can reproduce that and I think it's a Spark bug. In .select($"DetailLines.DetailSubRecord.DetailBlocks.DetailBlock") you're selecting the result of an explode(), which is fine until you try to select nested fields from the explode result, and I'm not sure why.
I tried inserting a write/read to Parquet before this line, to try to rule out anything related to the original spark-xml parsing, and still face an error, although it is a slightly different error, which gives me pause: org.apache.spark.sql.types.ArrayType cannot be cast to org.apache.spark.sql.types.StructType
That might be Spark telling us that it doesn't like the semantics of selecting myarray.myfield, but I thought that was fine (results in an array of the field values). There again though, if not that, it's a Spark bug. I am not sure anything is wrong with the code/query.
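For illustration, with made-up data (nothing to do with your files), that semantic looks like:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

case class Item(price: Double, qty: Long)
val df = Seq((1, Seq(Item(1.5, 2L), Item(3.0, 1L)))).toDF("id", "items")

// Selecting a field through an array of structs yields an array of that field's values
df.select($"items.price").show()   // column "price" has type array<double>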
Are there other ways you can express this logic?
Thanks for your analysis and confirmation, Sean. I was thinking of processing those nested elements separately in multiple passes via spark-xml, but then I lose access to some required key data from the parent, e.g. account number and invoice number - the key information I would need in order to join properly on the Oracle side. I am certainly open to any suggestions you might have. The other thought would be to preprocess the XML to ensure the contextual parent key information is available in the child nodes prior to loading them via spark-xml, but I am hoping to avoid what seems like an unnecessary step if there is some way around what appears to be a Spark internals issue.
Also, if these are indeed Spark bugs, what is the best way for me to submit them so they might get some traction? I saw there was a Jira; should it be added there?
Thanks again for all of your help.
Regards, Chris
Oh, I think it works if you avoid referencing a whole array in the select. Try for example explode($"DetailLines.DetailSubRecord").as("DetailSubRecord") as an intermediate step to get to dfDetailBlock.
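In other words, something like this (a sketch building on the dfDetailLines frame in your script; I haven't run this exact snippet end to end):

import org.apache.spark.sql.functions.explode
// assumes spark.implicits._ is imported, as in your REPL session

// Explode the inner array first, then drill into the resulting struct
val detailSubRec  = dfDetailLines.select(explode($"DetailLines.DetailSubRecord").as("DetailSubRecord"))
val dfDetailBlock = detailSubRec.select($"DetailSubRecord.DetailBlocks.DetailBlock")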
Ooh, if I do that, I can work my way down to the struct level of granularity I am trying to get to? OK, I will give that a try, thank you.
Sean, thank you so much for all of your help! Your suggestion worked and I am able to drill down and retrieve the nested data at the most granular level I was looking to retrieve. I really, really appreciate your assistance with this!
Regards, Chris
scala> val dfDetailLines = df19.select($"AcctNum",$"BillingDate",$"InvoiceNum",explode($"DetailLines.DetailLine").as("DetailLines"))
dfDetailLines: org.apache.spark.sql.DataFrame = [AcctNum: bigint, BillingDate: date ... 2 more fields]
scala> val detailSubRec = dfDetailLines.select(explode($"DetailLines.DetailSubRecord").as("DetailSubRecord"))
detailSubRec: org.apache.spark.sql.DataFrame = [DetailSubRecord: struct<DetailApplication: bigint, DetailBankedUsage: bigint ... 39 more fields>]
scala> var dfDetailBlock = detailSubRec.select($"DetailSubRecord.DetailBlocks.DetailBlock")
dfDetailBlock: org.apache.spark.sql.DataFrame = [DetailBlock: array<struct<DetailAmt:double,DetailRate:double,DetailTierNumber:bigint,DetailTierType:bigint,DetailTierUse:bigint,DetailUsage:double,DetailUsageXRate:double>>]
scala> val dfDetailBlockElem = dfDetailBlock.select(explode($"DetailBlock") as "Block")
dfDetailBlockElem: org.apache.spark.sql.DataFrame = [Block: struct<DetailAmt: double, DetailRate: double ... 5 more fields>]
scala> val dfDetailBlockExp = dfDetailBlockElem.select("Block.*")
dfDetailBlockExp: org.apache.spark.sql.DataFrame = [DetailAmt: double, DetailRate: double ... 5 more fields]
scala> dfDetailBlockExp.show()
+---------+----------+----------------+--------------+-------------+-----------+----------------+
|DetailAmt|DetailRate|DetailTierNumber|DetailTierType|DetailTierUse|DetailUsage|DetailUsageXRate|
+---------+----------+----------------+--------------+-------------+-----------+----------------+
| 0.15| 0.0| 1| 0| 0|2.0220413E7| 0.0|
| 14.14| 0.0| 1| 0| 0| 0.0| 0.0|
| 1.5| 1.5| 1| 0| 0| 1.0| 1.5|
| 0.28| 0.02| 1| 0| 0| 0.02| 4.0E-4|
| 7.9| 0.0| 1| 0| 0| 0.0| 0.0|
| 0.16| 0.02| 1| 0| 0| 0.02| 4.0E-4|
| 3.01| 0.0| 1| 0| 0|2.0220413E7| 0.0|
| 29.5| 29.5| 1| 0| 0| 1.0| 29.5|
| 2.43| 0.0825| 1| 0| 0| 0.082| 0.0068|
| 0.59| 0.02| 1| 0| 0| 0.02| 4.0E-4|
| 0.9| 0.0| 1| 0| 0|2.0220413E7| 0.0|
| 9.0| 9.0| 1| 0| 0| 1.0| 9.0|
+---------+----------+----------------+--------------+-------------+-----------+----------------+
Great, glad to hear. The error was mysterious; I'm not actually sure whether the original query is valid and it's a bug, or invalid and it's a poor error. But decomposing it is probably easier to reason about anyway.
Yes, it certainly was! I am just relieved it is working and I can continue on. Thank you again!
Regards, Chris
Using Spark version 3.2.1
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.14.1)
I load the XML files below: the first to establish the schema, and the second with the actual instance of a billing XML for an account. The problem occurs when I attempt to create a dataframe from one of the columns that contains a complex nested structure. Just calling show() on the dataframe yields the ClassCastException shown below. I've attached the XML files for review. I struggled with this issue for many hours today. Any assistance or guidance you can provide would be much appreciated.
Regards, Chris Whelan
billingsample.zip