commoncrawl / cc-index-table

Index Common Crawl archives in tabular format
Apache License 2.0

Problem running the example for getting data for a language in Spark #2

Closed: brad-safetonet closed this issue 5 years ago

brad-safetonet commented 5 years ago

I'm attempting to run the example for retrieving all Icelandic data using EMR on AWS. Here's how I'm adding the job:

    aws emr add-steps --cluster-id j-1PWKHFC4A1VG4 \
        --steps Type=CUSTOM_JAR,Name="CommonCrawlJob",Jar="command-runner.jar",ActionOnFailure=CONTINUE,Args=["/usr/lib/spark/bin/job.sh"]

And this is the job.sh file:

    #!/bin/bash
    /usr/lib/spark/bin/spark-submit \
        --class org.commoncrawl.spark.examples.CCIndexWarcExport \
        /tmp/cc-spark-0.2-SNAPSHOT-jar-with-dependencies.jar \
        --query "SELECT url, warc_filename, warc_record_offset, warc_record_length WHERE crawl = 'CC-MAIN-2018-51' AND subset = 'warc' AND content_languages = 'isl'" \
        --numOutputPartitions 12 \
        --numRecordsPerWarcFile 20000 \
        --warcPrefix ICELANDIC-CC-2018-51 \
        s3://commoncrawl/cc-index/table/cc-main/warc/ \
        s3://output_bucket/test

The error that I'm getting seems to indicate a problem with reading the index data:

    19/01/08 17:08:37 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
    19/01/08 17:08:37 INFO CCIndexExport: Schema of table ccindex: StructType(StructField(url_surtkey,StringType,true), StructField(url,StringType,true), StructField(url_host_name,StringType,true), StructField(url_host_tld,StringType,true), StructField(url_host_2nd_last_part,StringType,true), StructField(url_host_3rd_last_part,StringType,true), StructField(url_host_4th_last_part,StringType,true), StructField(url_host_5th_last_part,StringType,true), StructField(url_host_registry_suffix,StringType,true), StructField(url_host_registered_domain,StringType,true), StructField(url_host_private_suffix,StringType,true), StructField(url_host_private_domain,StringType,true), StructField(url_protocol,StringType,true), StructField(url_port,IntegerType,true), StructField(url_path,StringType,true), StructField(url_query,StringType,true), StructField(fetch_time,TimestampType,true), StructField(fetch_status,ShortType,true), StructField(content_digest,StringType,true), StructField(content_mime_type,StringType,true), StructField(content_mime_detected,StringType,true), StructField(warc_filename,StringType,true), StructField(warc_record_offset,IntegerType,true), StructField(warc_record_length,IntegerType,true), StructField(warc_segment,StringType,true), StructField(crawl,StringType,true), StructField(subset,StringType,true))
    Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'crawl' given input columns: []; line 1 pos 72;
    'Project ['url, 'warc_filename, 'warc_record_offset, 'warc_record_length]
    +- 'Filter ((('crawl = CC-MAIN-2018-51) && ('subset = warc)) && ('content_languages = isl))
       +- OneRowRelation
        at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:92)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:89)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:89)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:84)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:84)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
        at org.commoncrawl.spark.examples.CCIndexExport.executeQuery(CCIndexExport.java:82)
        at org.commoncrawl.spark.examples.CCIndexWarcExport.run(CCIndexWarcExport.java:148)
        at org.commoncrawl.spark.examples.CCIndexExport.run(CCIndexExport.java:192)
        at org.commoncrawl.spark.examples.CCIndexWarcExport.main(CCIndexWarcExport.java:212)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    19/01/08 17:08:37 INFO SparkContext: Invoking stop() from shutdown hook

Any ideas about what's going wrong? Thanks!

sebastian-nagel commented 5 years ago

Looks like no columns at all have been recognized. Which EMR version (and which Spark version) did you use? It could also be a configuration issue; I've explicitly passed:

spark-submit ... \
    --conf spark.hadoop.parquet.enable.dictionary=true \
    --conf spark.hadoop.parquet.enable.summary-metadata=false \
    --conf spark.sql.hive.metastorePartitionPruning=true \
    --conf spark.sql.parquet.filterPushdown=true \
    --conf spark.sql.parquet.mergeSchema=true \
    ...
brad-safetonet commented 5 years ago

EMR 5.19.0. I actually tried it with the more recent EMR 5.20.0 as well, but got the same problem, so I rolled back.

I think that your configuration suggestions are the most likely fix. I will have to try again with those explicit options.

brad-safetonet commented 5 years ago

I tried it again with those 5 config parameters that you suggested, but got the same result. Do you have any ideas for other config params that I could try adding?

sebastian-nagel commented 5 years ago

Hi @brad-safetonet, I was able to reproduce the problem, and it's solved! Sorry, a stupid copy-paste error in the README.md: the SQL query needs to be SELECT ... FROM ccindex WHERE .... Without the FROM clause, Spark resolves the query against an empty OneRowRelation, which is why no columns were found. The test job is running; I'll let you know whether it succeeds, and then update the README. I'll also test whether the additional options are required or only improve performance. There's also an issue with Spark 2.4.0, see #3; I'll try to fix it within the next two weeks. Thanks!
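Applied to the query in your job.sh, the corrected argument would look like this (only the FROM ccindex clause is added; everything else stays unchanged):

    --query "SELECT url, warc_filename, warc_record_offset, warc_record_length FROM ccindex WHERE crawl = 'CC-MAIN-2018-51' AND subset = 'warc' AND content_languages = 'isl'"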

brad-safetonet commented 5 years ago

That would explain it! Thanks for the assistance.

sebastian-nagel commented 5 years ago

Just to confirm: I've successfully tested the corrected query with all the mentioned options, both with Spark 2.3.2 and 2.4.0 (with the work-around for #3 applied). I'll continue testing whether some of the configuration options are mandatory. I haven't tested Spark on EMR yet; it might be that jets3t isn't available there. Let me know when it works, and thanks for your patience!

brad-safetonet commented 5 years ago

Your suggested change to the query seems to have solved this issue (though I may be reporting another issue in the next few minutes). Closing.

sebastian-nagel commented 5 years ago

Ok, I've run tests with fewer configuration options: spark.sql.parquet.mergeSchema=true is required, at least if the query uses columns that were introduced later (which is the case for content_languages). Without schema merging, Spark takes the table schema from only a subset of the Parquet files, and older files do not contain the newer columns. I'll update the README.
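A minimal invocation would then look like the sketch below, assuming (per the tests above) that no other option turns out to be mandatory; the jar path, query, and output arguments are taken from the job.sh earlier in this thread:

    # minimal sketch: only the required mergeSchema option is passed
    spark-submit \
        --conf spark.sql.parquet.mergeSchema=true \
        --class org.commoncrawl.spark.examples.CCIndexWarcExport \
        /tmp/cc-spark-0.2-SNAPSHOT-jar-with-dependencies.jar \
        --query "SELECT url, warc_filename, warc_record_offset, warc_record_length FROM ccindex WHERE crawl = 'CC-MAIN-2018-51' AND subset = 'warc' AND content_languages = 'isl'" \
        --numOutputPartitions 12 \
        --numRecordsPerWarcFile 20000 \
        --warcPrefix ICELANDIC-CC-2018-51 \
        s3://commoncrawl/cc-index/table/cc-main/warc/ \
        s3://output_bucket/test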