archivesunleashed / aut

The Archives Unleashed Toolkit is an open-source toolkit for analyzing web archives.
https://aut.docs.archivesunleashed.org/
Apache License 2.0
137 stars 33 forks source link

DataFrame discussion: open thread #190

Closed lintool closed 4 years ago

lintool commented 6 years ago

I've been playing a bit with DataFrames. Open thread just to share what the "user experience" might look like. I've got things working in my own local repo, but I'm hiding a bit of the magic here.

Assume we've got a DataFrames created. Here's the schema:

scala> df.printSchema()
root
 |-- CrawlDate: string (nullable = true)
 |-- Url: string (nullable = true)

Shows the rows

scala> df.show()
+---------+--------------------+
|CrawlDate|                 Url|
+---------+--------------------+
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
| 20080430|http://www.archiv...|
+---------+--------------------+
only showing top 20 rows

We can extract domains from the urls, and the group by and count as follows:

scala> df.select(Domain($"url").as("Domain")).groupBy("Domain").count().show()
+------------------+-----+
|            Domain|count|
+------------------+-----+
|   www.archive.org|  132|
|www.hideout.com.br|    1|
|     deadlists.com|    2|
+------------------+-----+

Thoughts?

ruebot commented 6 years ago

I like it.

greebie commented 6 years ago

I think the main advantage is that it follows (roughly) a typical SQL select statement, which is something that most technical librarians would be comfortable with.

I wonder if parquet format would help us down the road? https://parquet.apache.org/

There are Ruby gems for the format -- might be useful if we get to PetaByte size archives a decade from now.

lintool commented 6 years ago

Or this:

scala> df.printSchema()
root
 |-- src: string (nullable = true)
 |-- dest: string (nullable = true)

scala> df.show()
+--------------------+--------------------+
|                 src|                dest|
+--------------------+--------------------+
|http://www.archiv...|http://www.archiv...|
|http://www.archiv...|http://www.archiv...|
|http://www.archiv...|http://web.archiv...|
|http://www.archiv...|http://www.sloan.org|
|http://www.archiv...|http://www.archiv...|
|http://www.archiv...|http://www.archiv...|
|http://www.archiv...|ftp://ia311502.us...|
|http://www.archiv...|http://ia311502.u...|
|http://www.archiv...|http://www.archiv...|
|http://www.archiv...|http://www.alexa.com|
|http://www.archiv...|http://download.a...|
|http://www.archiv...|http://www.alexa.com|
|http://www.archiv...|http://www.prelin...|
|http://www.archiv...|http://www.prelin...|
|http://www.archiv...|  http://www.nsf.gov|
|http://www.archiv...|  http://www.nsf.gov|
|http://www.archiv...|  http://www.loc.gov|
|http://www.archiv...|  http://www.loc.gov|
|http://www.archiv...|http://www.lizard...|
|http://www.archiv...|http://www.lizard...|
+--------------------+--------------------+
only showing top 20 rows

scala> df.select(Domain($"src").as("src"), Domain($"dest").as("dest")).groupBy("src", "dest").count().show()
+---------------+--------------------+-----+
|            src|                dest|count|
+---------------+--------------------+-----+
|www.archive.org|         www.cfp.org|    1|
|www.archive.org|         www.acm.org|    4|
|www.archive.org|     www.mozilla.org|    1|
|www.archive.org|internetarchive.w...|    2|
|www.archive.org|www.informedia.cs...|    1|
|www.archive.org|    www.smartftp.com|    1|
|  deadlists.com|   www.deadlists.com|    2|
|www.archive.org| www.intermemory.org|    1|
|www.archive.org|     www.nytimes.com|    1|
|www.archive.org|    www.mplayerhq.hu|    1|
|www.archive.org|      www.cygwin.com|    1|
|www.archive.org|     www.archive.org|  305|
|www.archive.org|      lcweb2.loc.gov|    1|
|www.archive.org|        www.oclc.org|    1|
|www.archive.org|    www.mikewren.com|    4|
|www.archive.org|  hotwired.lycos.com|    1|
|www.archive.org|         www.eff.org|    1|
|www.archive.org|         www.ipl.org|    1|
|www.archive.org|     www.privacy.org|    5|
|www.archive.org| www.illiminable.com|    2|
+---------------+--------------------+-----+
only showing top 20 rows

