helgeho / ArchiveSpark

An Apache Spark framework for easy data processing, extraction, and derivation for web archives and archival collections, developed at the Internet Archive.
MIT License

cdx format, includes json #4

Closed: borissmidt closed this issue 6 years ago

borissmidt commented 7 years ago

Dear helgeho,

The Common Crawl CDX files have a different structure than your library expects:

au,com,canberratimes)/business/mining-and-resources/bhp-on-the-hunt-for-new-chief-ft-20121106-28x2j 20170820020911 {"url": "http://www.canberratimes.com.au/business/mining-and-resources/bhp-on-the-hunt-for-new-chief-ft-20121106-28x2j", "mime": "text/html", "mime-detected": "text/html", "status": "200", "digest": "KZDZMH5BCMZJOCZPQCRUOTAPYH2CYQVJ", "length": "35157", "offset": "509044848", "filename": "crawl-data/CC-MAIN-2017-34/segments/1502886105961.34/warc/CC-MAIN-20170820015021-20170820035021-00503.warc.gz"}

I will implement support for this, since your library seems really promising. When it works I will send you a pull request.

Another question: would it be a good idea to publish your library to Maven Central? This would enable more people to use it.

helgeho commented 7 years ago

Hi Boris,

Thanks for your interest in ArchiveSpark; I would love to see it working with this CDX format.

ArchiveSpark has been implemented to be very modular, so you do not need to change the library itself to support a different CDX format. We have a concept for this, called Data Specification (DataSpec). All you need to do to support this format is to implement a custom DataSpec for it, which you should do in a separate project, preferably published to GitHub as well (so there is no pull request needed).

An example of an alternative DataSpec, for books instead of web archive records with metadata in XML, can be found here: https://github.com/helgeho/IABooksOnArchiveSpark. Only our pre-defined CDX/WARC specs are included in the core ArchiveSpark project, under https://github.com/helgeho/ArchiveSpark/tree/master/src/main/scala/de/l3s/archivespark/specific/warc

Also, we will soon publish the project to Maven Central. We are currently working on a bigger update with some fixes and better documentation. Once that's done it will be available on Maven Central too. However, you can already load it from the Internet Archive's Maven server: http://builds.archive.org/maven2/de/l3s/archivespark_2.11/2.1.3/
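For example, a minimal build.sbt sketch for pulling that artifact from the Internet Archive's server (the coordinates are read off the Maven path above; the %% assumes a Scala 2.11 build):

resolvers += "internetarchive" at "http://builds.archive.org/maven2"

libraryDependencies += "de.l3s" %% "archivespark" % "2.1.3" % "provided" // provided: don't bundle it into your JAR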

Hope this helps, and we are looking forward to your DataSpec!

borissmidt commented 7 years ago

Okay, I will write my own object, but Common Crawl seems like one of the biggest targets for this library, so wouldn't it be better to have these different kinds of parsing available in a single library instead of multiple scattered projects?

I noticed that the library itself might need changes to support Common Crawl, since Common Crawl has multiple WARC files. I will keep you up to date with the progress.

I'm starting to understand the structure of the library, thank you.

helgeho commented 7 years ago

There are many web archives out there and all of them are targeted by our library. Since the introduction of Data Specifications, the support goes even beyond web archives (books and journals are just two examples), and we cannot keep all possible file formats as part of the core. As this is collaborative work with the Internet Archive, we would like to keep only their current CDX format and server in the core, which is maintained by ourselves. However, we would be more than happy to link your new DataSpec from the README in this project.

Multiple WARC files are already supported in the current version. This is handled by the specification in use, for example WarcCdxHdfsSpec, where warcPath is a directory located on HDFS, which can contain multiple WARC files that are referred to from the loaded CDX files.
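A rough usage sketch of that spec (hedged: the load call and import paths below follow the 2.x-era examples and may differ in your version; the paths are placeholders):

import de.l3s.archivespark.ArchiveSpark
import de.l3s.archivespark.specific.warc.specs.WarcCdxHdfsSpec

// cdxPath may be a single file or a glob; warcPath is the HDFS directory holding the referenced WARC files
val records = ArchiveSpark.load(sc, WarcCdxHdfsSpec("/data/cdx/*.cdx", "/data/warc"))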

ikreymer commented 7 years ago

FWIW, the format is simply called CDXJ, and there's not yet an official standard for it, but it essentially just keeps the URL key and timestamp as a header (for sorting) and uses a JSON block for the rest of the fields. It should be easy to support in general and allows for flexibility as far as what fields are included. There should be a spec written for the exact format used by Common Crawl; I will let you know when I have a chance to do that, if that helps at all. Or, happy to answer any questions about the format.
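To illustrate, splitting such a line into its parts takes only a couple of lines of Scala (a minimal sketch with an illustrative line, assuming the key and timestamp never contain spaces):

val line = """au,com,example)/page 20170820020911 {"url": "http://example.com/page", "status": "200"}"""
val jsonStart = line.indexOf('{')                                    // the JSON block starts at the first '{'
val Array(surtKey, timestamp) = line.take(jsonStart).trim.split(' ') // "key timestamp" header
val jsonBlock = line.drop(jsonStart)                                 // hand this to any JSON library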

borissmidt commented 7 years ago

I'm already implementing it; parsing the format seems to be really easy, especially in Scala, i.e. a single case class will probably do the job.

So I will probably make it public tomorrow if all goes well.

borissmidt commented 7 years ago

Would it be a good idea to make the code generic over the JSON object inside the CDX?

i.e.

import org.json4s.DefaultFormats

// the minimum fields needed to locate and load a record from a WARC (gz) file on HDFS
trait MinimumJsonCDX {
  def offset: String
  def length: String
  def filename: String
}

abstract class CdxJRecordExtractor[CdxJson <: MinimumJsonCDX] {
  def parseJson(json: String): Option[CdxJson]
  implicit val formats = DefaultFormats
  def fromString(line: String): Option[CdxJRecord[CdxJson]] = {
    // split the line into the "surt fetchtime" prefix and the JSON block
    val (prefix, json) = line.span(_ != '{')
    val data: Seq[String] = prefix.trim.split(' ')
    for {
      surt      <- data.lift(0)
      fetchtime <- data.lift(1)
      info      <- parseJson(json)
    } yield CdxJRecord(surt, fetchtime, info)
  }
}

For Common Crawl we would have these concrete classes to parse the CDX files.

case class CdxJRecord[CdxJson <: MinimumJsonCDX](surtUrl:String, fetchtime: String, info: CdxJson)

case class CommonCrawlJson(url: String, mime: String, `mime-detected`: String, status: String, digest: String, length: String, offset: String, filename: String) extends MinimumJsonCDX

object CdxCommonCrawl extends CdxJRecordExtractor[CommonCrawlJson] {
  import org.json4s.native.JsonMethods.parse
  override def parseJson(json: String): Option[CommonCrawlJson] =
    parse(json).extractOpt[CommonCrawlJson]
}

I specialized the other classes so they accept the JSON object as a generic parameter, and I implemented a MultiTextDataLoader which is used to load the multiple CDX files that Common Crawl provides.

But is this extra flexibility worth the extra complexity? I'm asking because I'm used to writing application code and not library code, which in Scala is a big difference.

helgeho commented 7 years ago

Boris, unfortunately, I do not have the time to look into this in detail right now; please give me a few days to get back to you with better answers. Just a few things that I immediately noticed:

We work with multiple CDX files all the time; TextDataLoader does exactly that, so there is no need for a MultiTextDataLoader. If you look at the code of TextDataLoader you will see sc.textFile(dataPath); this method is the way to load multiple text files with Spark if dataPath includes globs, e.g., /path/to/*.cdx.
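In other words (a small sketch, with placeholder paths), Spark's textFile already covers both the glob case and an explicit list of files:

// glob over a directory of CDX files
val cdxLines = sc.textFile("/path/to/cdx/*.cdx")

// or, if the paths come from an index file, textFile also accepts a comma-separated list
val paths: Seq[String] = Seq("/path/to/a.cdx", "/path/to/b.cdx")
val cdxLinesFromList = sc.textFile(paths.mkString(","))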

My recommendation to keep it flexible would be to parse the JSON simply into a Map[String, Any] and provide an implicit conversion of your (to be implemented) CdxJWarcRecord class into a CommonCrawlCdxRecord class, which wraps the map and provides typed getters for the fields in the map (cp. WarcRecord).
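A rough sketch of that wrapping pattern (the class and method names here are illustrative, not ArchiveSpark API):

import org.json4s.JObject
import org.json4s.native.JsonMethods.parse

// wraps the parsed JSON map and exposes typed getters
class CommonCrawlCdxFields(fields: Map[String, Any]) {
  def mime: Option[String]     = fields.get("mime").map(_.toString)
  def filename: Option[String] = fields.get("filename").map(_.toString)
  def offset: Option[Long]     = fields.get("offset").map(_.toString.toLong)
}

object CommonCrawlCdxFields {
  def fromJson(json: String): Option[CommonCrawlCdxFields] = parse(json) match {
    case obj: JObject => Some(new CommonCrawlCdxFields(obj.values)) // obj.values is a Map[String, Any]
    case _            => None
  }
}

The implicit conversion would then simply wrap the record's parsed map in such a class.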

Besides, I think all you need to do is to copy our WarcCdxHdfsSpec, merge it with WarcHdfsSpecBase (instead of deriving from it, copy the methods into your spec class) and replace all mentions of CdxRecord and WarcRecord with your own classes. As far as I can see, WarcRecord can simply be copied if you change the CdxRecord type in its constructor and the DataEnrichRoot base to your own (and potentially replace the implicit cast as described above).

Just one minor thing:

Thanks for your efforts!

borissmidt commented 7 years ago

The case class containing the mime-detected field handles the conversion from JSON to a wrapped Map[String, Any].

The JSON part of a CDX entry: {"url": "http://www.canberratimes.com.au/business/mining-and-resources/bhp-on-the-hunt-for-new-chief-ft-20121106-28x2j", "mime": "text/html", "mime-detected": "text/html", "status": "200", "digest": "KZDZMH5BCMZJOCZPQCRUOTAPYH2CYQVJ", "length": "35157", "offset": "509044848", "filename": "crawl-data/CC-MAIN-2017-34/segments/1502886105961.34/warc/CC-MAIN-20170820015021-20170820035021-00503.warc.gz"}

I prefer to define a case class and extract it, which lets the type system help me with extracting the values from the Map[String, Any].

Common Crawl has an index file containing the paths of the CDX files. At the moment they are all in the same folder and a glob could be used, but this might change. The MultiTextDataLoader just loads each individual path instead of using a glob.

Also, thank you for pointing out that I could just merge WarcCdxHdfsSpec with WarcHdfsSpecBase. It makes the code simpler.

I finished the code and will test it now; when everything works as planned I will create a new repo with the code in it.

borissmidt commented 7 years ago

Yesterday I was able to filter the CDX and access the payload of HTML pages. Tomorrow I will ask permission from my boss to open-source the CDX JSON code; this will probably be granted.

helgeho commented 7 years ago

Sounds good, looking forward to it!

borissmidt commented 7 years ago

https://github.com/trafficdirect/SparkArchiveJCDX

I still need to add the README with instructions on usage.

helgeho commented 7 years ago

Cool 👍 how about ArchiveSpark instead of SparkArchive?

borissmidt commented 7 years ago

Damn, I still cannot get it right in my head :( I will change it of course. https://github.com/trafficdirect/ArchiveSparkJCDX

helgeho commented 7 years ago

Also, if you add http://builds.archive.org/maven2 as a resolver to your build.sbt, like so: https://github.com/helgeho/IABooksOnArchiveSpark/blob/master/build.sbt, SBT and IDEs can resolve ArchiveSpark automatically during development. I would still keep it as "provided" though, to not include it in your JAR at build time and keep it small.

borissmidt commented 7 years ago

Done, I will improve the description later on. I hope this implementation is good enough.

The extension is made flexible enough that you just implement your own JSON parser. This can be done with a case class or with a Map[String, Any] wrapped in a class which implements the MinimumJCdxJson.
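For instance, the Map-based variant can be as small as this (a sketch written against the MinimumJsonCDX trait posted earlier in this thread; adjust the names to those in the repo):

class MapBackedCdxJson(fields: Map[String, String]) extends MinimumJsonCDX {
  def offset: String   = fields("offset")
  def length: String   = fields("length")
  def filename: String = fields("filename")
}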

You can also parse it from a String if the CDX line doesn't conform to the Common Crawl format: surtUrl fetchtime {jsonobject}

In retrospect, using this fromString function you can implement any kind of CDX format (XML, CSV, etc.) if required.

Maybe I should rename the classes to indicate this :)

borissmidt commented 7 years ago

I see that my library fails when I use the value function:

  def parsePayload(parseSettings: ParseSettings,record: WarcJRecord[CommonCrawlJson]): PageUpdateInit = {

    val content = record.value[Array[Byte]](CommonCrawlWarcPayload, JWarcPayload.PayloadField)
  }
[error] /home/boris/direct/document-matcher-elastic/Matcher/src/main/scala/nl/trafficdirect/app/WarcIndexer.scala:39: wrong number of type parameters for overloaded method value value with alternatives:
[error]   [SpecificRoot >: nl.trafficdirect.archivespark.specific.warc.WarcJRecord[nl.trafficdirect.archivespark.specific.warc.commoncrawl.CommonCrawlJson] <: de.l3s.archivespark.enrich.EnrichRoot, T](f: de.l3s.archivespark.enrich.EnrichFunc[SpecificRoot, _], field: String)(implicit evidence$2: scala.reflect.ClassTag[T])Option[T] <and>
[error]   [SpecificRoot >: nl.trafficdirect.archivespark.specific.warc.WarcJRecord[nl.trafficdirect.archivespark.specific.warc.commoncrawl.CommonCrawlJson] <: de.l3s.archivespark.enrich.EnrichRoot, T](f: de.l3s.archivespark.enrich.EnrichFunc[SpecificRoot, _] with de.l3s.archivespark.enrich.DefaultField[T])(implicit evidence$1: scala.reflect.ClassTag[T])Option[T]
[error]     val content =  record.value[Array[Byte]](JWarcPayload,JWarcPayload.PayloadField)

I'll probably use this workaround for now; once I have more time and know more about Scala programming I will fix this inconvenience.

  import scala.reflect.ClassTag

  // workaround: read an enriched value by field name, with an explicit target type
  class ValueGetter(record: WarcJRecord[_]) {
    def getValue[A](field: String)(implicit c: ClassTag[A]): Option[A] = {
      record.enrichment[A](field).map(_.get).collect { case x: A => x }
    }
  }

  implicit def toValueGetter(record: WarcJRecord[_]): ValueGetter = new ValueGetter(record)

helgeho commented 7 years ago

Sorry, that was actually my fault, as I defined the type as a generic parameter of value in my description here: https://github.com/helgeho/ArchiveSpark/issues/5. That's now corrected. Instead, you should define the type of the variable that you assign it to:

val content: Option[Array[Byte]] = record.value(CommonCrawlWarcPayload)

or (if you know a payload exists):

val content: Array[Byte] = record.value(CommonCrawlWarcPayload).get

(a field pointer is not required in this case, because payload is already the default field of that enrich function)

helgeho commented 7 years ago

I just noticed that, because payload is the default field as I mentioned before, the enrich function also carries its type, so in this case you do not even need to specify a type; even this will work:

val contentOpt = record.value(CommonCrawlWarcPayload)
val content = record.value(CommonCrawlWarcPayload).get

In order to select another field, such as the HTTP headers, you will need to specify a type though:

val headersOpt: Option[Map[String, String]] = record.value(CommonCrawlWarcPayload, WarcPayload.HttpHeaderField)
val headers: Map[String, String] = record.value(CommonCrawlWarcPayload, WarcPayload.HttpHeaderField).get
borissmidt commented 7 years ago

Thank you :)

helgeho commented 7 years ago

I just pushed a long-awaited update to version 2.5, with some new features and bug fixes, among other things.

More examples on these new features will follow soon.

Please also update your code to 2.5, thanks!

borissmidt commented 7 years ago

Hey, I tried upgrading to 2.5, but it seems that it is unable to resolve the library; I also tried 2.5.0 but neither worked. It seems that version 2.1.2 was only in my local cache and cannot be resolved.

The sbt build used:

resolvers ++= Seq(
  "internetarchive" at "http://builds.archive.org/maven2"
)
lazy val dependencies = Seq(
  "org.apache.hadoop" % "hadoop-client" % "2.7.3" % "provided"
  ,"de.l3s" % "archivespark" % "2.5" % "provided"
  ,"org.apache.spark" %% "spark-core" % sparkVersion % "provided"
  ,"org.json4s" %% "json4s-native" % "3.2.11"
)
lazy val testDependencies = Seq(
  "org.scalatest" %% "scalatest" % "3.0.0" % "test"
  ,"org.scalactic" %% "scalactic" % "3.0.0" % "test"
)
libraryDependencies ++= dependencies ++ testDependencies
borissmidt commented 7 years ago

Sorry, it seems that I forgot to add %% :)
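For reference, with %% the dependency line in the build above would presumably become

,"de.l3s" %% "archivespark" % "2.5" % "provided"

so that SBT appends the Scala version suffix (archivespark_2.11, as in the Maven path earlier in this thread).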

helgeho commented 7 years ago

Done, 2.5 is now available on Maven Central:

libraryDependencies += "com.github.helgeho" %% "archivespark" % "2.5"
helgeho commented 6 years ago

Hi Boris, I just pushed our new documentation: https://github.com/helgeho/ArchiveSpark/blob/master/docs/README.md

This also includes a link to your DataSpec: https://github.com/helgeho/ArchiveSpark/blob/master/docs/DataSpecs.md

borissmidt commented 6 years ago

Thank you, but I haven't updated to the latest version yet; I might do it when I find the time.

helgeho commented 6 years ago

This should not be an issue, as the DataSpec API has not changed since 2.5, and since you have defined ArchiveSpark as provided in your build.sbt, using it with a newer version of ArchiveSpark should just work, I guess (not tested though).

Best, Helge