rjurney / Agile_Data_Code_2

Code for Agile Data Science 2.0, O'Reilly 2017, Second Edition
http://bit.ly/agile_data_science
MIT License

Chapter 2: Exercises #113

Closed · geopolitis closed this issue 2 years ago

geopolitis commented 5 years ago

Please find below all the output of the exercises. Step 10 succeeds, but step 11 fails. "Loading and Inspecting Parquet Files" also fails; see its output below.

=================

Exercises

First count the number of job titles for each executive, then create a pyspark.sql.Row for this result.

In [10]:

records = csv_lines.map(lambda line: line.split(','))
records.collect()

groups = records.groupBy(lambda x: x[0])
counts = groups.map(lambda x: (x[0], len(x[1])))
new_rows = counts.map(lambda x: Row(name=x[0], count=x[1]))
new_rows.collect()

Out[10]:
[Row(count=1, name='Florian Liebert'),
 Row(count=2, name='Russell Jurney'),
 Row(count=1, name='Don Brown'),
 Row(count=1, name='Steve Jobs'),
 Row(count=1, name='Donald Trump')]
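As an aside, the groupBy/len approach above materializes each executive's whole group just to measure its length; the same counts can be produced with reduceByKey. A minimal sketch, assuming the same csv_lines RDD and the pyspark.sql.Row import:

```python
from pyspark.sql import Row

# Emit (name, 1) per record, then sum the ones per name. This counts
# without collecting each group into memory the way groupBy does.
counts = csv_lines.map(lambda line: (line.split(',')[0], 1)) \
                  .reduceByKey(lambda a, b: a + b)
new_rows = counts.map(lambda x: Row(name=x[0], count=x[1]))
new_rows.collect()
```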
In [11]:
rows.toDF().select("name","count").show()

Py4JJavaError                             Traceback (most recent call last)
/home/vagrant/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     62     try:
---> 63         return f(*a, **kw)
     64     except py4j.protocol.Py4JJavaError as e:

/home/vagrant/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 "An error occurred while calling {0}{1}{2}.\n".
--> 319                 format(target_id, ".", name), value)
    320             else:

Py4JJavaError: An error occurred while calling o181.select.
: org.apache.spark.sql.AnalysisException: cannot resolve '`count`' given input columns: [company, name, title];;
'Project [name#1, 'count]
+- LogicalRDD [company#0, name#1, title#2]

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:293)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:293)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2884)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1150)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

AnalysisException Traceback (most recent call last)

in
----> 1 rows.toDF().select("name","count").show()

/home/vagrant/spark/python/pyspark/sql/dataframe.py in select(self, *cols)
   1038         [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
   1039         """
-> 1040         jdf = self._jdf.select(self._jcols(*cols))
   1041         return DataFrame(jdf, self.sql_ctx)
   1042

/home/vagrant/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/home/vagrant/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68         if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69             raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70         if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71             raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: "cannot resolve '`count`' given input columns: [company, name, title];;\n'Project [name#1, 'count]\n+- LogicalRDD [company#0, name#1, title#2]\n"

===================

**Loading and Inspecting Parquet Files**

Using the SparkSession to load files as DataFrames and inspecting their contents...

In [19]:

# Load the parquet file containing flight delay records
on_time_dataframe = spark.read.parquet('../data/on_time_performance.parquet')

# Register the data for Spark SQL
on_time_dataframe.registerTempTable("on_time_performance")

# Check out the columns
on_time_dataframe.columns

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/home/vagrant/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     62     try:
---> 63         return f(*a, **kw)
     64     except py4j.protocol.Py4JJavaError as e:

/home/vagrant/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 "An error occurred while calling {0}{1}{2}.\n".
--> 319                 format(target_id, ".", name), value)
    320             else:

Py4JJavaError: An error occurred while calling o385.parquet.
: org.apache.spark.sql.AnalysisException: Path does not exist: file:/home/vagrant/Agile_Data_Code_2/data/on_time_performance.parquet;
at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:626)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:350)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:350)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:349)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:559)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
in
      1 # Load the parquet file containing flight delay records
----> 2 on_time_dataframe = spark.read.parquet('../data/on_time_performance.parquet')
      3
      4 # Register the data for Spark SQL
      5 on_time_dataframe.registerTempTable("on_time_performance")

/home/vagrant/spark/python/pyspark/sql/readwriter.py in parquet(self, *paths)
    289         [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
    290         """
--> 291         return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
    292
    293     @ignore_unicode_prefix

/home/vagrant/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/home/vagrant/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68         if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69             raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70         if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71             raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: 'Path does not exist: file:/home/vagrant/Agile_Data_Code_2/data/on_time_performance.parquet;'
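The two failures above have different causes. The select error occurs because rows (built earlier from the raw CSV) produces a DataFrame with only company, name, and title columns, so no count column exists there; the DataFrame built from new_rows is the one with name and count. The parquet error is simpler: the input file has not been downloaded, and Spark raises AnalysisException naming the unresolved path. A minimal defensive sketch for the latter, assuming the notebook's existing spark session (the guard itself is not from the book's code):

```python
import os

# Spark raises AnalysisException ("Path does not exist") when the data
# has not been fetched yet, so check for the file before reading it.
parquet_path = '../data/on_time_performance.parquet'
if os.path.exists(parquet_path):
    on_time_dataframe = spark.read.parquet(parquet_path)
    on_time_dataframe.registerTempTable("on_time_performance")
    print(on_time_dataframe.columns)
else:
    print('Missing %s; run the repository data download script first.' % parquet_path)
```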
rjurney commented 5 years ago

@geopolitis Looking at this now...

rjurney commented 5 years ago

I pushed a fix; the count/title column name wasn't being referenced correctly.
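Presumably the fix amounts to selecting from the DataFrame that actually carries a count column, i.e. the one built from new_rows rather than from the original three-column rows RDD. A sketch of that corrected call, assuming the In [10] variables:

```python
# new_rows holds Row(name=..., count=...) records, so its DataFrame
# exposes exactly the two columns the select refers to.
new_rows.toDF().select("name", "count").show()
```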

rjurney commented 5 years ago

Still fixing...

rjurney commented 5 years ago

@geopolitis This is fixed.

geopolitis commented 4 years ago

@rjurney a few of the issues have been fixed; however, it seems the issue below persists, and a few steps later the file /data/simple_flight_delay_features.jsonl.bz2 is missing in "Building a Classifier Model to Predict Flight Delays: Loading Our Data".

Could you help?

rows.toDF().select("name","count").show()

Py4JJavaError                             Traceback (most recent call last)
/home/vagrant/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     62     try:
---> 63         return f(*a, **kw)
     64     except py4j.protocol.Py4JJavaError as e:

/home/vagrant/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 "An error occurred while calling {0}{1}{2}.\n".
--> 319                 format(target_id, ".", name), value)
    320             else:

Py4JJavaError: An error occurred while calling o181.select.
: org.apache.spark.sql.AnalysisException: cannot resolve '`count`' given input columns: [company, name, title];;
'Project [name#1, 'count]
+- LogicalRDD [company#0, name#1, title#2]

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:293)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:293)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2884)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1150)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

AnalysisException Traceback (most recent call last)

in
----> 1 rows.toDF().select("name","count").show()

/home/vagrant/spark/python/pyspark/sql/dataframe.py in select(self, *cols)
   1038         [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
   1039         """
-> 1040         jdf = self._jdf.select(self._jcols(*cols))
   1041         return DataFrame(jdf, self.sql_ctx)
   1042

/home/vagrant/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/home/vagrant/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68         if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69             raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70         if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71             raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: "cannot resolve '`count`' given input columns: [company, name, title];;\n'Project [name#1, 'count]\n+- LogicalRDD [company#0, name#1, title#2]\n"
geopolitis commented 4 years ago

@rjurney regarding the missing /data/simple_flight_delay_features.jsonl.bz2 file in "Building a Classifier Model to Predict Flight Delays: Loading Our Data":

I found a solution. The script in Chapter 8 (ch08/download_data.sh) downloads those files. However, you need them in Chapter 2.
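Once the file is in place, loading it from the Chapter 2 notebook could look like the sketch below; the relative path is an assumption, and Spark reads bzip2-compressed JSON Lines without any manual decompression:

```python
# Spark decompresses .bz2 JSON Lines on the fly when reading.
features = spark.read.json('../data/simple_flight_delay_features.jsonl.bz2')
features.show(5)
```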

rjurney commented 4 years ago

I fixed the download issue in https://github.com/rjurney/Agile_Data_Code_2/commit/1a55d930d3a12c6259673ce92706ccd7c4a0d281

Run git pull origin master and you should get the update.

rjurney commented 4 years ago

I'm looking at the rest...

rjurney commented 2 years ago

@geopolitis I fixed all of these to make the class work in 2021.