Or, in straight-up SQL if you prefer:

scala> spark.sql("SELECT Domain(src), Domain(dest), count(*) FROM pages GROUP BY Domain(src), Domain(dest)").show()
+---------------+--------------------+--------+
|UDF:Domain(src)|    UDF:Domain(dest)|count(1)|
+---------------+--------------------+--------+
|www.archive.org|         www.cfp.org|       1|
|www.archive.org|         www.acm.org|       4|
|www.archive.org|     www.mozilla.org|       1|
|www.archive.org|internetarchive.w...|       2|
|www.archive.org|www.informedia.cs...|       1|
|www.archive.org|    www.smartftp.com|       1|
|  deadlists.com|   www.deadlists.com|       2|
|www.archive.org| www.intermemory.org|       1|
|www.archive.org|     www.nytimes.com|       1|
|www.archive.org|    www.mplayerhq.hu|       1|
|www.archive.org|      www.cygwin.com|       1|
|www.archive.org|     www.archive.org|     305|
|www.archive.org|      lcweb2.loc.gov|       1|
|www.archive.org|        www.oclc.org|       1|
|www.archive.org|    www.mikewren.com|       4|
|www.archive.org|  hotwired.lycos.com|       1|
|www.archive.org|         www.eff.org|       1|
|www.archive.org|         www.ipl.org|       1|
|www.archive.org|     www.privacy.org|       5|
|www.archive.org| www.illiminable.com|       2|
+---------------+--------------------+--------+
only showing top 20 rows
greebie commented 6 years ago

Yeah -- coding dataframes is definitely more intuitive than mapping through rdds. Lambda's are oddly confusing unless you are used to them.

cjer commented 6 years ago

Major like on this. Would make it easier for R and Python-pandas minded people to get into Scala code with great ease

ianmilligan1 commented 6 years ago

This is great, @lintool! It would let people interactively explore rather than using our rote scripts.

I don't think we could expect historians to know SQL syntax – I don't know it, for example – but with documentation I think the df.select syntax seems the most out-of-the-box readable. I'd love to see this move forward, and the bonus is I assume it's just added functionality and doesn't disrupt any of our existing scripts?

greebie commented 6 years ago

I think that once we implement scala dataframes, the pressure will be on to get it into pyspark. In particular the .toPandas() method, which would be amazing for mash-ups of Web archives against other social science data. @ianmilligan1 : .toPandas() would mean I could get my correspondence analysis graphs made directly from a collection of warcs, rather than from saved derivatives, albeit with a time lag.

lintool commented 6 years ago

Aye, here's the rub, though... I can imagine three different ways of doing analyses:

  1. current way w/ RDDs + transformations
  2. what I call fluent SQL (e.g., df.select)
  3. actual SQL queries

And, in theory, you multiply those by the number of languages (e.g., Scala, Python, R)

However, it will be a huge pain to make sure all the functionalities are exactly the same - for example, UDFs might be defined a bit differently, so we need to make sure all versions are in sync.

IMO we should standardize on a "canonical way" and then offer people "use at your own risk" functionalities.

greebie commented 6 years ago

Do you think it's possible to call UDFs from Scala via PySpark as per #148? In theory it looks possible, but @MapleOx had challenges getting it to work. Ideally, all pyspark calls would be running Scala UDFs with a small set of helper scripts. Or we could go the direction of using AUT plugins that are "use at your own risk."

lintool commented 6 years ago

For ExtractDomain in the above example, I'm wrapping the existing UDF... but it's kinda a hairball. It'll no doubt be more complicated across languages, and likely cause performance issues.

One concrete proposal would be to deprecate the current RDDs + transformations in favor of fluent SQL. Not right now, but plan for it...

lintool commented 6 years ago

@greebie on the data you just handed me...

