maxpumperla / elephas

Distributed Deep learning with Keras & Spark
http://maxpumperla.com/elephas/
MIT License

TypeError: can't pickle _thread.lock objects #146

Closed: datistiquo closed this issue 3 years ago

datistiquo commented 5 years ago

Similar to the issue here, but different I think: https://github.com/maxpumperla/elephas/issues/82

It happens with the MNIST example (not the Jupyter example), specifically with the code below:


import sys
import os
os.environ['JAVA_HOME'] = os.getenv("JAVA_HOME")
print(os.getenv("JAVA_HOME"))

import findspark
findspark.init()

#import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import rand
from pyspark.mllib.evaluation import MulticlassMetrics

# Keras / Deep Learning
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras import optimizers, regularizers
from keras.optimizers import Adam

if __name__ == '__main__':

    from pyspark import SparkContext, SparkConf
    conf = SparkConf().setAppName('MNIST').setMaster('local[8]')
    sc = SparkContext(conf=conf)  # create the SparkContext used by to_simple_rdd below

    from keras.datasets import mnist
    from keras.utils import np_utils

    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = x_train.reshape(60000, 784)
    x_test = x_test.reshape(10000, 784)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255
    x_test /= 255
    nb_classes = 10
    y_train = np_utils.to_categorical(y_train, nb_classes)

    from keras.models import Sequential
    from keras.layers.core import Dense, Dropout, Activation
    from keras.optimizers import SGD

    model = Sequential()
    model.add(Dense(128, input_dim=784))
    model.add(Activation('relu'))
    model.add(Dropout(0.2))
    model.add(Dense(128))
    model.add(Activation('relu'))
    model.add(Dropout(0.2))
    model.add(Dense(10))
    model.add(Activation('softmax'))
    model.compile(loss='categorical_crossentropy', optimizer=SGD())

    from elephas.utils.rdd_utils import to_simple_rdd
    rdd = to_simple_rdd(sc, x_train, y_train)

    from elephas.spark_model import SparkModel

    spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
    spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1)

I get an analogous error:


C:\Java\Java8
Using TensorFlow backend.
19/10/09 15:27:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
WARNING
2019-10-09 15:27:12.161649: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2
>>> Fit model
Traceback (most recent call last):
  File "pyspark.py", line 105, in <module>
    spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1)
  File "C:\Users\\Anaconda3\lib\site-packages\elephas\spark_model.py", line 151, in fit
    self._fit(rdd, epochs, batch_size, verbose, validation_split)
  File "C:\Users\\Anaconda3\lib\site-packages\elephas\spark_model.py", line 163, in _fit
    self.start_server()
  File "C:\Users\\Anaconda3\lib\site-packages\elephas\spark_model.py", line 118, in start_server
    self.parameter_server.start()
  File "C:\Users\\Anaconda3\lib\site-packages\elephas\parameter\server.py", line 85, in start
    self.server.start()
  File "C:\Users\\Anaconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Users\\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Users\\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
C:\Java\Java8
SUCCESS: The process with PID 8104 (child process of PID 11096) has been terminated.
SUCCESS: The process with PID 11096 (child process of PID 14268) has been terminated.
SUCCESS: The process with PID 14268 (child process of PID 3964) has been terminated.

And afterwards:


Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\\Anaconda3\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "C:\Users\\Anaconda3\lib\multiprocessing\spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input
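
The two tracebacks are two halves of the same failure: on Windows, multiprocessing uses the spawn start method, so starting the parameter server pickles the process object; pickling fails in the parent because the server holds a threading lock (the TypeError), and the spawned child then receives no data and dies with the EOFError. A minimal sketch, not elephas code (the class and method names here are made up), that reproduces the same pair of errors:


import multiprocessing as mp
import threading

class ParameterServer:
    # Stand-in for a parameter server: it holds an unpicklable lock.
    def __init__(self):
        self.lock = threading.Lock()

    def run(self):
        pass

if __name__ == '__main__':
    # 'spawn' is the default on Windows; 'fork' (the Linux default) copies the
    # parent's memory and never needs to pickle the server object.
    mp.set_start_method('spawn', force=True)
    server = ParameterServer()
    # The bound method drags `server` (and its lock) into the pickle.
    p = mp.Process(target=server.run)
    p.start()  # parent: TypeError about _thread.lock; child: EOFError: Ran out of input
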
AjayJadhavDS commented 4 years ago

Any update on this issue? I am also facing the same problem.

Running on Windows 10, Python 3.7.

Error:

Fit model
WARNING
Using TensorFlow backend.

Traceback (most recent call last):
  File "Elephas.py", line 69, in <module>
    Temp()
  File "Elephas.py", line 62, in Temp
    spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, verbose=0, validation_split=0.1)
  File "C:\Users\ajadhav\AppData\Local\Continuum\anaconda3\lib\site-packages\elephas-0.4.3-py3.7.egg\elephas\spark_model.py", line 151, in fit
    self._fit(rdd, epochs, batch_size, verbose, validation_split)
  File "C:\Users\ajadhav\AppData\Local\Continuum\anaconda3\lib\site-packages\elephas-0.4.3-py3.7.egg\elephas\spark_model.py", line 163, in _fit
    self.start_server()
  File "C:\Users\ajadhav\AppData\Local\Continuum\anaconda3\lib\site-packages\elephas-0.4.3-py3.7.egg\elephas\spark_model.py", line 118, in start_server
    self.parameter_server.start()
  File "C:\Users\ajadhav\AppData\Local\Continuum\anaconda3\lib\site-packages\elephas-0.4.3-py3.7.egg\elephas\parameter\server.py", line 85, in start
    self.server.start()
  File "C:\Users\ajadhav\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 112, in start
    self._popen = self._Popen(self)
  File "C:\Users\ajadhav\AppData\Local\Continuum\anaconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\ajadhav\AppData\Local\Continuum\anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Users\ajadhav\AppData\Local\Continuum\anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 89, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\ajadhav\AppData\Local\Continuum\anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\ajadhav\AppData\Local\Continuum\anaconda3\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "C:\Users\ajadhav\AppData\Local\Continuum\anaconda3\lib\multiprocessing\spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input

proloyghosh1234 commented 4 years ago

I am also facing the same issue.

danielenricocahall commented 3 years ago

This issue may be resolved in the latest 1.0.0 release - I believe it was a compatibility issue with the TensorFlow 2.x API. Please try it and let me know!

HungVS commented 3 years ago

@danielenricocahall I tried with the latest 1.0.0 release and I'm still facing that issue. Could you give me some advice?

danielenricocahall commented 3 years ago

@CoderandGymnast I made some modifications to the imports - I was able to run the following successfully:


from pyspark.sql import SparkSession

if __name__ == '__main__':
    from pyspark import SparkContext, SparkConf

    conf = SparkConf().setAppName('MNIST').setMaster('local[8]')

    from tensorflow.keras.datasets import mnist
    from tensorflow.keras.utils import to_categorical

    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = x_train.reshape(60000, 784)
    x_test = x_test.reshape(10000, 784)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255
    x_test /= 255
    nb_classes = 10
    y_train = to_categorical(y_train, nb_classes)

    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense, Dropout, Activation
    from tensorflow.keras.optimizers import SGD

    model = Sequential()
    model.add(Dense(128, input_dim=784))
    model.add(Activation('relu'))
    model.add(Dropout(0.2))
    model.add(Dense(128))
    model.add(Activation('relu'))
    model.add(Dropout(0.2))
    model.add(Dense(10))
    model.add(Activation('softmax'))
    model.compile(loss='categorical_crossentropy', optimizer=SGD())

    from elephas.utils.rdd_utils import to_simple_rdd
    sc = SparkSession.builder.getOrCreate().sparkContext

    rdd = to_simple_rdd(sc, x_train, y_train)

    from elephas.spark_model import SparkModel

    spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
    spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1)

Can you try the above? I'm running tensorflow 2.1.3 and pyspark 3.0.1.
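
If it helps to compare environments, a minimal sanity check (it only prints the installed versions so you can compare them against the tensorflow 2.1.3 / pyspark 3.0.1 setup mentioned above):


import tensorflow as tf
import pyspark

# Print the installed versions to compare against the working configuration above.
print("tensorflow", tf.__version__)
print("pyspark", pyspark.__version__)
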

HungVS commented 3 years ago

@danielenricocahall

HungVS commented 3 years ago

@proloyghosh1234 @AjayJadhavDS @datistiquo Any update on this issue?

danielenricocahall commented 3 years ago

@CoderandGymnast just tried with a clean environment and the same configuration and it succeeded, so I'm not sure what the problem is :( Maybe it's an issue with running on a Windows machine? I may have to triage it further, but it does not appear to be reproducible on Linux machines.
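
For what it's worth, the platform difference being suggested here can be seen with Python's standard multiprocessing module (a minimal check, not elephas code):


import multiprocessing as mp

# 'fork' (the Linux default) shares the parent's memory, so the parameter server is
# never pickled; 'spawn' (the Windows default) must pickle the Process object and
# everything it references, which is where the _thread.lock error comes from.
print(mp.get_start_method())
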

HungVS commented 3 years ago

@danielenricocahall OK, I'll set up & try on a virtual Linux machine.

HungVS commented 3 years ago

@danielenricocahall I use WSL2 with OpenJDK 11.0.9.1 and everything works perfectly. Thank you so much.

danielenricocahall commented 3 years ago

@CoderandGymnast great! Glad to hear it :)