Build and process the Common Crawl index table – an index to WARC files in a columnar data format (Apache Parquet).
The index table is built from the Common Crawl URL index files using Apache Spark. It can be queried with Spark SQL, Amazon Athena (built on Presto/Trino), Apache Hive and many other big data frameworks and applications.
This project provides a comprehensive set of example SQL queries and also Java code to fetch and process the WARC records matched by a SQL query.
The Java tools are built with Maven:
> mvn package
Spark is required to build the table and can also be used (as an alternative to Athena or Hive) to process it. Please refer to the Spark documentation for instructions on how to install Spark and set up a Spark cluster.
Processing the table with Python is not part of this project. Please have a look at cc-pyspark for examples of how to query and process the tabular URL index with Python and PySpark. The project cc-notebooks includes examples of how to gain insights into the Common Crawl data sets using the columnar index.
A Spark job converts the Common Crawl URL index files (a sharded gzipped index in CDXJ format) into a table in Parquet or ORC format.
> APPJAR=target/cc-index-table-0.3-SNAPSHOT-jar-with-dependencies.jar
> $SPARK_HOME/bin/spark-submit --class org.commoncrawl.spark.CCIndex2Table $APPJAR
CCIndex2Table [options] <inputPathSpec> <outputPath>
Arguments:
<inputPaths>
pattern describing paths of input CDX files, e.g.
s3a://commoncrawl/cc-index/collections/CC-MAIN-2017-43/indexes/cdx-*.gz
<outputPath>
output directory
Options:
-h,--help Show this message
--outputCompression <arg> data output compression codec: gzip/zlib
(default), snappy, lzo, none
--outputFormat <arg> data output format: parquet (default), orc
--partitionBy <arg> partition data by columns (comma-separated,
default: crawl,subset)
--useNestedSchema use the schema with nested columns (default:
false, use flat schema)
The script convert_url_index.sh runs CCIndex2Table using Spark on YARN.
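To picture what the conversion does: every CDXJ line holds a SURT-sorted URL key, a capture timestamp and a JSON blob whose fields become table columns. A minimal per-line parsing sketch in Python (the JSON keys shown, e.g. filename, offset, length, follow the Common Crawl URL index format; the example line is made up):

```python
import json

def parse_cdxj_line(line):
    """Split one CDXJ index line into (SURT key, timestamp, metadata dict).

    CDXJ lines look like:
      org,example)/path 20170914012930 {"url": "...", "filename": "...", ...}
    """
    surt_key, timestamp, json_blob = line.split(" ", 2)
    return surt_key, timestamp, json.loads(json_blob)

line = ('org,example)/index.html 20170914012930 '
        '{"url": "http://example.org/index.html", "status": "200", '
        '"filename": "crawl-data/.../example.warc.gz", '
        '"offset": "1234", "length": "567"}')
key, ts, meta = parse_cdxj_line(line)
print(key, ts, meta["filename"])
```

The Spark job applies this kind of mapping to every line of the sharded gzipped index and writes the result as Parquet or ORC.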
Columns are defined and described in the table schema (flat or nested).
First, the table needs to be imported into Amazon Athena. In the Athena Query Editor:
1. create a database ccindex: CREATE DATABASE ccindex and make sure that it's selected as "DATABASE"
2. edit the "create table" statement: adapt the table name and the s3:// location of the table data, then execute the "create table" query
3. make Athena recognize the data partitions on s3://: MSCK REPAIR TABLE ccindex (do not forget to adapt the table name). This step needs to be repeated every time new data partitions have been added.

A couple of sample queries are also provided (for the flat schema), among them:
- count the most frequent host name elements (host names split into words): count-hostname-elements.sql
- count the most frequent URL path elements (paths split at /): count-url-path-elements.sql
- find captures of robots.txt files (note that the response may be a redirect)
- mine multi-lingual domains by counting pages per domain and content language

Athena writes results in CSV format. E.g., for the last example, the mining of multi-lingual domains, we get:
domain | n_lang | n_pages | lang_counts |
---|---|---|---|
vatican.va | 40 | 42795 | {de=3147, ru=20, be=1, fi=3, pt=4036, bg=11, lt=1, hr=395, fr=5677, hu=79, uc=2, uk=17, sk=20, sl=4, sp=202, sq=5, mk=1, ge=204, sr=2, sv=3, or=2243, sw=5, el=5, mt=2, en=7650, it=10776, es=5360, zh=5, iw=2, cs=12, ar=184, vi=1, th=4, la=1844, pl=658, ro=9, da=2, tr=5, nl=57, po=141} |
iubilaeummisericordiae.va | 7 | 2916 | {de=445, pt=273, en=454, it=542, fr=422, pl=168, es=612} |
osservatoreromano.va | 7 | 1848 | {de=284, pt=42, en=738, it=518, pl=62, fr=28, es=176} |
cultura.va | 3 | 1646 | {en=373, it=1228, es=45} |
annusfidei.va | 6 | 833 | {de=51, pt=92, en=171, it=273, fr=87, es=159} |
pas.va | 2 | 689 | {en=468, it=221} |
photogallery.va | 6 | 616 | {de=90, pt=86, en=107, it=130, fr=83, es=120} |
im.va | 6 | 325 | {pt=2, en=211, it=106, pl=1, fr=3, es=2} |
museivaticani.va | 5 | 266 | {de=63, en=54, it=47, fr=37, es=65} |
laici.va | 4 | 243 | {en=134, it=5, fr=51, es=53} |
radiovaticana.va | 3 | 220 | {en=5, it=214, fr=1} |
casinapioiv.va | 2 | 213 | {en=125, it=88} |
vaticanstate.va | 5 | 193 | {de=25, en=76, it=24, fr=25, es=43} |
laityfamilylife.va | 5 | 163 | {pt=21, en=60, it=3, fr=78, es=1} |
camposanto.va | 1 | 156 | {de=156} |
synod2018.va | 3 | 113 | {en=24, it=67, fr=22} |
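The lang_counts column comes back from Athena as a stringified map ({de=3147, ru=20, ...}). For post-processing such result files, a small helper to turn that rendering into a dict is handy. A sketch (parse_athena_map is a hypothetical helper, assuming the "{k=v, k=v}" format shown above):

```python
def parse_athena_map(s):
    """Parse an Athena map rendering like '{de=3147, ru=20}' into a dict."""
    s = s.strip().strip("{}")
    if not s:
        return {}
    return {k: int(v) for k, v in
            (pair.split("=", 1) for pair in s.split(", "))}

counts = parse_athena_map("{en=125, it=88}")
print(max(counts, key=counts.get))  # prints "en", the most frequent language
```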
As a first use case, let's export parts of the table and save them in one of the formats supported by Spark. The tool CCIndexExport runs a Spark job to extract parts of the index table and save them as a table in Parquet, ORC, JSON or CSV format. It may even transform the data into an entirely different table. Please refer to the Spark SQL programming guide and the overview of built-in SQL functions for more information.
The tool requires input and output paths as arguments, but you will usually want to pass a meaningful SQL query instead of the default SELECT * FROM ccindex LIMIT 10. All available command-line options are shown when the tool is called with --help:
> $SPARK_HOME/bin/spark-submit --class org.commoncrawl.spark.examples.CCIndexExport $APPJAR --help
CCIndexExport [options] <tablePath> <outputPath>
Arguments:
<tablePath>
path to cc-index table
s3://commoncrawl/cc-index/table/cc-main/warc/
<outputPath>
output directory
Options:
-h,--help Show this message
-q,--query <arg> SQL query to select rows
-t,--table <arg> name of the table data is loaded into
(default: ccindex)
--numOutputPartitions <arg> repartition data to have <n> output partitions
--outputCompression <arg> data output compression codec: none, gzip/zlib
(default), snappy, lzo, etc.
Note: the availability of compression options
depends on the chosen output format.
--outputFormat <arg> data output format: parquet (default), orc,
json, csv
--outputPartitionBy <arg> partition data by columns (comma-separated,
default: crawl,subset)
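Since the query is passed to --query as one plain string, it can be convenient to assemble it programmatically when running exports for several crawls or conditions. A small sketch (build_export_query is a hypothetical helper, not part of this project; crawl, subset and content_languages are columns of the flat schema):

```python
def build_export_query(crawl, subset="warc", extra_condition=None, limit=None):
    """Assemble the SQL string passed to CCIndexExport via --query.

    Restricting on the partition columns crawl and subset keeps the
    scanned data volume small (partition pruning).
    """
    where = ["crawl = '{}'".format(crawl), "subset = '{}'".format(subset)]
    if extra_condition:
        where.append(extra_condition)
    query = "SELECT * FROM ccindex WHERE " + " AND ".join(where)
    if limit is not None:
        query += " LIMIT {}".format(limit)
    return query

print(build_export_query("CC-MAIN-2018-43",
                         extra_condition="content_languages = 'isl'",
                         limit=1000))
```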
The following Spark SQL options are recommended to achieve optimal query performance:
spark.hadoop.parquet.enable.dictionary=true
spark.hadoop.parquet.enable.summary-metadata=false
spark.sql.hive.metastorePartitionPruning=true
spark.sql.parquet.filterPushdown=true
Because the schema of the index table has changed slightly over time as new columns were added, the following option is required if any of the newer columns (e.g., content_languages) is used in the query:
spark.sql.parquet.mergeSchema=true
The URL index was initially created to make it easy to fetch web page captures from the Common Crawl archives. The columnar index also contains the necessary information for this task: the fields warc_filename, warc_record_offset and warc_record_length. This allows us to define a subset of the Common Crawl archives by a SQL query, fetch all records of the subset and export them to WARC files for further processing. The tool CCIndexWarcExport addresses this use case:
> $SPARK_HOME/bin/spark-submit --class org.commoncrawl.spark.examples.CCIndexWarcExport $APPJAR --help
CCIndexWarcExport [options] <tablePath> <outputPath>
Arguments:
<tablePath>
path to cc-index table
s3://commoncrawl/cc-index/table/cc-main/warc/
<outputPath>
output directory
Options:
-q,--query <arg> SQL query to select rows. Note: the result
is required to contain the columns `url',
`warc_filename', `warc_record_offset' and
`warc_record_length', make sure they're
SELECTed.
-t,--table <arg> name of the table data is loaded into
(default: ccindex)
--csv <arg> CSV file to load WARC records by filename,
offset and length.The CSV file must have
column headers and the input columns `url',
`warc_filename', `warc_record_offset' and
`warc_record_length' are mandatory, see also
option --query.
-h,--help Show this message
--numOutputPartitions <arg> repartition data to have <n> output
partitions
--numRecordsPerWarcFile <arg> allow max. <n> records per WARC file. This
will repartition the data so that in average
one partition contains not more than <n>
rows. Default is 10000, set to -1 to disable
this option.
Note: if both --numOutputPartitions and
--numRecordsPerWarcFile are used, the former
defines the minimum number of partitions,
the latter the maximum partition size.
--warcCreator <arg> (WARC info record) creator of WARC export
--warcOperator <arg> (WARC info record) operator of WARC export
--warcPrefix <arg> WARC filename prefix
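The interplay of --numOutputPartitions and --numRecordsPerWarcFile can be read as simple arithmetic: the latter caps the average number of rows per partition, the former sets a floor on the partition count. A Python sketch of that rule, as described in the help text above (an illustration, not the tool's actual code):

```python
import math

def num_partitions(total_rows, num_output_partitions=None,
                   num_records_per_warc_file=10000):
    """Approximate the number of output partitions (= WARC files)."""
    n = 1
    if num_records_per_warc_file and num_records_per_warc_file > 0:
        # cap the average number of rows per partition
        n = math.ceil(total_rows / num_records_per_warc_file)
    if num_output_partitions:
        # --numOutputPartitions defines the minimum number of partitions
        n = max(n, num_output_partitions)
    return n

print(num_partitions(55000))                          # 6 files of <= 10000 rows
print(num_partitions(55000, num_output_partitions=12))  # 12
```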
Let's try to put together a couple of WARC files containing only web pages written in Icelandic (ISO-639-3 language code isl). We choose Icelandic because it is a less common language and the number of pages in the Common Crawl archives is manageable, cf. the language statistics. We take the query get-records-for-language.sql and run it as a Spark job:
> $SPARK_HOME/bin/spark-submit \
--conf spark.hadoop.parquet.enable.dictionary=true \
--conf spark.hadoop.parquet.enable.summary-metadata=false \
--conf spark.sql.hive.metastorePartitionPruning=true \
--conf spark.sql.parquet.filterPushdown=true \
--conf spark.sql.parquet.mergeSchema=true \
--class org.commoncrawl.spark.examples.CCIndexWarcExport $APPJAR \
--query "SELECT url, warc_filename, warc_record_offset, warc_record_length
FROM ccindex
WHERE crawl = 'CC-MAIN-2018-43' AND subset = 'warc' AND content_languages = 'isl'" \
--numOutputPartitions 12 \
--numRecordsPerWarcFile 20000 \
--warcPrefix ICELANDIC-CC-2018-43 \
s3://commoncrawl/cc-index/table/cc-main/warc/ \
.../my_output_path/
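As an aside, for just a handful of records Spark is not needed at all: the three fields warc_filename, warc_record_offset and warc_record_length are enough to fetch a single record over HTTP with a range request. A sketch in Python (assuming the public download endpoint https://data.commoncrawl.org/ and that each record is a standalone gzip member; the example filename is made up):

```python
import gzip
import urllib.request

CC_DATA = "https://data.commoncrawl.org/"  # public download endpoint

def record_request(warc_filename, offset, length):
    """Build a ranged HTTP request covering exactly one WARC record."""
    req = urllib.request.Request(CC_DATA + warc_filename)
    # HTTP byte ranges are inclusive: offset .. offset + length - 1
    req.add_header("Range", "bytes={}-{}".format(offset, offset + length - 1))
    return req

def fetch_record(warc_filename, offset, length):
    """Fetch one record and decompress its standalone gzip member."""
    with urllib.request.urlopen(
            record_request(warc_filename, offset, length)) as resp:
        return gzip.decompress(resp.read())

req = record_request("crawl-data/CC-MAIN-2018-43/.../x.warc.gz", 1234, 567)
print(req.get_header("Range"))  # bytes=1234-1800
```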
It's also possible to pass the result of a SQL query as a CSV file, e.g., an Athena result file. If you've already run get-records-for-language.sql and the output file is available on S3, just replace the --query argument with --csv pointing to the result file:
> $SPARK_HOME/bin/spark-submit --class org.commoncrawl.spark.examples.CCIndexWarcExport $APPJAR \
--csv s3://aws-athena-query-results-123456789012-us-east-1/Unsaved/2018/10/26/a1a82705-047c-4902-981d-b7a93338d5ac.csv \
...
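Before launching the Spark job it can save time to check that the CSV actually carries the mandatory columns. A small stdlib-only sketch (check_csv_header is a hypothetical helper, not part of this project):

```python
import csv
import io

REQUIRED = {"url", "warc_filename", "warc_record_offset", "warc_record_length"}

def check_csv_header(f):
    """Return the set of mandatory columns missing from a CSV file's header."""
    reader = csv.DictReader(f)
    return REQUIRED - set(reader.fieldnames or [])

sample = io.StringIO(
    "url,warc_filename,warc_record_offset,warc_record_length\n"
    "http://example.org/,crawl-data/.../x.warc.gz,1234,567\n")
missing = check_csv_header(sample)
print(missing or "all mandatory columns present")
```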