lintool / warcbase

Warcbase is an open-source platform for managing and analyzing web archives
http://warcbase.org/

Issues with serialization on persistence #227

Open bzz opened 8 years ago

bzz commented 8 years ago

Warcbase is awesome, and I'm trying to use the latest version on a local Spark instance to peek into .warc files.

AFAIK that should work after #160, but I'm hitting a few issues, especially when results are saved using Spark RDD persistence facilities like persist() with serialization, both on disk and in memory.

Do you guys have any trouble using persistence/serialization with Spark 1.6, or is this relevant only to my local environment? Please advise.
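For context, this is roughly my local setup (app name and path are placeholders; I toggle the Kryo line per the cases below):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.warcbase.spark.matchbox.RecordLoader // assumed import path

val conf = new SparkConf()
  .setAppName("warcbase-persist-test") // placeholder name
  .setMaster("local[*]")
  // toggled on/off for the cases below:
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// placeholder path to local archives
val localPath = "/path/to/archives/example.warc.gz"
val r = RecordLoader.loadArchives(localPath, sc)

// serialized persistence, memory and/or disk
r.persist(StorageLevel.MEMORY_AND_DISK_SER)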

  1. no kryo, no cache

    After #186 I was under the impression that Kryo is no longer mandatory, but

    val r = RecordLoader.loadArchives(localPath, sc)
    r.take(1) foreach println

    for me results in

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.archive.io.warc.WARCRecord
  2. kryo, no cache

    If I turn Kryo on, the same code results in

    org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
    Serialization trace:
    ISO8601 (org.warcbase.spark.archive.io.GenericArchiveRecord)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)

    This is quite easy to fix: it works for me if the non-serializable SimpleDateFormat instances are removed (or made transient; see the sketch after this list). Please let me know if you are interested in a PR fixing it.

  3. kryo, cache

    Adding r.persist(StorageLevel.DISK_ONLY) to the code above results in something cryptic, but similar in kind to (1):

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
    Serialization trace:
    classes (sun.misc.Launcher$AppClassLoader)
    classloader (java.security.ProtectionDomain)
    context (java.security.AccessControlContext)
    acc (org.apache.spark.sql.hive.client.NonClosableMutableURLClassLoader)
    classLoader (org.apache.hadoop.hive.conf.HiveConf)
    conf (org.apache.hadoop.fs.LocalFileSystem)
    fs (org.apache.hadoop.fs.ChecksumFileSystem$FSDataBoundedInputStream)
    in (java.io.BufferedInputStream)
    in (com.google.common.io.CountingInputStream)
    in (org.archive.io.warc.WARCRecord)
    warcRecord (org.warcbase.spark.archive.io.GenericArchiveRecord)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)

    It looks similar to dbpedia/distributed-extraction-framework#9 and presumably has something to do with registering serializable classes...
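To illustrate the fix for (2): the ISO8601 field named in the trace above is a SimpleDateFormat, which is not serializable. One standard alternative to removing it outright is to make it a transient lazy val; a sketch (class body elided, date pattern illustrative):

import java.text.SimpleDateFormat

class GenericArchiveRecord( /* constructor elided */ ) extends Serializable {
  // SimpleDateFormat is not Serializable, so rather than shipping the
  // instance with the record, recreate it lazily on each executor after
  // deserialization.
  @transient lazy val ISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
}

For (3) I'd guess registering the offending classes (e.g. conf.registerKryoClasses(Array(classOf[GenericArchiveRecord]))) might help, though the ConcurrentModificationException suggests something deeper.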

jrwiebe commented 8 years ago

Thanks for this. I don't believe any of us active contributors have been very focused on performance tuning, and thus we haven't really spent time ensuring capabilities like persistence are working properly.

Re (2), you're welcome to submit a pull request that replaces the SimpleDateFormat calls with a serializable equivalent. Your other two use scenarios will require some more examination.

I probably won't have time to look into your other use scenarios for a while. I'm tagging @aliceranzhou here, since she worked on this before (realizing she's probably busy with other things these days). Also @yb1.

lintool commented 8 years ago

Thanks for your positive feedback. I think it'll work if you do this:

val r = RecordLoader.loadArchives("src/test/resources/arc/example.arc.gz", sc)
  .keepValidPages()
  .map(r => ExtractDomain(r.getUrl))
  .take(1)

I.e., extract the fields that you want out of the WARC records... Will that do it for you?
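Once you've mapped down to plain strings, persistence should work too, since the RDD no longer holds raw WARCRecord objects. Untested, but along these lines:

import org.apache.spark.storage.StorageLevel

val domains = RecordLoader.loadArchives("src/test/resources/arc/example.arc.gz", sc)
  .keepValidPages()
  .map(r => ExtractDomain(r.getUrl))

// plain strings serialize fine, so caching the extracted fields should work
domains.persist(StorageLevel.DISK_ONLY)
domains.take(1) foreach println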