lintool / warcbase

Warcbase is an open-source platform for managing and analyzing web archives

Add UDF for computing MD5 checksum #211

Status: open. lintool opened this issue 8 years ago

lintool commented 8 years ago

Inspired by the recent UDF on image processing in Warcbase. If we had a UDF for computing the MD5 checksum of arbitrary data, we could apply it to all images and find, for example, the most popular GIFs in GeoCities.
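As a rough illustration of what such a checksum UDF would compute, here is a minimal Scala sketch using the JDK's `java.security.MessageDigest`. The object name `ComputeMD5Sketch` is hypothetical, not part of Warcbase; the point is that the checksum depends only on the bytes, so identical images stored under different URLs collapse to the same key.

```scala
import java.security.MessageDigest

// Hypothetical helper (not the Warcbase API): MD5 of arbitrary bytes.
object ComputeMD5Sketch {
  // Returns the digest as a 32-character lowercase hex string.
  def apply(bytes: Array[Byte]): String = {
    val digest = MessageDigest.getInstance("MD5").digest(bytes)
    // Mask each signed byte to 0..255 before formatting as two hex digits.
    digest.map(b => f"${b & 0xff}%02x").mkString
  }
}
```

For example, `ComputeMD5Sketch("abc".getBytes("UTF-8"))` gives the standard test-vector digest `900150983cd24fb0d6963f7d28e17f72`.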

youngbink commented 8 years ago


UDF for computing the MD5 checksum => ComputeChecksum.get(url: String, timeoutVal: Int = 5000, removeIconImage: Boolean = false, minWidth: Int = 30, minHeight: Int = 30)


Most popular images => ExtractPopularImages(records: RDD[ArchiveRecord], limit: Int, timeoutVal: Int = 5000)
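The `minWidth`/`minHeight` parameters suggest that small icons are dropped by pixel dimensions. A minimal sketch of that check, assuming the image bytes are already in hand (rather than fetched over HTTP) and that `javax.imageio` can decode the format; the JDK ships readers for PNG, JPEG, GIF and BMP. `ImageSizeSketch` is a hypothetical name, not the Warcbase API.

```scala
import java.io.ByteArrayInputStream
import javax.imageio.ImageIO
import scala.util.Try

// Hypothetical helper (not the Warcbase API) for icon filtering by size.
object ImageSizeSketch {
  // Returns (width, height), or None if the bytes cannot be decoded:
  // ImageIO.read returns null for unknown formats and may throw on corrupt data.
  def dimensions(bytes: Array[Byte]): Option[(Int, Int)] =
    Try(ImageIO.read(new ByteArrayInputStream(bytes))).toOption
      .flatMap(img => Option(img))
      .map(img => (img.getWidth, img.getHeight))

  // Keeps an image only if it decodes and is at least minWidth x minHeight pixels.
  def isLargeEnough(bytes: Array[Byte], minWidth: Int = 30, minHeight: Int = 30): Boolean =
    dimensions(bytes).exists { case (w, h) => w >= minWidth && h >= minHeight }
}
```

Undecodable bytes are treated as "not large enough" here, which is one possible choice; a real pipeline might prefer to keep them and flag them separately.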

import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.RecordLoader
val recs = RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/", sc).keepUrlPatterns(Set(".*".r)).persist()
ExtractPopularImages(recs, 1500) // show the 1500 most popular

Example result:

lintool commented 8 years ago

@yb1 Sorry for not being clear... your current implementation fetches the images from IA (the Internet Archive), which requires web access and is therefore slow. We already have the images in our archives, so we should work from those.

I envision something like:

  .map( img => (img.getUrl, ComputeMD5(img.getRawBytes), ComputeSize(img.getRawBytes)) )

After that, we can filter by size as we wish, group by checksum, etc.
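A sketch of the pipeline proposed above, written against a plain Scala `Seq` so it runs without a cluster; on Spark the same `map`/`filter`/`groupBy` chain would apply to an RDD. `ImageRecord` and `ChecksumPipelineSketch` are hypothetical stand-ins, and "size" is assumed to mean byte length (the comment above does not say whether `ComputeSize` returns bytes or pixel dimensions).

```scala
import java.security.MessageDigest

// Hypothetical stand-in for an archived image record (not Warcbase's ArchiveRecord).
final case class ImageRecord(url: String, bytes: Array[Byte])

object ChecksumPipelineSketch {
  def md5Hex(bytes: Array[Byte]): String =
    MessageDigest.getInstance("MD5").digest(bytes).map(b => f"${b & 0xff}%02x").mkString

  // (url, md5, size) per image, drop files smaller than minBytes,
  // then group the surviving URLs by checksum.
  def groupByChecksum(images: Seq[ImageRecord], minBytes: Int): Map[String, Seq[String]] =
    images
      .map(img => (img.url, md5Hex(img.bytes), img.bytes.length))
      .filter { case (_, _, size) => size >= minBytes }
      .groupBy { case (_, md5, _) => md5 }
      .map { case (md5, rows) => md5 -> rows.map(_._1) }
}
```

Each key of the resulting map is one distinct image; the length of its URL list is how often it appears in the collection.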

ianmilligan1 commented 8 years ago

Just chiming in to quickly note that this is going to be great – I already have a use case in mind (comparing two collections), esp. once we can get the icons filtered out.

ianmilligan1 commented 8 years ago

As noted in e-mail, happy to test this, but can you send me a test script? Haven't been able to get it working but am probably having a brainfart here.

If so, I will test it on our CPP collection, both on the cluster and on the rho server.

ianmilligan1 commented 8 years ago

This was implemented in ab72ae4.

Example usage:

import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.RecordLoader

val r = RecordLoader.loadArc("/mnt/vol1/data_sets/cpp/cpp_arcs_accession_01/", sc).persist()

ExtractPopularImages(r, 100) // show 100 most popular

In this example, it took the ARC files, found the 100 most popular images (based on an MD5 checksum), and output them as an array.
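The ranking step can be sketched as: count captures per checksum, sort by count, and keep the top N. This is an illustration on plain Scala collections, assuming the input is already a sequence of `(url, md5)` pairs; on an RDD, Spark's `takeOrdered` returns the same kind of top-N array without sorting the whole dataset. Keeping the first URL seen for each checksum is an arbitrary choice made here, not necessarily what `ExtractPopularImages` does, and `PopularByChecksumSketch` is a hypothetical name.

```scala
object PopularByChecksumSketch {
  // Input: (url, md5) pairs, one per archived image capture.
  // Output: up to `limit` (md5, representativeUrl, count) triples, most frequent first.
  // Ties are broken by checksum so the result is deterministic.
  def topN(pairs: Seq[(String, String)], limit: Int): Seq[(String, String, Int)] =
    pairs
      .groupBy(_._2)
      .toSeq
      .map { case (md5, rows) => (md5, rows.head._1, rows.size) }
      .sortBy { case (md5, _, count) => (-count, md5) }
      .take(limit)
}
```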

I am going to run this on a larger collection and will provide an example script and output.

ianmilligan1 commented 8 years ago

I'm struggling to find the most efficient way to send the output to a text file rather than just to the shell. @yb1, can you work on the following script so that it saves the output:

something like:

import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.RecordLoader

val r = RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/*.warc.gz",sc).persist()
ExtractPopularImages(r, 2000)
youngbink commented 8 years ago

For now, please use

import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.RecordLoader

val r = RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/*.warc.gz",sc).persist()
val arr = ExtractPopularImages(r, 2000)
// one partition, so the output directory holds a single part file
sc.parallelize(arr.map(x => x._2._2 + "\t" + x._2._3), 1).saveAsTextFile("2000-Popular-Images-Geocities13")
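The `sc.parallelize(..., 1).saveAsTextFile(...)` line forces the results into a single partition, so Spark writes one part file of tab-separated lines inside the output directory. A local sketch of the same idea, assuming the result has already been reduced to `(label, count)` pairs; the tuple layout of the actual `ExtractPopularImages` result is not spelled out in this thread, and `SaveTsvSketch` is a hypothetical name.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object SaveTsvSketch {
  // Formats (label, count) pairs as "label<TAB>count" lines.
  // The meaning of the two fields is an assumption for illustration.
  def toTsvLines(rows: Seq[(String, Int)]): Seq[String] =
    rows.map { case (label, count) => s"$label\t$count" }

  // Writes every line to one local file, newline-terminated, and returns the path.
  def writeTsv(rows: Seq[(String, Int)], out: Path): Path =
    Files.write(out, toTsvLines(rows).mkString("", "\n", "\n").getBytes(StandardCharsets.UTF_8))
}
```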


ianmilligan1 commented 8 years ago

Thanks @yb1 – is there a way to bake the sc.parallelize(arr.map(x => x._2._2 + "\t" + x._2._3), 1) command into the underlying UDF? That'll be a bit intimidating for our user base (even if they don't need to fully understand it).

And what would be the best way to do subsets, i.e. also applying .keepUrlPatterns(Set("(?i).*".r))?

Right now, I'm testing with a minor tweak to your script above:

import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.RecordLoader

val r = RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/*.warc.gz",sc).persist()
val arr = ExtractPopularImages(r, 2000)
sc.parallelize(arr.map(x => x._2._2 + "\t" + x._2._3), 1).saveAsTextFile("2000-Popular-Images-Geocities13")
youngbink commented 8 years ago

The output type has been changed to an RDD.

The new script is:

import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.RecordLoader

val r = RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/*.warc.gz",sc).persist()
ExtractPopularImages(r, 2000, sc).saveAsTextFile("2000-Popular-Images-Geocities14")

And for subsets, I'll get back to you soon.
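URL-based subsetting amounts to filtering records by a set of regular expressions before extracting images. A minimal sketch, assuming full-match semantics (the whole URL must match one of the patterns); check Warcbase's `keepUrlPatterns` for its exact behavior. Note that a bare `*` is not a valid regular expression in Java or Scala; the match-everything pattern is `.*`. `UrlSubsetSketch` is a hypothetical name.

```scala
import scala.util.matching.Regex

object UrlSubsetSketch {
  // Keeps records whose URL fully matches at least one pattern.
  // `url` extracts the URL from a record, so any record type can be used.
  def keepUrlPatterns[A](records: Seq[A], url: A => String, patterns: Set[Regex]): Seq[A] =
    records.filter(r => patterns.exists(re => re.pattern.matcher(url(r)).matches()))
}
```

A case-insensitive GeoCities subset would then use a pattern such as `"""(?i)http://www\.geocities\.com/.*""".r`.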


ianmilligan1 commented 8 years ago

Great, @yb1. Did you successfully get a run of this done on GeoCities? If so, what memory settings did you use? (just so we don't re-invent the wheel – I ran into memory crashes with spark-shell --jars /cliphomes/i2millig/warcbase/warcbase-core/target/warcbase-core-0.1.0-SNAPSHOT-fatjar.jar --num-executors 75 --executor-cores 5 --executor-memory 10G --driver-memory 10G).

ianmilligan1 commented 8 years ago

Works like a charm.

Ran with spark-shell --jars /cliphomes/i2millig/warcbase/warcbase-core/target/warcbase-core-0.1.0-SNAPSHOT-fatjar.jar --num-executors 50 --executor-cores 5 --executor-memory 20G --driver-memory 10G --conf spark.yarn.executor.memoryOverhead=4096 --conf spark.driver.maxResultSize=2048.
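For anyone reusing these settings, a rough tally of what the successful run requests from YARN, assuming `spark.yarn.executor.memoryOverhead` is given in MiB (its documented unit) and ignoring any rounding YARN applies to container sizes:

```latex
\text{executor memory: } 50 \times \left(20\,\text{GiB} + 4096\,\text{MiB}\right) = 50 \times 24\,\text{GiB} = 1200\,\text{GiB}
\qquad
\text{executor cores: } 50 \times 5 = 250
```

The driver's 10 GiB (plus its own overhead) comes on top of this.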


import org.warcbase.spark.matchbox._
import org.warcbase.spark.rdd.RecordRDD._
import org.warcbase.spark.matchbox.RecordLoader

val r = RecordLoader.loadArchives("/collections/webarchives/geocities/warcs/*.warc.gz",sc).persist()
ExtractPopularImages(r, 2000, sc).saveAsTextFile("2000-Popular-Images-Geocities14")

Since this works, I will document it and then close the issue.