scala> df.select(Domain($"Url").as("Domain")).groupBy("Domain").count().orderBy(desc("count")).show()
+--------------------+-----+                                                    
|              Domain|count|
+--------------------+-----+
|notstrategic.blog...|  292|
|       www.uvicfa.ca|   73|
|  uvfawhycertify.org|   70|
|    www.facebook.com|   66|
|notstrategic.blog...|   63|
|www.theglobeandma...|   35|
|     www.blogger.com|   22|
|beta.images.thegl...|   20|
|         www.uvic.ca|   19|
|         twitter.com|   12|
|      www.martlet.ca|    6|
|sec.theglobeandma...|    5|
|      dublincore.org|    5|
|     plus.google.com|    4|
|  mobile.twitter.com|    4|
|          thetyee.ca|    4|
| accounts.google.com|    3|
|     web.adblade.com|    3|
|            purl.org|    2|
|     www.vicnews.com|    2|
+--------------------+-----+
only showing top 20 rows
greebie commented 6 years ago

Beauty! I think the longview should be to move to dataframes more intensely. My little bit of research suggests that dataframes to parquet is where the real big data is at right now. If not parquet, then probably some other column-based/matrix-oriented format. Warcs are definitely not going to get smaller over time -- any way to boost speed and shrink storage will be very important down the road.

However, by "longview" I'm thinking at least 5+ years. Getting historians and non-data scientists engaged is much more important in the short-run, and that's going to require more user-friendly approaches vs super-powerful data formats.

lintool commented 6 years ago

@greebie I'm familiar with Parquet but that's not very relevant for our work... that's a physical format, and it's unlikely that organizations are going to rewrite their TBs of holdings...

greebie commented 6 years ago

I've just been seeing some papers that suggest dataframes --> parquet are the way to go for huge datasets. Also suggestions around that to parquet is better than to text for df outputs. But I see your point.

lintool commented 6 years ago

I've pushed a prototype to branch df.

The following script works:

import io.archivesunleashed._
import io.archivesunleashed.df._

val df = RecordLoader.loadArchives("warcs/", sc)
  .extractValidPagesDF()

df.printSchema()

df.select(ExtractDomain($"Url").as("Domain"))
  .groupBy("Domain").count().orderBy(desc("count")).show()

@ianmilligan1 @greebie give it a try on some more substantial amount of data?

How does it compare to what we're currently doing now?

ianmilligan1 commented 6 years ago

Just ran on a slightly more substantial amount of data (6GB) and looks great. I'll run the same script on a much more substantial corpus now.


+--------------------+------+
|              Domain| count|
+--------------------+------+
|   www.equalvoice.ca|239122|
|       greenparty.ca| 37747|
|      www.liberal.ca| 12543|
|www.policyalterna...| 11096|
|          www.ndp.ca|  7458|
|        www.egale.ca|  6449|
|www.blocquebecois...|  6388|
|     www.fairvote.ca|  6356|
| www.canadiancrc.com|  2478|
| www.davidsuzuki.org|  1991|
|         www.ocap.ca|  1920|
| www.conservative.ca|  1901|
|   www.canadians.org|  1562|
|         www.ccsd.ca|  1388|
|     www.wegovern.ca|   647|
|            ccla.org|   630|
|        www.ccla.org|   471|
|     www.youtube.com|   309|
|          www.gca.ca|   306|
|         coat.ncf.ca|   209|
+--------------------+------+
only showing top 20 rows

@lintool would it be possible to use df to reproduce the plain text (say by domain and by date) and the hyperlinks? I'm just not familiar with the syntax enough to know off hand how to construct that, and you might be able to do so in your sleep.

lintool commented 6 years ago

@ianmilligan1 not yet, I need to port the UDFs over. On my TODO list is to replicate all the standard derivatives using DFs.

ianmilligan1 commented 6 years ago

Cool. This is great, and the df.select syntax seems straightforward and nicely modular to me.

lintool commented 6 years ago

Yes, I believe the df syntax maps nicely over to FAAV. We can probably bring into even better alignment with some custom DSL hacking.

