anjackson opened this issue 2 years ago
Example of usage:
// Load the WARCs:
Dataset<Row> df = WarcLoader.createDataFrame("test.warc.gz", spark);
df.createOrReplaceTempView("mementos");
// Run queries as SQL:
Dataset<Row> dft = spark.sql("SELECT url, `content.*` FROM mementos WHERE contentType IS NOT NULL");
dft.show();
@anjackson we moved over to using Sparkling for our processing earlier this year. It's been really great for us. aut
is basically a utility for creating a common set of DataFrames from what Sparkling processes.
...and I'm reminded of conversations we had 5 or 6 years ago about our projects working towards some common code base.
Thanks @ruebot - I have spent some time looking at Sparkling, but right now it feels like the gap is too big for me to switch over. Our current implementation is based on locally caching and repeatedly processing the payload (without passing large byte[]
arrays around) because we're tight for memory. My understanding is that Sparkling can support working in that way, but porting over to it would be quite a lot of work, and right now I don't really understand what benefits it would bring. I suspect this is because I've not fully grasped what Spark and Sparkling can do!
As I don't have much time to work on this, I'm starting off by learning how to use Spark, and porting the current process over with minimal changes. As I learn more I can hopefully bring things closer together, but it looks like being a long road.
Some experimentation with how to set up the extraction without knowing all fields ahead of time... See 62de913b.
As per https://spark.apache.org/docs/latest/sql-getting-started.html#programmatically-specifying-the-schema this can work, but needs a bit of help. The composite Map function (or functions) applied via mapPartitions needs to declare which fields it is going to add and what their types are. (Note that I tried doing this by grabbing the .first() item in the RDD, but that's rather costly as the source file gets reprocessed.)
The wrapper can then declare the schema, and the WARC-based RDD can be transformed into a JavaRDD<Row> that matches the declared schema, both based on the same sequence of field names.
The current implementation hardcodes the mapping and stores the types as metadata classes, so it can cope when values are null. It seems that a more elegant implementation would require changes to the current indexer, so that the analysis declares the expected fields ahead of time.
Or we could just declare them all for now.
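For illustration, a minimal sketch of the "declare the expected fields up front, all nullable" approach; ExtractedFields and the field names here are hypothetical stand-ins for whatever the analyser actually emits, and the real field list would be much longer:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SchemaSketch {

    // Hypothetical container for the fields the analyser extracts:
    public static class ExtractedFields implements java.io.Serializable {
        public String url;
        public String contentType;
        public Long contentLength;
    }

    public static Dataset<Row> toDataFrame(SparkSession spark, JavaRDD<ExtractedFields> analysed) {
        // Declare the expected field names and types ahead of time, all nullable,
        // so records that lack a value can carry null rather than breaking the schema:
        List<StructField> fields = Arrays.asList(
                DataTypes.createStructField("url", DataTypes.StringType, true),
                DataTypes.createStructField("contentType", DataTypes.StringType, true),
                DataTypes.createStructField("contentLength", DataTypes.LongType, true));
        StructType schema = DataTypes.createStructType(fields);

        // Map each extracted-metadata object to a Row, in the same order as the schema:
        JavaRDD<Row> rows = analysed.map(f ->
                RowFactory.create(f.url, f.contentType, f.contentLength));

        // Build the DataFrame against the declared schema:
        return spark.createDataFrame(rows, schema);
    }
}
```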
I realised older Hadoop support was needed, so I experimented with some of these ideas. But after hacking things together, it's clear the Parquet writer is going to be painful to run under old Hadoop:
attempt_20220929220613710_0001_r_000000_0: 2022-09-29 22:06:34 ERROR TaskTracker:203 - Error running child : java.lang.NoSuchMethodError: 'long org.apache.hadoop.fs.FileSystem.getDefaultBlockSize(org.apache.hadoop.fs.Path)'
attempt_20220929220613710_0001_r_000000_0: at org.apache.parquet.hadoop.util.HadoopOutputFile.defaultBlockSize(HadoopOutputFile.java:93)
attempt_20220929220613710_0001_r_000000_0: at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:338)
attempt_20220929220613710_0001_r_000000_0: at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:314)
attempt_20220929220613710_0001_r_000000_0: at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:480)
attempt_20220929220613710_0001_r_000000_0: at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:435)
attempt_20220929220613710_0001_r_000000_0: at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat$RecordWriterWrapper.<init>(DeprecatedParquetOutputFormat.java:92)
attempt_20220929220613710_0001_r_000000_0: at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat.getRecordWriter(DeprecatedParquetOutputFormat.java:77)
attempt_20220929220613710_0001_r_000000_0: at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:433)
attempt_20220929220613710_0001_r_000000_0: at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
attempt_20220929220613710_0001_r_000000_0: at org.apache.hadoop.mapred.Child.main(Child.java:170)
So, the idea is that old Hadoop will just output JSONL, which can then be transferred to the newer cluster as needed.
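Assuming the JSONL ends up somewhere the newer cluster can read (the paths below are just placeholders), converting it on the Spark side is then roughly:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JsonlToParquet {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("jsonl-to-parquet")
                .getOrCreate();

        // Spark infers the schema from the JSONL produced on the old Hadoop cluster:
        Dataset<Row> df = spark.read().json("hdfs:///transfer/mementos.jsonl");

        // Re-write as Parquet on the newer cluster:
        df.write().parquet("hdfs:///warehouse/mementos.parquet");

        spark.stop();
    }
}
```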
To support more modern patterns of usage, and more complex processing, it would be good to support Spark.
Long term, this should likely integrate with the Archives Unleashed Toolkit, but at the moment it is not easy for us to transition to using that. This is mostly due to how it handles the record contents, which get embedded in the data frames and lead to some heavy memory pressure (notes TBA).
The current hadoop3 branch provides an initial implementation in WarcLoader, via loadAndAnalyse and createDataFrame. It works by building an RDD stream of WARC records, but also supports running the analyser on that stream, which is able to work on the full 'local' byte streams as long as no re-partitioning has happened. This can then output a stream of objects that contain the extracted metadata fields and are no longer tied to the original WARC input streams. This can then be turned into a DataFrame and queried with SQL (via df.createOrReplaceTempView("mementos"), short term). It can also be exported as Parquet etc.
The POJO approach does mean we end up with a very wide schema with a lot of nulls if there hasn't been much analysis. Supporting a more dynamic schema would be nice, but then again, fixing the schema aligns it with the Solr schema.
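For reference, the POJO route looks roughly like the sketch below; MementoBean is a hypothetical stand-in for the real (much wider) class, and Spark derives the nullable schema by reflecting over the bean's properties:

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PojoSketch {

    // Hypothetical bean; the real class carries many more fields, most of which
    // will be null unless the corresponding analysis has been run:
    public static class MementoBean implements java.io.Serializable {
        private String url;
        private String contentType;
        private Long contentLength;

        public String getUrl() { return url; }
        public void setUrl(String url) { this.url = url; }
        public String getContentType() { return contentType; }
        public void setContentType(String contentType) { this.contentType = contentType; }
        public Long getContentLength() { return contentLength; }
        public void setContentLength(Long contentLength) { this.contentLength = contentLength; }
    }

    public static Dataset<Row> toDataFrame(SparkSession spark, JavaRDD<MementoBean> mementos) {
        // Spark reflects over the bean's getters/setters to build the schema,
        // so every declared property becomes a (nullable) column:
        return spark.createDataFrame(mementos, MementoBean.class);
    }
}
```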