databricks / spark-xml

XML data source for Spark SQL and DataFrames
Apache License 2.0

Fails on some comments with Scala: MatchError #164

Closed: metador closed this issue 8 years ago

metador commented 8 years ago

I'm trying to load an XML file, and it fails while parsing some of the comments. The comment <!-- 1 Beefy Fritos Bur --> fails with a scala.MatchError, but <!-- TLD Generator V1.28c --> is parsed without any problems.

Here is the XML:

<?xml version="1.0" encoding="utf-8"?>
<!-- TLD Generator V1.28c -->
<POSLog xmlns="http://www.nrf-arts.org/IXRetail/namespace/" xmlns:ns1="xxx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.nrf-arts.org/IXRetail/namespace/ ../XSD/POSLogForFoodserviceV3.0.0.xsd" xmlns:xyz="xxx">
  <Transaction MajorVersion="3" MinorVersion="0" FixVersion="0" TrainingModeFlag="false">
    <RetailStoreID>027505</RetailStoreID>
    <OrganizationHierarchy Level="OperatingCompany">NMBR</OrganizationHierarchy>
    <OrganizationHierarchy Level="Division">TBC</OrganizationHierarchy>
    <OrganizationHierarchy Level="Concept">TBC</OrganizationHierarchy>
    <WorkstationID>1</WorkstationID>
    <SequenceNumber>131629</SequenceNumber>
    <BusinessDayDate>2016-03-18</BusinessDayDate>
    <BeginDateTime>2016-03-18T22:13:18</BeginDateTime>
    <EndDateTime>2016-03-18T22:14:18</EndDateTime>
    <OperatorID OperatorType="Cashier" OperatorName="R, R">32</OperatorID>
    <ReceiptDateTime>2016-03-18T22:13:21</ReceiptDateTime>
    <RetailTransaction TransactionStatus="Finished">
      <PriceDerivationResult DiscountBenefit="Manager">
        <SequenceNumber>1</SequenceNumber>
        <Percent Action="Subtract">100.00</Percent>
        <PriceDerivationRule>
          <PriceDerivationRuleID>100% Manager Disc.</PriceDerivationRuleID>
          <Amount Action="Subtract">2.00</Amount>
        </PriceDerivationRule>
        <ReasonCode>FIXED DISCOUNT</ReasonCode>
        <OperatorBypassApproval>
          <SequenceNumber>1</SequenceNumber>
          <ApproverID>39</ApproverID>
        </OperatorBypassApproval>
      </PriceDerivationResult>
      <LineItem CancelFlag="false" VoidFlag="false">
        <!-- 1 Beefy Fritos Bur -->
        <SequenceNumber>2</SequenceNumber>
        <Sale>
          <POSIdentity>
            <POSItemID>B-BFFT</POSItemID>
          </POSIdentity>
          <RegularSalesUnitPrice>1.00</RegularSalesUnitPrice>
          <ActualSalesUnitPrice>1.00</ActualSalesUnitPrice>
          <ExtendedAmount>1.00</ExtendedAmount>
          <Quantity>1</Quantity>
        </Sale>
      </LineItem>
      <LineItem CancelFlag="false" VoidFlag="false">
        <!-- 1 Shrd Chk Mini Qu -->
        <SequenceNumber>3</SequenceNumber>
        <Sale>
          <POSIdentity>
            <POSItemID>MQ-SSC</POSItemID>
          </POSIdentity>
          <RegularSalesUnitPrice>1.00</RegularSalesUnitPrice>
          <ActualSalesUnitPrice>1.00</ActualSalesUnitPrice>
          <ExtendedAmount>1.00</ExtendedAmount>
          <Quantity>1</Quantity>
        </Sale>
      </LineItem>
      <!-- SOS -->
      <Total TotalType="TransactionGrossAmount">2.00</Total>
      <Total TotalType="TransactionNetAmount">0.00</Total>
      <Total TotalType="TransactionTaxAmount">0.00</Total>
      <Total TotalType="TransactionNonSalesAmount">2.00</Total>
      <Total TotalType="X:TransactionPromoAmount">0.00</Total>
      <Customer>
        <CustomerID>
        </CustomerID>
        <CustomerName>
          <FullName>
          </FullName>
        </CustomerName>
      </Customer>
      <Foodservice DestinationType="XXX">
      </Foodservice>
    </RetailTransaction>
  </Transaction>
</POSLog>

Here is the code that I am trying to run:

trans = sqlContext.read.format('com.databricks.spark.xml').option("rowTag","Transaction").load("testing/sampleTLDnocomments.xml")
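One workaround until a fix is released is to strip the comments out of the file before loading, since the failure happens during schema inference when the parser hits a comment event. Below is a minimal sketch in Scala (the same idea works from PySpark with re.sub), assuming a Spark 1.x shell where sc and sqlContext are predefined; the output directory name is illustrative, not from the report:

// Read each file whole so a comment can never straddle a split boundary,
// strip all XML comments with a non-greedy dotall regex, and write the
// result back out for spark-xml to load.
val raw = sc.wholeTextFiles("testing/sampleTLD.xml")
val stripped = raw.mapValues(_.replaceAll("(?s)<!--.*?-->", ""))
stripped.values.saveAsTextFile("testing/sampleTLD-stripped")

val trans = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "Transaction")
  .load("testing/sampleTLD-stripped")

Note that wholeTextFiles loads each file fully into memory on a single executor, so this trades scalability for the guarantee that the regex sees every comment intact.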

The error I am running into is a scala.MatchError. Here is the log:

>>> trans1 = sqlContext.read.format('com.databricks.spark.xml').option("rowTag", "Transaction").load("testing/sampleTLD.xml")
16/08/29 21:45:48 INFO storage.MemoryStore: Block broadcast_31 stored as values in memory (estimated size 202.6 KB, free 202.6 KB)
16/08/29 21:45:48 INFO storage.MemoryStore: Block broadcast_31_piece0 stored as bytes in memory (estimated size 23.5 KB, free 226.1 KB)
16/08/29 21:45:48 INFO storage.BlockManagerInfo: Added broadcast_31_piece0 in memory on 172.31.30.114:60795 (size: 23.5 KB, free: 530.3 MB)
16/08/29 21:45:48 INFO spark.SparkContext: Created broadcast 31 from newAPIHadoopFile at XmlFile.scala:39
16/08/29 21:45:48 INFO input.FileInputFormat: Total input paths to process : 1
16/08/29 21:45:48 INFO spark.SparkContext: Starting job: treeAggregate at InferSchema.scala:103
16/08/29 21:45:48 INFO scheduler.DAGScheduler: Got job 13 (treeAggregate at InferSchema.scala:103) with 1 output partitions
16/08/29 21:45:48 INFO scheduler.DAGScheduler: Final stage: ResultStage 13 (treeAggregate at InferSchema.scala:103)
16/08/29 21:45:48 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/08/29 21:45:48 INFO scheduler.DAGScheduler: Missing parents: List()
16/08/29 21:45:48 INFO scheduler.DAGScheduler: Submitting ResultStage 13 (MapPartitionsRDD[77] at treeAggregate at InferSchema.scala:103), which has no missing parents
16/08/29 21:45:48 INFO storage.MemoryStore: Block broadcast_32 stored as values in memory (estimated size 4.1 KB, free 230.1 KB)
16/08/29 21:45:48 INFO storage.MemoryStore: Block broadcast_32_piece0 stored as bytes in memory (estimated size 2.3 KB, free 232.4 KB)
16/08/29 21:45:48 INFO storage.BlockManagerInfo: Added broadcast_32_piece0 in memory on 172.31.30.114:60795 (size: 2.3 KB, free: 530.3 MB)
16/08/29 21:45:48 INFO spark.SparkContext: Created broadcast 32 from broadcast at DAGScheduler.scala:1006
16/08/29 21:45:48 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 13 (MapPartitionsRDD[77] at treeAggregate at InferSchema.scala:103)
16/08/29 21:45:48 INFO cluster.YarnScheduler: Adding task set 13.0 with 1 tasks
16/08/29 21:45:49 INFO spark.ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 1)
16/08/29 21:45:51 INFO cluster.YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (ip-172-31-30-113.ec2.internal:56600) with ID 12
16/08/29 21:45:51 INFO spark.ExecutorAllocationManager: New executor 12 has registered (new total is 1)
16/08/29 21:45:51 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 13.0 (TID 28, ip-172-31-30-113.ec2.internal, partition 0,NODE_LOCAL, 2293 bytes)
16/08/29 21:45:51 INFO storage.BlockManagerMasterEndpoint: Registering block manager ip-172-31-30-113.ec2.internal:46617 with 530.3 MB RAM, BlockManagerId(12, ip-172-31-30-113.ec2.internal, 46617)
16/08/29 21:45:52 INFO storage.BlockManagerInfo: Added broadcast_32_piece0 in memory on ip-172-31-30-113.ec2.internal:46617 (size: 2.3 KB, free: 530.3 MB)
16/08/29 21:45:52 INFO storage.BlockManagerInfo: Added broadcast_31_piece0 in memory on ip-172-31-30-113.ec2.internal:46617 (size: 23.5 KB, free: 530.3 MB)
16/08/29 21:45:53 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 13.0 (TID 28, ip-172-31-30-113.ec2.internal): scala.MatchError: <!-- 1 Beefy Fritos Bur --> (of class com.sun.xml.internal.stream.events.CommentEvent)
        at com.databricks.spark.xml.util.InferSchema$.inferField(InferSchema.scala:134)
        at com.databricks.spark.xml.util.InferSchema$.com$databricks$spark$xml$util$InferSchema$$inferObject(InferSchema.scala:171)
        at com.databricks.spark.xml.util.InferSchema$.inferField(InferSchema.scala:135)
        at com.databricks.spark.xml.util.InferSchema$.com$databricks$spark$xml$util$InferSchema$$inferObject(InferSchema.scala:171)
        at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:94)
        at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:83)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
        at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1135)
        at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1135)
        at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1136)
        at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1136)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

16/08/29 21:45:53 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 13.0 (TID 29, ip-172-31-30-113.ec2.internal, partition 0,NODE_LOCAL, 2293 bytes)
16/08/29 21:45:53 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 13.0 (TID 29) on executor ip-172-31-30-113.ec2.internal: scala.MatchError (<!-- 1 Beefy Fritos Bur --> (of class com.sun.xml.internal.stream.events.CommentEvent)) [duplicate 1]
16/08/29 21:45:53 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 13.0 (TID 30, ip-172-31-30-113.ec2.internal, partition 0,NODE_LOCAL, 2293 bytes)
16/08/29 21:45:53 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 13.0 (TID 30) on executor ip-172-31-30-113.ec2.internal: scala.MatchError (<!-- 1 Beefy Fritos Bur --> (of class com.sun.xml.internal.stream.events.CommentEvent)) [duplicate 2]
16/08/29 21:45:53 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 13.0 (TID 31, ip-172-31-30-113.ec2.internal, partition 0,NODE_LOCAL, 2293 bytes)
16/08/29 21:45:53 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 13.0 (TID 31) on executor ip-172-31-30-113.ec2.internal: scala.MatchError (<!-- 1 Beefy Fritos Bur --> (of class com.sun.xml.internal.stream.events.CommentEvent)) [duplicate 3]
16/08/29 21:45:53 ERROR scheduler.TaskSetManager: Task 0 in stage 13.0 failed 4 times; aborting job
16/08/29 21:45:53 INFO cluster.YarnScheduler: Removed TaskSet 13.0, whose tasks have all completed, from pool
16/08/29 21:45:53 INFO cluster.YarnScheduler: Cancelling stage 13
16/08/29 21:45:53 INFO scheduler.DAGScheduler: ResultStage 13 (treeAggregate at InferSchema.scala:103) failed in 5.268 s
16/08/29 21:45:53 INFO scheduler.DAGScheduler: Job 13 failed: treeAggregate at InferSchema.scala:103, took 5.286607 s
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/cloudera/parcels/CDH-5.8.0-1.cdh5.8.0.p0.42/lib/spark/python/pyspark/sql/readwriter.py", line 137, in load
    return self._df(self._jreader.load(path))
  File "/opt/cloudera/parcels/CDH-5.8.0-1.cdh5.8.0.p0.42/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/cloudera/parcels/CDH-5.8.0-1.cdh5.8.0.p0.42/lib/spark/python/pyspark/sql/utils.py", line 45, in deco
    return f(*a, **kw)
  File "/opt/cloudera/parcels/CDH-5.8.0-1.cdh5.8.0.p0.42/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o622.load.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 (TID 31, ip-172-31-30-113.ec2.internal): scala.MatchError: <!-- 1 Beefy Fritos Bur --> (of class com.sun.xml.internal.stream.events.CommentEvent)
        at com.databricks.spark.xml.util.InferSchema$.inferField(InferSchema.scala:134)
        at com.databricks.spark.xml.util.InferSchema$.com$databricks$spark$xml$util$InferSchema$$inferObject(InferSchema.scala:171)
        at com.databricks.spark.xml.util.InferSchema$.inferField(InferSchema.scala:135)
        at com.databricks.spark.xml.util.InferSchema$.com$databricks$spark$xml$util$InferSchema$$inferObject(InferSchema.scala:171)
        at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:94)
        at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:83)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
        at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1135)
        at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1135)
        at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1136)
        at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1136)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1963)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
        at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1127)
        at com.databricks.spark.xml.util.InferSchema$.infer(InferSchema.scala:103)
        at com.databricks.spark.xml.XmlRelation$$anonfun$1.apply(XmlRelation.scala:46)
        at com.databricks.spark.xml.XmlRelation$$anonfun$1.apply(XmlRelation.scala:46)
        at scala.Option.getOrElse(Option.scala:120)
        at com.databricks.spark.xml.XmlRelation.<init>(XmlRelation.scala:45)
        at com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:66)
        at com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:44)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:745)

Any idea why this is happening and how we can solve this? Thanks

HyukjinKwon commented 8 years ago

Thanks for the detailed explanation. I will take a look and get back to you.

HyukjinKwon commented 8 years ago

Oh, it was my bad: I forgot to add a default case. I submitted a PR here: https://github.com/databricks/spark-xml/pull/166.
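For context, scala.MatchError is thrown at runtime when a match expression has no case covering its input; InferSchema.inferField matched the StAX event types it expected but not comment events, so the Comment escaped the match. A schematic sketch of the shape of such a fix, not the actual patch in #166:

import javax.xml.stream.events._

// Schematic only: the real inferField in spark-xml is more involved.
def inferField(event: XMLEvent): String = event match {
  case _: StartElement => "object"                  // nested element
  case c: Characters if !c.isWhiteSpace => "string" // text content
  case _ => "ignore" // the previously missing default case: comments,
                     // processing instructions, etc. are now skipped
                     // instead of raising scala.MatchError
}

This also explains why only some comments failed: a comment before the rowTag element (like the TLD Generator header) is never fed to the row parser, while one inside a Transaction record is.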

HyukjinKwon commented 8 years ago

Fixed in #166. Thanks.

metador commented 8 years ago

Thanks