cerndb / dist-keras

Distributed Deep Learning, with a focus on distributed training, using Keras and Apache Spark.
http://joerihermans.com/work/distributed-keras/
GNU General Public License v3.0

When I run transformer, reported ImportError: No module named distkeras.utils #36

Closed (brucebismarck closed this issue 6 years ago)

brucebismarck commented 6 years ago

I am running a transformer on the MNIST data set.

from pyspark.sql.types import IntegerType
changedTypedf = train_df.withColumn("label_int", train_df["label"].cast("integer")).select('features', 'label_int')
changedTypedf.printSchema()
encoder = OneHotTransformer(nb_classes, input_col="label_int", output_col="label_encoded")
encoder.transform(changedTypedf)

This is the code I want to run. The schema is:

root
 |-- features: vector (nullable = true)
 |-- label_int: integer (nullable = true)

However, it reported this error:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-4755371756168234437.py", line 367, in
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-4755371756168234437.py", line 360, in
    exec(code, _zcUserQueryNameSpace)
  File "", line 5, in
  File "/usr/local/lib/python2.7/site-packages/distkeras/transformers.py", line 299, in transform
    return dataframe.rdd.map(self._transform).toDF()
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 57, in toDF
    return sparkSession.createDataFrame(self, schema, sampleRatio)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 535, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 375, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 346, in _inferSchema
    first = rdd.first()
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1361, in first
    rs = self.take(1)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1343, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/usr/lib/spark/python/pyspark/context.py", line 992, in runJob
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 26.0 failed 4 times, most recent failure: Lost task 0.3 in stage 26.0 (TID 67, ip-172-31-22-245.ec2.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt1/yarn/usercache/zeppelin/appcache/application_1507639434255_0007/container_1507639434255_0007_01_000003/pyspark.zip/pyspark/worker.py", line 166, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/mnt1/yarn/usercache/zeppelin/appcache/application_1507639434255_0007/container_1507639434255_0007_01_000003/pyspark.zip/pyspark/worker.py", line 55, in read_command
    command = serializer._read_with_length(file)
  File "/mnt1/yarn/usercache/zeppelin/appcache/application_1507639434255_0007/container_1507639434255_0007_01_000003/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/mnt1/yarn/usercache/zeppelin/appcache/application_1507639434255_0007/container_1507639434255_0007_01_000003/pyspark.zip/pyspark/serializers.py", line 454, in loads
    return pickle.loads(obj)
ImportError: No module named distkeras.utils
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:446)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt1/yarn/usercache/zeppelin/appcache/application_1507639434255_0007/container_1507639434255_0007_01_000003/pyspark.zip/pyspark/worker.py", line 166, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/mnt1/yarn/usercache/zeppelin/appcache/application_1507639434255_0007/container_1507639434255_0007_01_000003/pyspark.zip/pyspark/worker.py", line 55, in read_command
    command = serializer._read_with_length(file)
  File "/mnt1/yarn/usercache/zeppelin/appcache/application_1507639434255_0007/container_1507639434255_0007_01_000003/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/mnt1/yarn/usercache/zeppelin/appcache/application_1507639434255_0007/container_1507639434255_0007_01_000003/pyspark.zip/pyspark/serializers.py", line 454, in loads
    return pickle.loads(obj)
ImportError: No module named distkeras.utils
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

Any hints for me? How can I solve it?

JoeriHermans commented 6 years ago

Are you sure dist-keras is installed on all cluster machines?

git clone https://github.com/JoeriHermans/dist-keras
cd dist-keras
pip install -e .

Joeri
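One way to check whether the package is importable on the executors, and not only on the driver, is to run a small probe job. The following is a minimal sketch, not part of dist-keras: `probe_module`, `probe_executors`, and `num_tasks` are hypothetical names, and `sc` is assumed to be an existing SparkContext. It assumes the functions are defined in the notebook or driver script itself, so that Spark can ship them to the workers.

```python
import importlib
import socket


def probe_module(module_name):
    """Try to import module_name; return (hostname, importable)."""
    try:
        importlib.import_module(module_name)
        return (socket.gethostname(), True)
    except ImportError:
        return (socket.gethostname(), False)


def probe_executors(sc, module_name, num_tasks=32):
    """Run probe_module in num_tasks Spark tasks and collect the distinct results.

    sc is an existing SparkContext (an assumption of this sketch). Tasks are
    not guaranteed to land on every worker, so a clean result is a hint,
    not a proof.
    """
    return (sc.parallelize(range(num_tasks), num_tasks)
              .map(lambda _: probe_module(module_name))
              .distinct()
              .collect())
```

Calling `probe_executors(sc, "distkeras.utils")` would return a list of `(hostname, importable)` pairs. Any pair with `False` points at a host where the import fails.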

brucebismarck commented 6 years ago

Hi Joeri,

Thanks a lot!

I know what happened!

JoeriHermans commented 6 years ago

No problem, if you run into other issues you can just open a new issue :)

Joeri

Plamen8780 commented 6 years ago

@brucebismarck, I get the exact same error. How did you solve it on your end?

@JoeriHermans, do I also need to install dist-keras on the worker nodes? The readme only says to run pip install --upgrade dist-keras, so I did that on the node where I start pyspark.

Plamen8780 commented 6 years ago

It seems that dist-keras also needs to be installed on the worker nodes.

A quick solution that worked for me was adding the following line: sc.addPyFile('distkeras.zip') where distkeras.zip contains the archived contents of https://github.com/cerndb/dist-keras/tree/master/distkeras
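For a zip passed to sc.addPyFile() to be importable as `distkeras`, the archive most likely needs the `distkeras` folder itself as its top-level entry. The sketch below shows one way to build such an archive with the standard library. `build_package_zip` and its parameters are hypothetical names chosen for illustration, and the paths in the usage note are assumptions about where the repository was cloned.

```python
import os
import shutil


def build_package_zip(package_dir, output_base):
    """Zip package_dir so that its folder name is the top-level entry.

    package_dir: path to the package folder, e.g. ".../dist-keras/distkeras"
    output_base: archive path without the ".zip" suffix
    Returns the path of the created archive.
    """
    package_dir = os.path.abspath(package_dir)
    parent, name = os.path.split(package_dir)
    # root_dir=parent and base_dir=name store entries as "<name>/...".
    return shutil.make_archive(output_base, "zip", root_dir=parent, base_dir=name)
```

With a clone in the current directory, `sc.addPyFile(build_package_zip("dist-keras/distkeras", "distkeras"))` would ship the archive to the executors. This only distributes the distkeras code itself; its dependencies still have to be available on the workers.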

nadamaguid commented 6 years ago

I'm having the same problem. I made sure that dist-keras is installed on all nodes, and I still get the utils error. I am using an EMR cluster with the latest version, and I am trying to run the MNIST example. I tried the suggestion by @Plamen8780 of adding the sc.addPyFile() command, and then I got another error, also an ImportError:

ImportError: No module named keras
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
    at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

What am I missing?

reedv commented 6 years ago

@Plamen8780 Could you elaborate a bit more on what you did? I downloaded the dist-keras repo and ran zip distkeras.zip ./dist-keras/distkeras/* then used

sc = SparkSession.builder.config(conf=conf) \
            .appName(application_name) \
            .getOrCreate()
sc.sparkContext.addPyFile("/home/me/Downloads/distkeras.zip")

in my code, but I am still getting ImportError: No module named distkeras.utils. I have never used pyspark before; is there something else that needs to be done to integrate your workaround?
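A possible cause is the layout of the archive. The command zip distkeras.zip ./dist-keras/distkeras/* stores each file under the path it was given, so entries end up as dist-keras/distkeras/... and the archive root holds `dist-keras` rather than `distkeras`. Python resolves `import distkeras` from a zip by looking for the package at the archive root. The following sketch checks an existing archive for that layout. `has_top_level_package` is a hypothetical helper name, and the check assumes the package ships an `__init__.py`.

```python
import zipfile


def has_top_level_package(zip_path, package_name):
    """Return True if the archive stores "<package_name>/__init__.py" at its root."""
    with zipfile.ZipFile(zip_path) as archive:
        return package_name + "/__init__.py" in archive.namelist()
```

If `has_top_level_package("/home/me/Downloads/distkeras.zip", "distkeras")` returns `False`, rebuilding the archive from inside the dist-keras directory, so that `distkeras/` is the first path component, is worth trying.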