greebie commented 6 years ago

This works nicely. The runspeeds are pretty close, with rdd slightly faster.

I could not import the matchbox and df at the same time due to ambiguity of the ExtractDomain function.

import io.archivesunleashed._
import io.archivesunleashed.matchbox._
import java.time.Instant
val warcPath = "warcs/*.warc.gz"

def timed(f: => Unit) = {
  val start = System.currentTimeMillis()
  f
  val end = System.currentTimeMillis()
  println("Elapsed Time: " + (end - start))
}

timed {
  println("Get urls and count, taking 3.")
  val r = RecordLoader.loadArchives(warcPath, sc)
  .keepValidPages()
  .map (r => ExtractDomain(r.getUrl))
  .countItems()
  println(r.collect().deep.mkString("\n"))
}

// Exiting paste mode, now interpreting.

Get urls and count, taking 3.
(notstrategic.blogspot.ca,292)                                                  
(www.uvicfa.ca,73)
(uvfawhycertify.org,70)
(www.facebook.com,66)
(notstrategic.blogspot.com,63)
(www.theglobeandmail.com,35)
(www.blogger.com,22)
(beta.images.theglobeandmail.com,20)
(www.uvic.ca,19)
(twitter.com,12)
(www.martlet.ca,6)
(dublincore.org,5)
(sec.theglobeandmail.com,5)
(plus.google.com,4)
(thetyee.ca,4)
(mobile.twitter.com,4)
(accounts.google.com,3)
(web.adblade.com,3)
(maps.google.ca,2)
(purl.org,2)
(theglobeandmail.com,2)
(m.theglobeandmail.com,2)
(ow.ly,2)
(pixel.facebook.com,2)
(www.vicnews.com,2)
(d1z2jf7jlzjs58.cloudfront.net,2)
(sec.images.theglobeandmail.com,2)
(www.google.com,1)
(janniaragon.wordpress.com,1)
(googleads.g.doubleclick.net,1)
(martlet.ca,1)
(v1.theglobeandmail.com,1)
(btn.weather.ca,1)
(d.adgear.com,1)
(mail.google.com,1)
(www.cfax1070.com,1)
(globeandmail.com,1)
(cdns.gigya.com,1)
(www.blogblog.com,1)
(static.ak.facebook.com,1)
(janniaragon.me,1)
(player.cfax1070.com,1)
(platform.twitter.com,1)
(www.sfufa.ca,1)
(www.cautbulletin.ca,1)
(fonts.googleapis.com,1)
(gmpg.org,1)
(ype.youneeq.ca,1)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Elapsed Time: 4310
import io.archivesunleashed._
import io.archivesunleashed.matchbox._
import java.time.Instant
warcPath: String = warcs/*.warc.gz
timed: (f: => Unit)Unit

scala> :paste
// Entering paste mode (ctrl-D to finish)

import io.archivesunleashed._
import io.archivesunleashed.df._
import java.time.Instant

val warcPath = "warcs/*.warc.gz"

def timed(f: => Unit) = {
  val start = System.currentTimeMillis()
  f
  val end = System.currentTimeMillis()
  println("Elapsed Time: " + (end - start))
}

timed {
  println ("Testing dataframes");
  val df = RecordLoader.loadArchives(warcPath, sc)
    .extractValidPagesDF()
  df.select(ExtractDomain($"Url")
    .as("Domain")).groupBy("Domain")
    .count().orderBy(desc("count")).show()  
}

// Exiting paste mode, now interpreting.

Testing dataframes
+--------------------+-----+                                                    
|              Domain|count|
+--------------------+-----+
|notstrategic.blog...|  292|
|       www.uvicfa.ca|   73|
|  uvfawhycertify.org|   70|
|    www.facebook.com|   66|
|notstrategic.blog...|   63|
|www.theglobeandma...|   35|
|     www.blogger.com|   22|
|beta.images.thegl...|   20|
|         www.uvic.ca|   19|
|         twitter.com|   12|
|      www.martlet.ca|    6|
|sec.theglobeandma...|    5|
|      dublincore.org|    5|
|          thetyee.ca|    4|
|  mobile.twitter.com|    4|
|     plus.google.com|    4|
| accounts.google.com|    3|
|     web.adblade.com|    3|
|d1z2jf7jlzjs58.cl...|    2|
|      maps.google.ca|    2|
+--------------------+-----+
only showing top 20 rows

>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Elapsed Time: 4865
import io.archivesunleashed._
import io.archivesunleashed.df._
import java.time.Instant
warcPath: String = warcs/*.warc.gz
timed: (f: => Unit)Unit
helgeho commented 6 years ago

This data frame discussion is great 👍 But here is just a quick note / idea on that:

When you start from an ArchiveSpark RDD or you convert an AUT RDD using the bridge with .toArchiveSpark, you can apply our enrich functions, like HTML, Entities and others, e.g., by calling rdd.enrich(Html.first("title")), depending on what data you need, and finally, convert it into a JSON format with rdd.toJsonStrings(pretty = false). The produced JSON is fully compatible with Spark's data frames, so you can directly read it with .read.json and query the contained fields.

Maybe it would be worth having a look at that and build your new UDFs on top of it instead of reinventing the wheel and write your own conversions. This way we could reuse the idea of enrichments and help it grow + support data frames with UDFs to run queries on top of that.

Further, the pre-processed JSON dataset can also be saved and directly reloaded as a data frame, so ArchiveSpark can be used for corpus building here, and a reloaded corpus would be directly supported by your work on data frames. We haven't spent much work on data frames specifically yet, but it turns out this is a very easy way to integrate both approaches, without much additional work.

lintool commented 6 years ago

Updated df branch. Now this works:

import io.archivesunleashed._
import io.archivesunleashed.df._

val df = RecordLoader.loadArchives("warcs/", sc)
  .extractHyperlinksDF()

df.printSchema()

df.select(RemovePrefixWWW(ExtractDomain($"Src")).as("SrcDomain"),
    RemovePrefixWWW(ExtractDomain($"Dest")).as("DestDomain"))
  .groupBy("SrcDomain", "DestDomain").count().orderBy(desc("SrcDomain")).show()

Result is something like this:

+---------------+--------------------+-----+                                    
|      SrcDomain|          DestDomain|count|
+---------------+--------------------+-----+
|web.adblade.com|     web.adblade.com|   18|
|web.adblade.com|smarterlifestyles...|    8|
|    vicnews.com|iservices.blackpr...|    2|
|    vicnews.com|albernivalleynews...|    2|
|    vicnews.com|       blackpress.ca|   14|
|    vicnews.com|         twitter.com|    6|
|    vicnews.com|       revweekly.com|    4|
|    vicnews.com|         vicnews.com|   35|
|    vicnews.com|       drivewaybc.ca|    4|
|    vicnews.com|         addthis.com|    2|
|    vicnews.com|placead.bcclassif...|    4|
|    vicnews.com|blogs.bclocalnews...|    4|
|    vicnews.com|       pinterest.com|    2|
|    vicnews.com|       mondaymag.com|    2|
|    vicnews.com|          paperg.com|    2|
|    vicnews.com| nanaimobulletin.com|    2|
|    vicnews.com|         pqbnews.com|    3|
|    vicnews.com| sookenewsmirror.com|    1|
|    vicnews.com|     bclocalnews.com|    8|
|    vicnews.com|        facebook.com|    4|
+---------------+--------------------+-----+
only showing top 20 rows
ianmilligan1 commented 6 years ago

Late to this, but this is working quite nicely @lintool – here they are on a decently-large collection sorted by descending count.

+--------------------+--------------------+---------+
|           SrcDomain|          DestDomain|    count|
+--------------------+--------------------+---------+
|          liberal.ca|          liberal.ca|112375610|
|              ndp.ca|              ndp.ca| 27593036|
|policyalternative...|policyalternative...| 20066112|
|     davidsuzuki.org|     davidsuzuki.org| 17909044|
|              chp.ca|              chp.ca| 15486062|
|          liberal.ca|   action.liberal.ca| 14598706|
|            ccla.org|            ccla.org| 13258763|
|       equalvoice.ca|       equalvoice.ca| 10245075|
|canadianactionpar...|canadianactionpar...|  9896440|
|              afn.ca|              afn.ca|  8109879|
|       greenparty.ca|       greenparty.ca|  3402972|
|     canadiancrc.com|     canadiancrc.com|  2937523|
|       canadians.org|       canadians.org|  2750689|
|       equalvoice.ca|gettingtothegate.com|  2464993|
|          liberal.ca|         twitter.com|  2377754|
|       equalvoice.ca|           adobe.com|  2368464|
|          liberal.ca|   events.liberal.ca|  2234996|
|       equalvoice.ca|       snapdesign.ca|  2103383|
|       equalvoice.ca|          flickr.com|  2067749|
|       equalvoice.ca|         twitter.com|  2066463|
+--------------------+--------------------+---------+

What's the best way to export these to CSV files?

greebie commented 6 years ago

I was reading this http://sigdelta.com/blog/scala-spark-udfs-in-python/ and it looks like a good path forward. Create .callUdf() and .registerUdf() functions for the objects in aut and then they can be used in Python or Scala.

Then the main Python code can just be a bridge to aut which would reduce the long-term maintenance burden.

SamFritz commented 6 years ago

RD Lightning Talk + Discussion

We had a great discussion at the Toronto Datathon about data frames. Just some quick notes made during @lintool lightning talk and feedback from the community:

The AUT team would like the community’s input in regards to data frames. Just for a bit of context, the discussion revolves around whether there is interest in moving from RDDs to data frames/table framework. We would like to gauge interest as well as what this would mean for accessibility, impact, uptake, etc.

Community input will help to direct future project priorities and determine dev cycles. Ultimately we want to focus future work on features/items that will be useful and used.

Discussion/Points brought up by AUT Datathon Participants:

Other points to consider:

Further discussion from datathon re: DF to be documented

ruebot commented 6 years ago

214 has been merged now. #209 has some examples on how to use PySpark. If you use Spark 2.3.0, it doesn't require the .enableHiveSupport() hack.

FYI: @digitalshawn, @justinlittman

jrwiebe commented 5 years ago

What is to be the relationship between the class DataFrameLoader and the DataFrame UDFs? As we add UDFs (e.g., ExtractPDFLinks) should we also add methods to DataFrameLoader that resemble the existing ones – i.e., methods that take an archive path argument, which are wrappers around RecordLoader.loadArchives()?

ianmilligan1 commented 5 years ago

^^^ @lintool ?

ruebot commented 5 years ago

@jrwiebe do you still need input from @lintool on this one?

jrwiebe commented 5 years ago

His input would be valuable. It gets to some of the point @SamFritz summarizes above: do we go all-in on DF, and deprecate RDD? Is anything important lost in doing so?

ruebot commented 5 years ago

I think it would just be hiding RDD from the user, and basically finishing migrating everything over. @lintool has a bit more in https://github.com/archivesunleashed/aut/issues/231

...I'm still waiting feedback from him on https://github.com/archivesunleashed/aut/issues/223 too

jrwiebe commented 5 years ago

Couldn't stay away.

I was looking at the "resolve before 0.18.0" tag, and noticed @lintool's comment about how filtering, in the case of a certain issue, is being done on RDD, when our plan is eventually to move everything to DF. After my contributions of the past week I thought I'd see how much work would be needed to use DataFrames from the initial loadArchives call.

I created a branch that does this (https://github.com/archivesunleashed/aut/commit/54af8339101eaaf4cd74b31e2d2941a12cd78162), and re-implements extractPDFDetailsDF using DataFrames end-to-end; the new method is extractPDFDetailsDF2. The use of _UDF methods looks clunky, but will seem less so if we convert most of our matchbox methods to UDF methods that operate on DF columns, as these do. Also, it's quite possible I've gone about things in the wrong way.

Demo

scala> :paste
// Entering paste mode (ctrl-D to finish)

import io.archivesunleashed._
import io.archivesunleashed.df._

val df = RecordLoader.loadArchivesDF("target/test-classes/warc/example.pdf.warc.gz", sc)

// Exiting paste mode, now interpreting.

import io.archivesunleashed._
import io.archivesunleashed.df._
df: org.apache.spark.sql.DataFrame = [archiveFilename: string, crawlDate: string ... 8 more fields]

scala> df.select("url", "mimeType").show(10)
+--------------------+--------------------+
|                 url|            mimeType|
+--------------------+--------------------+
|https://ajax.goog...|     text/javascript|
|https://yorkspace...|            text/css|
|https://yorkspace...|            text/css|
|https://yorkspace...|application/javas...|
|https://d39af2mgp...|application/javas...|
|https://yorkspace...|            text/css|
|https://yorkspace...|            text/css|
|https://yorkspace...|            text/css|
|https://yorkspace...|            text/css|
|http://d39af2mgp1...|            text/css|
+--------------------+--------------------+
only showing top 10 rows

scala> val pdfDeets = df.extractPDFDetailsDF2
19/08/18 16:41:41 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
pdfDeets: org.apache.spark.sql.DataFrame = [url: string, filename: string ... 5 more fields]

scala> pdfDeets.show
19/08/18 16:41:49 WARN PDFParser: J2KImageReader not loaded. JPEG2000 files will not be processed.
See https://pdfbox.apache.org/2.0/dependencies.html#jai-image-io
for optional dependencies.

[Stage 2:>                                                          (0 + 1) / 1]19/08/18 16:41:49 WARN SQLite3Parser: org.xerial's sqlite-jdbc is not loaded.
Please provide the jar on your classpath to parse sqlite files.
See tika-parsers/pom.xml for the correct version.
+--------------------+--------------------+---------+-------------------+---------------+--------------------+--------------------+
|                 url|            filename|extension|mime_type_webserver| mime_type_tika|                 md5|         binaryBytes|
+--------------------+--------------------+---------+-------------------+---------------+--------------------+--------------------+
|https://yorkspace...|   cost-analysis.pdf|      pdf|    application/pdf|application/pdf|aaba59d2287afd40c...|[25 50 44 46 2D 3...|
|https://yorkspace...|JCDL%20-%20Cost%2...|      pdf|    application/pdf|application/pdf|322cd5239141408c4...|[25 50 44 46 2D 3...|
+--------------------+--------------------+---------+-------------------+---------------+--------------------+--------------------+
ruebot commented 5 years ago

Nice work!

My main goal here is to get @lintool's input before the 0.18.0 release, but not necessarily implement it all before then. I'm really looking the correct path forward here, and on #231, which overlaps a fair bit. That way we have a focused path post 0.18.0 release. If we have this branch, I think that is a really good start, and hopefully will align with @lintool's thinking.

jrwiebe commented 5 years ago

That makes sense.

It turns out my loadArchivesDF is not ready for prime time. I ran the PDF extraction method against the data I was using to test #348, 14g of CPP WARCs, and it failed part way through with an out of memory error. I ran an even simpler script that applies a filter to a DF of WARCs, and that failed too (log).

import io.archivesunleashed._
import io.archivesunleashed.df._

val warcs_path = "/home/jrwiebe/warcs/cpp10/*.gz"
val output_path = "/tuna1/scratch/jrwiebe/get-pdf-test/df/"

val df = RecordLoader.loadArchivesDF(warcs_path, sc)

val count_www = df.filter(col("url").contains("www")).count

println(count_www + " URLs contain 'www'")

sys.exit

It seems that the VM's array size limit is reached when converting certain RDD records -- presumably ones containing large files -- to DataFrame rows. Searching for the problem I see recommendations about tweaking spark settings, such as parallelism and number of partitions, but I think we're actually limited by the number of WARC files we're dealing with, so I don't know that this would help. Anyway, it's something to start with.

lintool commented 5 years ago

We're good to close this issue. Re: large blobs overflowing memory, we should create a new issue when it really becomes a bottleneck.