archivesunleashed / aut

The Archives Unleashed Toolkit is an open-source toolkit for analyzing web archives.
https://aut.docs.archivesunleashed.org/
Apache License 2.0

Heap space!! java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:3332) #317

Open ruebot opened 5 years ago

ruebot commented 5 years ago

Describe the bug

On a number of FDLP and Stanford collections, we run into this heap space error, and it kills the Spark job. Upon investigation, this does not seem to be a problem with a large ARC/WARC or large collection. This happens on small and large collections.

19/04/18 18:02:34 DEBUG BlockManager: Told master about block rdd_3_3
19/04/18 18:02:34 INFO ArchiveRecordInputFormat: Closed archive file file:/tuna1/scratch/nruest/auk_collection_testing/10689/warcs/ARCHIVEIT-10689-TEST-JOB677126-20180828211602861-00009.warc.gz
19/04/18 18:02:34 ERROR Executor: Exception in task 3.0 in stage 0.0 (TID 3)
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3332)
        at java.lang.StringCoding.safeTrim(StringCoding.java:89)
        at java.lang.StringCoding.access$100(StringCoding.java:50)
        at java.lang.StringCoding$StringDecoder.decode(StringCoding.java:154)
        at java.lang.StringCoding.decode(StringCoding.java:193)
        at java.lang.StringCoding.decode(StringCoding.java:254)
        at java.lang.String.<init>(String.java:546)
        at java.lang.String.<init>(String.java:566)
        at io.archivesunleashed.ArchiveRecordImpl.<init>(ArchiveRecord.scala:117)
        at io.archivesunleashed.package$RecordLoader$$anonfun$loadArchives$2.apply(package.scala:69)
        at io.archivesunleashed.package$RecordLoader$$anonfun$loadArchives$2.apply(package.scala:69)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
        at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:139)
        at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:174)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$10.apply(BlockManager.scala:1203)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$10.apply(BlockManager.scala:1201)
        at org.apache.spark.storage.DiskStore.put(DiskStore.scala:69)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1201)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
19/04/18 18:02:34 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 25
19/04/18 18:02:34 INFO SparkContext: Invoking stop() from shutdown hook
19/04/18 18:02:34 WARN TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3332)
        at java.lang.StringCoding.safeTrim(StringCoding.java:89)
        at java.lang.StringCoding.access$100(StringCoding.java:50)
        at java.lang.StringCoding$StringDecoder.decode(StringCoding.java:154)
        at java.lang.StringCoding.decode(StringCoding.java:193)
        at java.lang.StringCoding.decode(StringCoding.java:254)
        at java.lang.String.<init>(String.java:546)
        at java.lang.String.<init>(String.java:566)
        at io.archivesunleashed.ArchiveRecordImpl.<init>(ArchiveRecord.scala:117)
        at io.archivesunleashed.package$RecordLoader$$anonfun$loadArchives$2.apply(package.scala:69)
        at io.archivesunleashed.package$RecordLoader$$anonfun$loadArchives$2.apply(package.scala:69)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
        at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:139)
        at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:174)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$10.apply(BlockManager.scala:1203)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$10.apply(BlockManager.scala:1201)
        at org.apache.spark.storage.DiskStore.put(DiskStore.scala:69)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1201)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)

To Reproduce

Steps to reproduce the behavior on tuna:

  1. Using /tuna1/scratch/nruest/auk_collection_testing/10689
  2. /home/ruestn/spark-2.4.1-bin-hadoop2.7/bin/spark-shell --master local[30] --driver-memory 105g --conf spark.network.timeout=10000000 --conf spark.executor.heartbeatInterval=600s --conf spark.driver.maxResultSize=4g --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.shuffle.compress=true --conf spark.rdd.compress=true -Djava.io.tmpdir=/tuna1/scratch/nruest/tmp --jars /home/ruestn/aut/target/aut-0.17.1-SNAPSHOT-fatjar.jar -i /tuna1/scratch/nruest/auk_collection_testing/10689/133/spark_jobs/10689.scala 2>&1 | tee /tuna1/scratch/nruest/auk_collection_testing/10689/133/spark_jobs/10689.scala-test.log
  3. Should pop the error around 31 in stage 1.

Expected behavior

Shouldn't hit the heap error.

Environment information

Additional context

ruebot commented 5 years ago

@jrwiebe @lintool @ianmilligan1 finally wrote it up. Let me know if I missed anything, or y'all want me to provide more info.

jrwiebe commented 5 years ago

@ruebot, you say:

Upon investigation, this does not seem to be a problem with a large ARC/WARC or large collection.

Can you clarify this? Are you saying that jobs run with the same parameters against other large collections – in terms of total archive size – do not result in heap space errors? Because in my testing I didn't find this to be the case. It seemed like the error was rather a function of the total size of all the WARCs being operated on simultaneously. I think I was seeing jobs bail when they didn't have about 4 times the amount of memory as [# tasks] * [average WARC size].
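
The rule of thumb in the paragraph above can be written out as a small calculation. This is only a sketch of that observation: the factor of 4 is the empirical figure quoted above, not a Spark constant, and the object name, the task count, and the average WARC size are hypothetical values chosen for illustration.

```scala
// Sketch of the observed heuristic: jobs seemed to need roughly
// 4 x [# tasks] x [average WARC size] of memory to avoid the heap error.
object HeapEstimate {
  // Empirical factor quoted in the thread; an observation, not a documented constant.
  val ObservedFactor: Long = 4L

  /** Estimated bytes of memory needed for `tasks` concurrent tasks. */
  def requiredBytes(tasks: Int, avgWarcBytes: Long): Long =
    ObservedFactor * tasks * avgWarcBytes

  def main(args: Array[String]): Unit = {
    val gib = 1024L * 1024L * 1024L
    // Hypothetical inputs: 16 concurrent tasks and WARCs averaging 1 GiB each.
    val needed = requiredBytes(tasks = 16, avgWarcBytes = 1L * gib)
    println(s"Estimated requirement: ${needed / gib} GiB") // prints 64 GiB
  }
}
```

If the heuristic holds, the estimate scales linearly with the number of concurrent tasks, so lowering the thread count in --master local[N] is one way to test it.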

I found this garbage collection log analyzer useful for trying to understand how memory was being used, though I'm still puzzled that so much memory is being used. See for example how much memory is used by a job that simply chains RecordLoader.loadArchives(...).count(). Some useful background on Java garbage collection here.

To generate garbage collection logs you want to do something like this (the spark.driver.extraJavaOptions conf directive is the key):

/home/jrwiebe/spark-2.4.1-bin-hadoop2.7/bin/spark-shell --conf spark.driver.extraJavaOptions="-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:3260-16-16g-35bigwarcs.log -XX:+PrintHeapAtGC -XX:+PrintTenuringDistribution -XX:+UnlockExperimentalVMOptions -XX:G1LogLevel=finest -XX:+UnlockDiagnosticVMOptions -XX:+G1PrintRegionLivenessInfo" --master local[16] --driver-memory 16g --conf spark.network.timeout=10000000 --conf spark.executor.heartbeatInterval=600s --conf spark.driver.maxResultSize=4g --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.shuffle.compress=true --conf spark.rdd.compress=true --packages "io.archivesunleashed:aut:0.17.0" -i 3260.scala

For the record, I'm pretty certain KryoSerializer doesn't offer any benefits for this problem; I just kept using it because I copied @ruebot's command line. I ran tests with and without some of the other tuning options as well, and was seeing similar results.

ruebot commented 5 years ago

Sorry, yeah, that's kinda vague. I meant overall, we've analyzed 170T of collections ranging from under 1G of WARCs up to 12T with the same basic Spark settings.

I added the KryoSerializer during troubleshooting because it looked like it got further along in some cases, and reading up on it, it made sense at the time.

Thanks for putting in the GC info. I was hoping to tease it out by creating the issue :smile:

ruebot commented 5 years ago

...and a single WARC as large as 80G.

ruebot commented 5 years ago

Reading the garbage collection post reminded me that Spark allocates (not sure if that is the right word) ~55-60% of "storage" memory for executors. A smarter version of me would have bookmarked that Stack Overflow or Spark mailing list thread. But I can provide a screenshot of the Spark UI of a currently running job.

Screenshot_2019-04-24 Spark shell - Executors

That's a job running with 30 threads, and 105G of RAM allocated: --master local[30] --driver-memory 105g
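
The ~55-60% figure is consistent with Spark's unified memory model as described in the Spark tuning guide: a fixed 300 MB is reserved, and spark.memory.fraction (default 0.6 in Spark 2.x) of the remainder is shared by storage and execution. The sketch below reproduces that arithmetic for a 105 GiB heap. It assumes default settings, and the object and method names are made up for illustration. The JVM usually reports a little less than the requested heap, so the number in the UI will differ slightly.

```scala
// Rough arithmetic for Spark's unified (storage + execution) memory region,
// assuming default settings; names here are illustrative only.
object UnifiedMemory {
  // Fixed amount Spark sets aside before applying spark.memory.fraction.
  val ReservedBytes: Long = 300L * 1024L * 1024L

  /** (heap - reserved) * spark.memory.fraction, with 0.6 as the assumed default. */
  def unifiedRegionBytes(heapBytes: Long, memoryFraction: Double = 0.6): Long =
    ((heapBytes - ReservedBytes) * memoryFraction).toLong

  def main(args: Array[String]): Unit = {
    val gib = 1024L * 1024L * 1024L
    val region = unifiedRegionBytes(105L * gib)
    // Roughly 62.8 GiB of a 105 GiB heap, i.e. a bit under 60%.
    println(s"Unified region: ${region.toDouble / gib} GiB")
  }
}
```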

ruebot commented 5 years ago

I'm going to start adding collection info on failed collections here:

| id | size | spark version | aut version |
|---|---|---|---|
| 593/12190 | 141G | 2.4.3 | 0.17.0 |
| 769/4200 | 9.9T | 2.4.4 | 0.18.0 |
| 713/5484 | 51G | 2.4.4 | 0.18.0 |
| 1133/8271 | 225G | 2.4.4 | 0.18.0 |
| 536/6355 | 282G | 2.4.5 | 0.50.0 |

lintool commented 4 years ago

As a sanity check, I went to the data directory and did this:

$ gunzip -c *.gz | grep -a "Content-Length" | cut -d ' ' -f 2 | sort -n | less
2878090679
2878090400
2878090400
2247649420
2247649420
2247649180
2247649180

2878090679 = 2,878,090,679 ~ 2.9 GB. A bit silly to have records that big... dunno if this is the source of your errors though.

ianmilligan1 commented 4 years ago

So an individual record that’s almost 3GB? I wonder if we could implement a default that skips records greater than say a GB? Or lower?

I can imagine users who might want to work with big files - video etc - but for the most part, and with our generic processing pipeline, we could do without.

ruebot commented 4 years ago

I'm not certain that record size is the issue. I've collected some data from a number of collections, some that fail and some that succeed. Large record size doesn't appear to be the sole factor in failure or success. Some records well above the sizes @lintool listed have been part of successful jobs. I'll take some time over the coming days, or weeks :sweat:, to try and isolate some of the WARCs that are actually causing the problems, then we can proceed digging in further from there if that makes sense.

PQ: Fails on extractVideoDetailsDF()

  1. 263,051,370
  2. 263,051,370
  3. 263,051,370
  4. 263,051,370
  5. 146,567,997

InternationalesQc: Success

  1. 585,726,559
  2. 148,428,760
  3. 148,428,512
  4. 135,553,646
  5. 135,553,230

10689: Fails on AUK

  1. 2,878,090,679
  2. 2,878,090,400
  3. 2,878,090,400
  4. 2,247,649,420
  5. 2,247,649,420

7235: Fails on AUK

  1. 65,608,807,060
  2. 65,608,807,060
  3. 65,608,807,060
  4. 65,608,807,060
  5. 65,608,807,060

AgricultureQc: Success

  1. 526,026,425
  2. 526,026,050
  3. 341,618,524
  4. 341,618,149
  5. 238,542,670

EnvironnementQc: Success

  1. 7,061,547,393
  2. 7,061,547,393
  3. 3,741,979,582
  4. 3,733,722,254
  5. 3,733,722,254

12190: Fails on AUK

  1. 3,273,574,468
  2. 3,273,574,204
  3. 2,580,347,094
  4. 2,580,346,830
  5. 2,185,762,748

TourismeQc-2006-2017: Success

  1. 50,182,078
  2. 26,795,182
  3. 24,213,112
  4. 23,125,967
  5. 22,184,611

ruebot commented 4 years ago

Hit a very similar error a couple times over the last few days on two different collections with Java 11 and Spark 3.0.0. I had initially thought it was a freak new error because of the new setup, but I believe this is the same error that we've all been running into for a few years now, and is definitely a blocker for 1.0.0.

20/06/23 10:00:58 ERROR Utils: Aborting task
java.lang.NegativeArraySizeException: -1976675752
        at java.base/java.lang.StringCoding.decodeUTF8_0(StringCoding.java:755)
        at java.base/java.lang.StringCoding.decodeUTF8(StringCoding.java:712)
        at java.base/java.lang.StringCoding.decode(StringCoding.java:318)
        at java.base/java.lang.String.<init>(String.java:592)
        at java.base/java.lang.String.<init>(String.java:614)
        at io.archivesunleashed.ArchiveRecordImpl.<init>(ArchiveRecord.scala:119)
        at io.archivesunleashed.package$RecordLoader$.$anonfun$loadArchives$2(package.scala:79)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:512)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:272)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

The common thread in the stack traces of all the errors in this issue is io.archivesunleashed.ArchiveRecordImpl.<init> (ArchiveRecord.scala).

@lintool had previously patched the issue with https://github.com/lintool/warcbase/commit/16db93460896da80dbc4c07f27b0294bb618e291, which I believe was meant to just skip over broken records, but it didn't address the underlying issue that @anjackson brought up here: we're hitting the 2G array size limit in Java.

I think the best path forward then is to decide if we want to truncate payloads like Andy suggested (I'm not 100% sure we should do that), or update the *RecordUtils to have a streaming interface like webarchive-discovery does (hopefully that's the right spot I'm pointing to :sweat_smile:).

ruebot commented 4 years ago

One important point I left out, which should reinforce the conclusion of the above comment, is that I'm fairly certain that the issue isn't WARC/ARC file specific.

One example to share is from 3940 (Alberta Oil Sands). It's about a 25G collection. If I run it on all the files (25 files), it will fail with the error listed in the comment immediately above. If I pull out the WARC that causes the analysis to fail, the job with the 24 remaining WARCs completes successfully. That would lead one to believe it's just a bad WARC, so toss it out! But it is not. If I run the same job on just that WARC alone, it completes successfully. Which, I believe, reinforces the above note about hitting the 2G array limit in Java, since we're juggling all the records in memory at a given time.

ruebot commented 4 years ago

@lintool's comment above hints at the problem as well; all those records above 2G.
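
To make the 2G limit concrete: JVM arrays are indexed by Int, so a single byte array can hold at most Int.MaxValue (2,147,483,647) elements, and in practice slightly fewer depending on the VM. The sketch below checks the Content-Length values from @lintool's listing against that ceiling. The object and method names are hypothetical and not part of aut.

```scala
// Check whether a record's Content-Length could ever fit in one JVM byte array.
object ArrayLimit {
  /** True if the length does not exceed the Int-indexed array ceiling. */
  def fitsInByteArray(contentLength: Long): Boolean =
    contentLength <= Int.MaxValue

  def main(args: Array[String]): Unit = {
    // Content-Length values taken from the listing earlier in this thread.
    val lengths = Seq(2878090679L, 2878090400L, 2247649420L, 2247649180L)
    lengths.foreach { n =>
      println(s"$n fits in one byte array: ${fitsInByteArray(n)}") // false for all four
    }
  }
}
```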

jrwiebe commented 4 years ago

There's a cruel irony in the fact that mere hours before @ruebot's comment I deleted all my old memory profiles from tests I ran trying to understand this issue a year ago. Not that I think they would be particularly helpful – I never managed to figure out what were the limits of memory allocation, file size, workers, etc., that would produce the error.

ruebot commented 4 years ago

139/11989 is yet another.

adamyy commented 4 years ago

Environment

AUT

ArchiveSpark

Experiment

I was able to reproduce this issue with a much smaller set of WARC files. I selected a subset of ARCHIVEIT-10689 (TEST-JOB727752), which is about 7.23G, and I was simply doing RecordLoader.loadArchives(...).count()

image

I was doing it in spark-shell with the following flags:

--driver-memory 32g \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.shuffle.compress=true \
--conf spark.rdd.compress=true

At first I thought this was the same issue, caused by manipulating too many records in memory and somehow hitting the 2G array limit as mentioned by @ruebot, but when I tested that single WARC file (which is ~1G in size), the error persisted!

RecordLoader.loadArchives("ARCHIVEIT-10689-TEST-JOB727752-SEED1799561-20190110145814212-00001-h3.warc.gz", sc).count()

image

Perhaps this is a corrupted WARC? However, I also tried doing a similar operation using ArchiveSpark, which completed without errors.

scala> ArchiveSpark.load(WarcSpec.fromFiles("ARCHIVEIT-10689-TEST-JOB727752-SEED1799564-20190110143759592-00000-h3.warc.gz")).count()
res1: Long = 27040

(I have observed that ArchiveSpark and aut return different counts, which might be caused by the filtering done by aut. If I set includeRevisit to false on WarcSpec#fromFiles, then the two return the same count for the WARCs I have tested.)

Observation

So I dug further into ARCHIVEIT-10689-TEST-JOB727752-SEED1799564-20190110143759592-00000-h3.warc.gz to figure out what happened. It turned out that one of the WARC records has a content length of 1,175,370,878. This number by itself is a perfectly valid Int, so aut didn't encounter any issue massaging the content into a byte array. However, Java's String internally allocates a byte array of double that size for storing the characters, causing the size to overflow:

scala> 1175370878 << 1
res13: Int = -1944225540
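
The same arithmetic can be sketched in a slightly fuller form. The first two helpers contrast the wrapped Int result with the true value computed as a Long. The third inverts the wrap, assuming the NegativeArraySizeException reported earlier in this thread (-1976675752) comes from the same length << 1 allocation. That attribution is an assumption, and all names are illustrative.

```scala
// Int overflow when doubling a payload length, and how to back out the length.
object ShiftOverflow {
  /** What a 32-bit doubling produces; wraps negative above ~1.07 billion. */
  def doubledAsInt(len: Int): Int = len << 1

  /** The mathematically correct doubled size, computed in 64 bits. */
  def doubledAsLong(len: Int): Long = len.toLong << 1

  /** Recover the original length from a wrapped (negative) len << 1 result. */
  def lengthFromWrapped(wrapped: Int): Long =
    (wrapped.toLong + (1L << 32)) >> 1

  def main(args: Array[String]): Unit = {
    println(doubledAsInt(1175370878))       // -1944225540
    println(doubledAsLong(1175370878))      // 2350741756
    println(lengthFromWrapped(-1976675752)) // 1159145772
  }
}
```

Under that assumption, the failing record in the Java 11 trace would be about 1.16 GB, which is below the 2G array limit on its own but not once doubled.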

lintool commented 4 years ago

@adamyy thanks for the update.

What about a simple fix like adding a content length filter and skipping records above a certain threshold? We can define the threshold in a static, set it to something reasonable that the user can adjust if need be.
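
A minimal sketch of what such a filter might look like, assuming the Content-Length header can be inspected before the payload is read. RecordHeader is a hypothetical stand-in, not aut's ArchiveRecord API, and the 1 GB default is only the figure floated earlier in this thread.

```scala
// Skip records whose declared Content-Length exceeds an adjustable threshold.
object LengthFilter {
  // Hypothetical stand-in for a record header; not part of aut.
  final case class RecordHeader(url: String, contentLength: Long)

  // Hypothetical default threshold of 1 GB; callers can override it.
  val DefaultMaxContentLength: Long = 1000L * 1000L * 1000L

  /** Keep only the records at or below the threshold. */
  def keepable(
      headers: Seq[RecordHeader],
      max: Long = DefaultMaxContentLength
  ): Seq[RecordHeader] =
    headers.filter(_.contentLength <= max)

  def main(args: Array[String]): Unit = {
    val headers = Seq(
      RecordHeader("http://example.com/page", 50182078L),
      RecordHeader("http://example.com/video", 2878090679L)
    )
    keepable(headers).foreach(h => println(s"keeping ${h.url}"))
  }
}
```

Filtering this way drops the oversized payloads entirely, which is the trade-off discussed in the rest of the thread.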

adamyy commented 4 years ago

@lintool I think that should mute the issue, but going forward we might want better payload handling as described in https://github.com/archivesunleashed/aut/issues/494

ArchiveSpark's sparkling lib dodges this issue by having the payload (in our case, getContentString) represented as an InputStream rather than a String, so it is still available in case the client does want to read it.
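
To illustrate why a stream sidesteps the problem, here is a generic sketch that consumes a payload in fixed-size chunks using only java.io. It is not sparkling's or aut's API. The object name and buffer size are arbitrary, and the in-memory stream in main stands in for a real record payload.

```scala
import java.io.{ByteArrayInputStream, InputStream}

// Process a payload chunk by chunk so memory use is bounded by the buffer,
// rather than by the size of the record.
object StreamingPayload {
  /** Count the bytes in a stream without materializing it as one array or String. */
  def countBytes(in: InputStream, bufferSize: Int = 8192): Long = {
    val buffer = new Array[Byte](bufferSize)
    var total = 0L
    var read = in.read(buffer)
    while (read != -1) {
      total += read
      read = in.read(buffer)
    }
    total
  }

  def main(args: Array[String]): Unit = {
    // Stand-in payload; a real record would be read from the WARC file.
    val payload = new ByteArrayInputStream("example payload".getBytes("UTF-8"))
    println(countBytes(payload)) // 15
  }
}
```

The running total is a Long, so a record larger than 2G is counted correctly even though no single array could hold it.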

lintool commented 4 years ago

@adamyy Why don't we do that to mute the issue for now, so that it's not blocking the processing of existing collections? Send PR?

@ruebot Thoughts?

We can then close this issue and go back to #494 for a more principled solution.

adamyy commented 4 years ago

@lintool Would love to! Working on one now.

Looking back at the original issue though, it appears to me that https://github.com/archivesunleashed/aut/issues/317#issuecomment-648574293 and the original issue https://github.com/archivesunleashed/aut/issues/317 could be two distinct but related problems. I think the original OOM is caused by the fact that, because we use strings and not streams, we are forcing the content of some absurdly large WARCs to be kept in memory. @ruebot also mentioned that some collections with large WARCs, such as EnvironnementQc, do succeed, so I might need to verify this assumption further, but that could be hard for large collections since my personal machine only has 32g of physical memory.

ruebot commented 4 years ago

@adamyy Sorry for the delay, I am on family leave, and @lintool should have at the very least tipped you off to this if he put you on to working on aut. I really won't be back around until after September 17th to really review or test anything in much detail.


@lintool

What about a simple fix like adding a content length filter and skipping records above a certain threshold? We can define the threshold in a static, set it to something reasonable that the user can adjust if need be.

This is not a solution. This is just hiding the actual problem, and throwing out information that a researcher, or archivist could have a need for.

So, @adamyy I would close #496. Additionally, for future reference, you'll need to add tests for a PR to be accepted.


@adamyy

Looking back at the original issue though, it appears to me that #317 (comment) and the original issue #317 could be two distinct but related problems.

I'd argue they are the same thing, but we don't have to split hairs.


@adamyy

I think the original OOM is caused by the fact that, because we use strings and not streams, we are forcing the content of some absurdly large WARCs to be kept in memory.

Agreed in principle. The actual size of the ARC or WARC files is a red herring IMO. What matters is the record size, as you noted in my https://github.com/archivesunleashed/aut/issues/317#issuecomment-648574293 comment.

As of my last deep dive into this issue (late June), my inclination was that copyToByteArray is the problem. I was tempted to write our own ByteArray implementation, and @helgeho mentioned to me that he has done the same thing in sparkling. But I don't think our own ByteArray implementation on its own would be a solution, because as you noted we are reading into strings, and we'd just be kicking the can down the road a little bit further before this pops again. The ByteArray implementation and/or sparkling integration (#494) haven't happened because I've been away, and we're waiting for the larger Archive-It + Archives Unleashed collaboration to commence in the fall when I'm back.

That said, if you'd like to take a crack at replacing the string implementation with a streaming implementation, that would be much appreciated. In a perfect world, we should be able to process large collections with less than the amount of RAM you have available on your laptop. The caveat here being, I won't really be around to review and test anything for a few weeks.


As for the count difference between aut and sparkling, that makes sense since aut -- unfortunately -- only looks at response WARC record types.

ruebot commented 2 years ago

Similar to https://github.com/archivesunleashed/aut/issues/492#issuecomment-1132280373, I tested the initial collection causing issues (10689) on tuna with just 8GB, and ran the same analysis without incident. I'll do some more exhaustive testing of problematic collections we had over the next few days, but I think we might have resolved this issue with #533.