rjurney / Agile_Data_Code_2

Code for Agile Data Science 2.0, O'Reilly 2017, Second Edition
http://bit.ly/agile_data_science
MIT License

When trying to run ch02/pyspark_elasticsearch.py on the Vagrant image, I get an error, and a slightly different error on EC2 #57

Closed: markucsb closed this issue 6 years ago

markucsb commented 6 years ago

>>> csv_lines = sc.textFile("data/example.csv")
>>> data = csv_lines.map(lambda line: line.split(","))
>>> schema_data = data.map(lambda x: ('key', {'name': x[0], 'company': x[1], 'title': x[2]}))
>>> schema_data.saveAsNewAPIHadoopFile(
...     path='-',
...     outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
...     keyClass="org.apache.hadoop.io.NullWritable",
...     valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
...     conf={ "es.resource" : "agile_data_science/executives" })
Traceback (most recent call last):
  File "<stdin>", line 6, in <module>
  File "/home/vagrant/spark/python/pyspark/rdd.py", line 1421, in saveAsNewAPIHadoopFile
    keyConverter, valueConverter, jconf)
  File "/home/vagrant/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/vagrant/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/vagrant/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: java.lang.NoClassDefFoundError: org/apache/commons/httpclient/URIException
        at org.elasticsearch.hadoop.cfg.Settings.getProperty(Settings.java:550)
        at org.elasticsearch.hadoop.cfg.Settings.getResourceWrite(Settings.java:513)
        at org.elasticsearch.hadoop.mr.EsOutputFormat.init(EsOutputFormat.java:257)
        at org.elasticsearch.hadoop.mr.EsOutputFormat.checkOutputSpecs(EsOutputFormat.java:233)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1099)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1005)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:996)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:996)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:996)
        at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:834)
        at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.httpclient.URIException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 31 more
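
For context: the NoClassDefFoundError names org/apache/commons/httpclient/URIException, a class from Apache Commons HttpClient 3.x, which elasticsearch-hadoop uses for its REST transport but which is not on Spark's classpath here. A minimal sketch of a launch that supplies it, assuming commons-httpclient-3.1.jar has been downloaded into ../lib/ (a fix further down this thread shows where to get it) and using the elasticsearch-spark jar name from this repo:

# Sketch only: supply the ES and HttpClient jars on both the executor
# (--jars) and driver (--driver-class-path) classpaths.
pyspark \
  --jars ../lib/elasticsearch-spark-20_2.10-5.5.1.jar,../lib/commons-httpclient-3.1.jar \
  --driver-class-path ../lib/elasticsearch-spark-20_2.10-5.5.1.jar:../lib/commons-httpclient-3.1.jar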

pdawczak commented 6 years ago

Similar here: I was able to execute simple queries against my Elasticsearch directly, but PySpark fails to do so.

  1. I cd'd into the ch02 dir and launched pyspark with:
PYSPARK_DRIVER_PYTHON=ipython pyspark --jars ../lib/mongo-hadoop-spark-2.0.2.jar,../lib/mongo-java-driver-3.4.2.jar,../lib/mongo-hadoop-2.0.2.jar --driver-class-path ../lib/mongo-hadoop-spark-2.0.2.jar:../lib/mongo-java-driver-3.4.2.jar:../lib/mongo-hadoop-2.0.2.jar
  2. In there, I ran:
>>> exec(open("pyspark_elasticsearch.py").read())
  3. which resulted in this error:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<string>", line 10, in <module>
  File "/home/vagrant/spark/python/pyspark/rdd.py", line 1421, in saveAsNewAPIHadoopFile
    keyConverter, valueConverter, jconf)
  File "/home/vagrant/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/vagrant/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/vagrant/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: java.lang.ClassNotFoundException: org.elasticsearch.hadoop.mr.LinkedMapWritable
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.util.Utils$.classForName(Utils.scala:229)
        at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes$1$$anonfun$apply$14.apply(PythonRDD.scala:738)
        at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes$1$$anonfun$apply$14.apply(PythonRDD.scala:737)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes$1.apply(PythonRDD.scala:737)
        at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes$1.apply(PythonRDD.scala:736)
        at scala.Option.flatMap(Option.scala:171)
        at org.apache.spark.api.python.PythonRDD$.getKeyValueTypes(PythonRDD.scala:736)
        at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:828)
        at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)

I'm afraid the error message doesn't tell me much...
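
For what it's worth, the ClassNotFoundException here names org.elasticsearch.hadoop.mr.LinkedMapWritable, which ships in the elasticsearch-hadoop (elasticsearch-spark) jar, and the pyspark command above only puts the three MongoDB jars on the classpath. A minimal sketch of a corrected launch, assuming the elasticsearch-spark-20_2.10-5.5.1.jar mentioned later in this thread is present in ../lib/:

# Sketch only: same launch as above, with the elasticsearch-spark jar added
# to both the executor and driver classpaths.
PYSPARK_DRIVER_PYTHON=ipython pyspark \
  --jars ../lib/mongo-hadoop-spark-2.0.2.jar,../lib/mongo-java-driver-3.4.2.jar,../lib/mongo-hadoop-2.0.2.jar,../lib/elasticsearch-spark-20_2.10-5.5.1.jar \
  --driver-class-path ../lib/mongo-hadoop-spark-2.0.2.jar:../lib/mongo-java-driver-3.4.2.jar:../lib/mongo-hadoop-2.0.2.jar:../lib/elasticsearch-spark-20_2.10-5.5.1.jar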

markucsb commented 6 years ago

One thing I noticed is that the Vagrant provisioning has errors in several places, including the download and installation of both Kafka and Hadoop: it references versions that are no longer available at the download URLs.
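
The usual cause of this is that the main Apache download mirrors only keep the most recent releases, while every old release stays available on the archive mirror. A minimal sketch of pinning a Kafka download there; the version numbers below are placeholders, not necessarily what the provisioning script expects:

# Placeholders: substitute whatever versions the provisioning script uses.
KAFKA_VERSION=0.10.2.0
SCALA_VERSION=2.11
curl -sLO "https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
# Old Hadoop releases are archived the same way, under
# https://archive.apache.org/dist/hadoop/common/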

pdawczak commented 6 years ago

Thanks @markucsb for your comment - do you have any suggestions, or are you aware of how that could be fixed? I've stopped reading the book, as I'm unable to reproduce these simple examples...

markucsb commented 6 years ago

I did not mean to close the issue. I do not have any suggestions at this point. I've now done the EC2 instance setup and am hitting another issue there. I am also blocked by this and cannot move forward in the book, which is very frustrating.

>>> schema_data.saveAsNewAPIHadoopFile(
...     path='-',
...     outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
...     keyClass="org.apache.hadoop.io.NullWritable",
...     valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
...     conf={ "es.resource" : "agile_data_science/executives" })
Traceback (most recent call last):
  File "<stdin>", line 6, in <module>
  File "/home/ubuntu/spark/python/pyspark/rdd.py", line 1421, in saveAsNewAPIHadoopFile
    keyConverter, valueConverter, jconf)
  File "/home/ubuntu/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/ubuntu/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/ubuntu/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: java.lang.NoClassDefFoundError: org/apache/commons/httpclient/protocol/ProtocolSocketFactory
        at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransportFactory.create(CommonsHttpTransportFactory.java:39)
        at org.elasticsearch.hadoop.rest.NetworkClient.selectNextNode(NetworkClient.java:99)
        at org.elasticsearch.hadoop.rest.NetworkClient.<init>(NetworkClient.java:82)
        at org.elasticsearch.hadoop.rest.NetworkClient.<init>(NetworkClient.java:59)
        at org.elasticsearch.hadoop.rest.RestClient.<init>(RestClient.java:92)
        at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:284)
        at org.elasticsearch.hadoop.mr.EsOutputFormat.init(EsOutputFormat.java:260)
        at org.elasticsearch.hadoop.mr.EsOutputFormat.checkOutputSpecs(EsOutputFormat.java:233)
        at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:76)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1003)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:994)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:994)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:994)
        at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:839)
        at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.httpclient.protocol.ProtocolSocketFactory
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 36 more

markucsb commented 6 years ago

I was able to get it to work on the EC2 instance by downloading http://central.maven.org/maven2/commons-httpclient/commons-httpclient/3.1/commons-httpclient-3.1.jar into ~/Agile_Data_Code_2/lib/ and adding it to the end of this line in /home/vagrant/spark/conf/spark-defaults.conf:

spark.jars /home/ubuntu/Agile_Data_Code_2/lib/mongo-hadoop-spark-2.0.2.jar,/home/ubuntu/Agile_Data_Code_2/lib/mongo-java-driver-3.4.2.jar,/home/ubuntu/Agile_Data_Code_2/lib/mongo-hadoop-2.0.2.jar,/home/ubuntu/Agile_Data_Code_2/lib/elasticsearch-spark-20_2.10-5.5.1.jar,/home/ubuntu/Agile_Data_Code_2/lib/snappy-java-1.1.2.6.jar,/home/ubuntu/Agile_Data_Code_2/lib/lzo-hadoop-1.0.5.jar
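
Spelled out as commands, that fix is roughly the following (a sketch based on the description above; the paths assume the EC2 layout used in this thread):

# Download the missing Commons HttpClient 3.1 jar into the repo's lib/ dir.
cd ~/Agile_Data_Code_2/lib
wget http://central.maven.org/maven2/commons-httpclient/commons-httpclient/3.1/commons-httpclient-3.1.jar
# Then append ,/home/ubuntu/Agile_Data_Code_2/lib/commons-httpclient-3.1.jar
# to the end of the spark.jars line in spark-defaults.conf and relaunch pyspark.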

rjurney commented 6 years ago

@markucsb @pdawczak Sorry for the problems. The Vagrant image is deprecated; I can't manage to support two environments, and trying to do so was a mistake. Use the EC2 scripts, and I will fix any issues you find there. The EC2 instance I use costs less than the book if you stop it when you aren't using it. Please bear with me!
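
For anyone new to EC2: a stopped instance does not accrue compute charges. With the AWS CLI, stopping and restarting looks like this (the instance ID is a placeholder):

aws ec2 stop-instances --instance-ids i-0123456789abcdef0
# ...and when you come back:
aws ec2 start-instances --instance-ids i-0123456789abcdef0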

rjurney commented 6 years ago

@markucsb I think I resolved this in https://github.com/rjurney/Agile_Data_Code_2/commit/a596fc6091579268d026df149990245b0fc0e322 by following your instructions. Would you mind testing things out?

pdawczak commented 6 years ago

Thanks @rjurney for your response. It's a shame; I was counting on being able to set up a local environment... I commute roughly three hours to work by train, and being able to do the code exercises on my own machine would let me progress through the book.

I appreciate that maintaining the Vagrant setup is difficult for you, but would you be able to point me to some resources (e.g. which versions of the tools to use, how to manage Vagrant) so I could investigate myself?

Thanks!

markucsb commented 6 years ago

@rjurney it looks like the EC2 bootstrap has the same issue with the Kafka version; it is not installed properly. Can you update that before I test the other changes? Airflow is also not installed.

rjurney commented 6 years ago

Thanks for letting me know. I'll take a look at it tonight.

rjurney commented 6 years ago

I hate to do this, but Vagrant has been deprecated. I can't support two systems with my limited time. Use EC2. :(