archivesunleashed / aut

The Archives Unleashed Toolkit is an open-source toolkit for analyzing web archives.
https://aut.docs.archivesunleashed.org/
Apache License 2.0

ARC reader string vs int error on record length #492

Closed: ruebot closed this issue 2 years ago

ruebot commented 4 years ago

Describe the bug

The job crashes with a java.lang.NumberFormatException.

To Reproduce

Run any of the three auk derivatives commands on these three ARCs:

Expected behavior

We should handle this exception better: catch it and move on, or something smarter. The entire process shouldn't fail because of one bad record. A sketch of the idea follows.
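A minimal sketch of the "catch it and move on" idea in plain Scala. This is not the actual ArchiveRecordInputFormat code; rawLengths is a hypothetical stand-in for the length strings read from ARC record headers.

import scala.util.{Failure, Success, Try}

// "78\n" mirrors the malformed value in the stack trace below.
val rawLengths = Seq("1024", "78\n", "2048")

// Parse defensively: log and skip values that fail to parse, keep the rest.
val lengths: Seq[Long] = rawLengths.flatMap { raw =>
  Try(java.lang.Long.parseLong(raw)) match {
    case Success(len) => Some(len)
    case Failure(_: NumberFormatException) =>
      Console.err.println(s"Skipping record with unparseable length: ${raw.trim}")
      None
    case Failure(e) => throw e // anything else stays fatal
  }
}
// lengths == Seq(1024L, 2048L)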

Environment information

java.lang.NumberFormatException: For input string: "78
"
        at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.base/java.lang.Long.parseLong(Long.java:692)
        at java.base/java.lang.Long.parseLong(Long.java:817)
        at org.archive.io.arc.ARCRecordMetaData.getLength(ARCRecordMetaData.java:129)
        at org.archive.io.ArchiveRecord.available(ArchiveRecord.java:229)
        at org.archive.io.ArchiveRecord.skip(ArchiveRecord.java:246)
        at org.archive.io.ArchiveRecord.close(ArchiveRecord.java:172)
        at org.archive.io.ArchiveReader.cleanupCurrentRecord(ArchiveReader.java:175)
        at org.archive.io.ArchiveReader$ArchiveRecordIterator.hasNext(ArchiveReader.java:449)
        at io.archivesunleashed.data.ArchiveRecordInputFormat$ArchiveRecordReader.nextKeyValue(ArchiveRecordInputFormat.java:187)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:247)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
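The root cause is visible in the top frames: the ARC header's length field carries a trailing newline, which java.lang.Long.parseLong rejects. A one-line reproduction in the Scala REPL:

java.lang.Long.parseLong("78\n")
// java.lang.NumberFormatException: For input string: "78
// "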
ruebot commented 3 years ago

Looks like this can affect older versions of aut as well, so it's not necessarily a Java 11 and Spark 3+ issue with the RecordLoader.

ruebot commented 2 years ago

Tested on the issue-494 branch with Java 11 and Spark 3.0.3 as part of #533 testing (with only 8G of RAM!!):

$ export JAVA_HOME=/usr/lib/jvm/java-1.11.0-openjdk-amd64
$ ./spark-shell --master local[8] --driver-memory 8g --jars /home/ruestn/aut/target/aut-0.91.1-SNAPSHOT-fatjar.jar
import io.archivesunleashed._
import io.archivesunleashed.udfs._
import io.archivesunleashed.app._

sc.setLogLevel("INFO")

// Web archive collection; web pages.
val webpages = RecordLoader.loadArchives("/tuna1/scratch/nruest/auk_collection_testing/11989/warcs/*", sc)
  .webpages()

// Web archive collection; web graph.
val webgraph = RecordLoader.loadArchives("/tuna1/scratch/nruest/auk_collection_testing/11989/warcs/*", sc)
  .webgraph()

// Domains file.
webpages.groupBy(removePrefixWWW(extractDomain($"Url")).alias("url"))
  .count()
  .sort($"count".desc)
  .write.csv("/tuna1/scratch/nruest/auk_collection_testing/11989/all-domains")

// Full-text.
webpages.select($"crawl_date", removePrefixWWW(extractDomain($"url")).alias("domain"), $"url", $"content")
  .write.csv("/tuna1/scratch/nruest/auk_collection_testing/11989/full-text")

// GraphML
val graph = webgraph.groupBy(
                       $"crawl_date",
                       removePrefixWWW(extractDomain($"src")).as("src_domain"),
                       removePrefixWWW(extractDomain($"dest")).as("dest_domain"))
              .count()
              .filter(!($"dest_domain"===""))
              .filter(!($"src_domain"===""))
              .filter($"count" > 5)
              .orderBy(desc("count"))

WriteGraphML(graph.collect(), "/tuna1/scratch/nruest/auk_collection_testing/11989/example.graphml")

Everything completed successfully.