ohonchar opened this issue 6 years ago
I have tested this and I am also receiving the same error. This is almost certainly an issue with how HDFS is set up. I think the best course of action is for us to review how we're instructing people to use a test HDFS and revise the instructions.
I have a feeling this would work on a system with a properly configured HDFS.
I think the port in the notebook might be wrong as well. You can see the datanode address and port on the web page served by the HDFS namenode; by default this is 50010, so you might try that as well.
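For example, here is a minimal sketch (assuming the namenode web UI is on the default port 50070 and the standard JMX endpoint is exposed; the host below is just my namenode's address) that asks the namenode for its live datanodes and prints each one's transfer address:

import json
try:
    from urllib.request import urlopen   # Python 3
except ImportError:
    from urllib2 import urlopen          # Python 2

# assumption: namenode web UI on the default port 50070; swap in your namenode host
url = "http://10.0.1.114:50070/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo"
bean = json.loads(urlopen(url).read().decode("utf-8"))["beans"][0]

# "LiveNodes" is itself a JSON string mapping each datanode to its details,
# including "xferaddr", the host:port the datanode serves blocks on (50010 by default)
for node, details in json.loads(bean["LiveNodes"]).items():
    print("%s -> %s" % (node, details.get("xferaddr")))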
I am able to access the HDFS store, but I see this error when reading the file:
Py4JJavaErrorTraceback (most recent call last)
<ipython-input-3-7ae5f9e1f57a> in <module>()
1 import os
2 text_file = spc.textFile("hdfs://%s:%d%s" % (hdfs_host, hdfs_port, os.path.join("/", hdfs_path)))
----> 3 counts = text_file.flatMap(lambda line: line.split(" ")) .map(lambda word: (word, 1)) .reduceByKey(lambda a, b: a + b)
4 values = counts.collect()
5 if len(values) > 20:
/opt/spark/python/pyspark/rdd.pyc in reduceByKey(self, func, numPartitions, partitionFunc)
1606 [('a', 2), ('b', 1)]
1607 """
-> 1608 return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)
1609
1610 def reduceByKeyLocally(self, func):
/opt/spark/python/pyspark/rdd.pyc in combineByKey(self, createCombiner, mergeValue, mergeCombiners, numPartitions, partitionFunc)
1832 """
1833 if numPartitions is None:
-> 1834 numPartitions = self._defaultReducePartitions()
1835
1836 serializer = self.ctx.serializer
/opt/spark/python/pyspark/rdd.pyc in _defaultReducePartitions(self)
2242 return self.ctx.defaultParallelism
2243 else:
-> 2244 return self.getNumPartitions()
2245
2246 def lookup(self, key):
/opt/spark/python/pyspark/rdd.pyc in getNumPartitions(self)
2438
2439 def getNumPartitions(self):
-> 2440 return self._prev_jrdd.partitions().size()
2441
2442 @property
/opt/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/opt/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o21.partitions.
: java.io.EOFException: End of File Exception between local host is: "base-notebook-1-s5wwq/172.17.0.5"; destination host is: "10.0.1.114":50010; : java.io.EOFException; For more details see: http://wiki.apache.org/hadoop/EOFException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:765)
at org.apache.hadoop.ipc.Client.call(Client.java:1479)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy21.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy22.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57)
at org.apache.hadoop.fs.Globber.glob(Globber.java:252)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1676)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:259)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:61)
at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1084)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:979)
OK, I got it working, but I had to do a couple of things. I changed my etc/hadoop/core-site.xml config to this:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://10.0.1.114:9000</value>
</property>
</configuration>
and then used 9000 as the connection port. In your case you will need to set the core-site.xml to this, probably:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://10.8.251.197:9000</value>
</property>
</configuration>
and then use port 9000 to connect, although I'm guessing you could change that to 8020 in the config file.
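With that in place, the connection cell from the notebook should work using port 9000; roughly something like this (the host, port, and path are just the values from my setup, so adjust them to yours):

import os
from pyspark import SparkContext

hdfs_host = "10.0.1.114"     # namenode address from my setup; use your own
hdfs_port = 9000             # must match the port in fs.defaultFS
hdfs_path = "user/nbuser/input"

spc = SparkContext.getOrCreate()
text_file = spc.textFile("hdfs://%s:%d%s" % (hdfs_host, hdfs_port,
                                             os.path.join("/", hdfs_path)))
counts = (text_file.flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
print(counts.take(5))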
We need to revisit that tutorial and make it clearer. Thanks for bringing this up!
While performing the steps in the tutorial https://radanalytics.io/examples/pyspark_hdfs_notebook, I created an instance with Hadoop and configured a Hadoop single node as specified here: https://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-common/SingleCluster.html. I forwarded ports 50070 and 8020.
I deployed the Jupyter notebook http://base-notebook-ohonchar.10.19.47.76.nip.io/notebooks/PySpark_HDFS.ipynb (password: supersecret) in OpenShift according to the tutorial.
As a result I can access the web UI at http://10.8.251.197:50070/, but unfortunately I cannot access hdfs://10.8.251.197:8020/user/nbuser/input. In the Jupyter notebook it fails with a "Connection refused" error.
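For reference, a quick way to check from inside the notebook whether that port is reachable at all is a plain socket connect; the host and port below are just the values from the URL above:

import socket

host, port = "10.8.251.197", 8020    # taken from the hdfs URL above; adjust as needed
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
try:
    sock.connect((host, port))
    print("%s:%d is reachable" % (host, port))
except socket.error as exc:
    print("cannot reach %s:%d -> %s" % (host, port, exc))
finally:
    sock.close()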
edit by elmiko: removed sensitive info