I think the first method is bad; I got a memory error. I will try the other method.
The first method is not feasible since the dataset is too big; VectorAssembler seems to run forever.
Hi Rayjang,
Sorry for not replying sooner. At the end of the week I'll have some time to look into this issue. I'll keep you posted if I come up with some other ideas.
Joeri
Update.
My image data's shape is (224, 224, 3) and the dataset has 800 samples in total.
raw_dataset_train = reader.read.format('com.databricks.spark.csv') \
    .options(header='false', inferSchema='true', maxColumns='1000000') \
    .load(path_train)
This code works (I changed maxColumns since my data has 224*224*3 columns),
but the code below doesn't; it seems to run forever:
vector_assembler = VectorAssembler(inputCols=features, outputCol="features")
If you have some time, please share any ideas or code for an efficient way of doing distributed deep learning on image data with this many dimensions.
pairs = [(x, y) for x, y in zip(features, labels)]
sc.parallelize(pairs)
I also tried this approach, but I got an 'out of memory' error.
Update: with
pairs = [(x, y) for x, y in zip(features, labels)]
sc.parallelize(pairs)
it is now working well (I didn't even allocate more driver memory). I am now trying to find a method to feed the numpy features and numpy labels directly into distributed training.
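For what it's worth, one possible sketch of going from the numpy arrays straight to a Spark DataFrame with one dense feature vector per row (the helper name and column names are illustrative; `features`, `labels`, and `sc` are assumed to be the arrays and SparkContext from above):

from pyspark.ml.linalg import DenseVector
from pyspark.sql import Row

def to_row(pair):
    # pair = (image ndarray, label); flatten the image into one dense vector.
    x, y = pair
    return Row(features=DenseVector(x.ravel().astype(float).tolist()), label=int(y))

pairs = [(x, y) for x, y in zip(features, labels)]
dataset_train = sc.parallelize(pairs).map(to_row).toDF()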
This method requires that all the pairs fit into memory, right? I find it really strange that VectorAssembler takes so much time...
Joeri
I think it was related to my driver memory. My research has some constraints on memory size, so I had limited the memory. After I increased the memory and added more machines, it worked well in a distributed way. (To sum up: the problem came from my memory constraint, but under that constraint there was no error, just a job that never finished.) I am still looking for a better way to train on my image (.png) files and labels, though; converting to CSV -> DataFrame -> Spark vector feels complicated in practice.
VectorAssembler is still slow. I use 1 master with 6 GB driver memory and 3 workers, each with 1 CPU core, 6 GB worker memory, and a Titan X GPU.
I don't know why VectorAssembler is so slow or takes so long (processing 800 rows of data seems to take forever; even 3 rows take a long time).
I am not sure whether this problem comes from a shortage of memory or cores, or from a fault in my code.
My CSV file's columns look like this...
Hmmm. What if you wrote a "custom" vector assembler? For example, something like this:
def to_vector(row):
    # Collect the values of the feature columns of one row into a single list.
    columns = ['your', 'columns', 'here']
    new_row = []
    for column in columns:
        new_row.append(row[column])
    return new_row
Of course, this snippet could be improved much more.
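For instance, applying such a function over the rows might look like this (just a sketch; the pyspark.ml DenseVector and the column names are assumptions, not anything prescribed by dist-keras):

from pyspark.ml.linalg import DenseVector
from pyspark.sql import Row

# Hypothetical feature column names; substitute the real pixel columns.
feature_columns = ['pixel_0', 'pixel_1', 'pixel_2']

def assemble(row):
    # Collect the selected columns of one row into a single dense vector.
    values = [float(row[c]) for c in feature_columns]
    return Row(features=DenseVector(values), label=row['label'])

dataset = df.rdd.map(assemble).toDF()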
Joeri
What I am doing now is trying to build the dense vector for 'feature' directly from the numpy array:
x_flatten = x_train.reshape((800, -1))
z_train = np.empty((800, 224*224*3))
for i in range(800):
    z_train[i, :] = x_flatten[i, :]

df = pd.DataFrame(columns=['feature', 'label'])
for i in range(800):
    df.set_value(i, 'feature', [z_train[i]])
    df.set_value(i, 'label', [y_train[i]])

df['feature'] = df['feature'].apply(lambda x: x[0])
df['label'] = df['label'].apply(lambda x: x[0])
df['feature'] = df['feature'].apply(lambda x: x.tolist())
df['feature'] = df['feature'].apply(lambda x: DenseVector(x))
dataset_train = reader.createDataFrame(df)
nb_classes = 100
encoder = OneHotTransformer(nb_classes, input_col="label", output_col="label_encoded")
dataset_train = encoder.transform(dataset_train)
dataset_train = dataset_train.select( "feature","label", "label_encoded")
reshape_transformer = ReshapeTransformer("feature", "matrix", (224, 224, 3))
dataset_train = reshape_transformer.transform(dataset_train)
dataset_train = dataset_train.select("feature", "matrix","label", "label_encoded")
dataset_train = dataset_train.repartition(num_workers)
trainer = DOWNPOUR(keras_model=model, worker_optimizer=sgd, loss='categorical_crossentropy', num_workers=1,
batch_size=4, communication_window=5, num_epoch=1,
features_col="feature", label_col="label_encoded")
trained_model = trainer.train(training_set)
print("Training time: " + str(trainer.get_training_time()))
This is the schema:
I got this error:
Traceback (most recent call last):
File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 148, in dump
return Pickler.dump(self, obj)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 408, in dump
self.save(obj)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 740, in save_tuple
save(element)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 445, in save_instancemethod
self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 582, in save_reduce
save(args)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 725, in save_tuple
save(element)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 520, in save
self.save_reduce(obj=obj, *rv)
File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 810, in save_dict
self._batch_setitems(obj.items())
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 836, in _batch_setitems
save(v)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 520, in save
self.save_reduce(obj=obj, *rv)
File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 810, in save_dict
self._batch_setitems(obj.items())
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 836, in _batch_setitems
save(v)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 520, in save
self.save_reduce(obj=obj, *rv)
File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 810, in save_dict
self._batch_setitems(obj.items())
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 836, in _batch_setitems
save(v)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 520, in save
self.save_reduce(obj=obj, *rv)
File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 810, in save_dict
self._batch_setitems(obj.items())
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 836, in _batch_setitems
save(v)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 520, in save
self.save_reduce(obj=obj, *rv)
File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 810, in save_dict
self._batch_setitems(obj.items())
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 836, in _batch_setitems
save(v)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 770, in save_list
self._batch_appends(obj)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 794, in _batch_appends
save(x)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 740, in save_tuple
save(element)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 810, in save_dict
self._batch_setitems(obj.items())
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 836, in _batch_setitems
save(v)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 810, in save_dict
self._batch_setitems(obj.items())
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 836, in _batch_setitems
save(v)
File "/HOME/anaconda3/lib/python3.5/pickle.py", line 506, in save
self.save_global(obj, rv)
File "/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 372, in save_global
if obj.__module__ == "__builtin__" or obj.__module__ == "builtins":
AttributeError: 'NotImplementedType' object has no attribute '__module__'
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in dump(self, obj)
147 try:
--> 148 return Pickler.dump(self, obj)
149 except RuntimeError as e:
/HOME/anaconda3/lib/python3.5/pickle.py in dump(self, obj)
407 self.framer.start_framing()
--> 408 self.save(obj)
409 self.write(STOP)
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
/HOME/anaconda3/lib/python3.5/pickle.py in save_tuple(self, obj)
739 for element in obj:
--> 740 save(element)
741
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_instancemethod(self, obj)
444 if PY3:
--> 445 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
446 else:
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
581 save(func)
--> 582 save(args)
583 write(pickle.REDUCE)
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
/HOME/anaconda3/lib/python3.5/pickle.py in save_tuple(self, obj)
724 for element in obj:
--> 725 save(element)
726 # Subtle. Same as in the big comment below.
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
519 # Save the reduce() output and finally memoize the object
--> 520 self.save_reduce(obj=obj, *rv)
521
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
599 if state is not None:
--> 600 save(state)
601 write(pickle.BUILD)
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
809 self.memoize(obj)
--> 810 self._batch_setitems(obj.items())
811
/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
835 save(k)
--> 836 save(v)
837 write(SETITEMS)
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
519 # Save the reduce() output and finally memoize the object
--> 520 self.save_reduce(obj=obj, *rv)
521
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
599 if state is not None:
--> 600 save(state)
601 write(pickle.BUILD)
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
809 self.memoize(obj)
--> 810 self._batch_setitems(obj.items())
811
/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
835 save(k)
--> 836 save(v)
837 write(SETITEMS)
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
519 # Save the reduce() output and finally memoize the object
--> 520 self.save_reduce(obj=obj, *rv)
521
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
599 if state is not None:
--> 600 save(state)
601 write(pickle.BUILD)
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
809 self.memoize(obj)
--> 810 self._batch_setitems(obj.items())
811
/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
835 save(k)
--> 836 save(v)
837 write(SETITEMS)
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
519 # Save the reduce() output and finally memoize the object
--> 520 self.save_reduce(obj=obj, *rv)
521
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
599 if state is not None:
--> 600 save(state)
601 write(pickle.BUILD)
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
809 self.memoize(obj)
--> 810 self._batch_setitems(obj.items())
811
/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
835 save(k)
--> 836 save(v)
837 write(SETITEMS)
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
519 # Save the reduce() output and finally memoize the object
--> 520 self.save_reduce(obj=obj, *rv)
521
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
599 if state is not None:
--> 600 save(state)
601 write(pickle.BUILD)
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
809 self.memoize(obj)
--> 810 self._batch_setitems(obj.items())
811
/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
835 save(k)
--> 836 save(v)
837 write(SETITEMS)
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
/HOME/anaconda3/lib/python3.5/pickle.py in save_list(self, obj)
769 self.memoize(obj)
--> 770 self._batch_appends(obj)
771
/HOME/anaconda3/lib/python3.5/pickle.py in _batch_appends(self, items)
793 for x in tmp:
--> 794 save(x)
795 write(APPENDS)
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
/HOME/anaconda3/lib/python3.5/pickle.py in save_tuple(self, obj)
739 for element in obj:
--> 740 save(element)
741
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
809 self.memoize(obj)
--> 810 self._batch_setitems(obj.items())
811
/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
835 save(k)
--> 836 save(v)
837 write(SETITEMS)
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
/HOME/anaconda3/lib/python3.5/pickle.py in save_dict(self, obj)
809 self.memoize(obj)
--> 810 self._batch_setitems(obj.items())
811
/HOME/anaconda3/lib/python3.5/pickle.py in _batch_setitems(self, items)
835 save(k)
--> 836 save(v)
837 write(SETITEMS)
/HOME/anaconda3/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
505 if isinstance(rv, str):
--> 506 self.save_global(obj, rv)
507 return
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_global(self, obj, name, pack)
371 def save_global(self, obj, name=None, pack=struct.pack):
--> 372 if obj.__module__ == "__builtin__" or obj.__module__ == "builtins":
373 if obj in _BUILTIN_TYPE_NAMES:
AttributeError: 'NotImplementedType' object has no attribute '__module__'
During handling of the above exception, another exception occurred:
PicklingError Traceback (most recent call last)
<ipython-input-15-c89868bf416d> in <module>()
2 batch_size=4, communication_window=5, num_epoch=1,
3 features_col="feature", label_col="label_encoded")
----> 4 trained_model = trainer.train(training_set)
5 print("Training time: " + str(trainer.get_training_time()))
6
/HOME/anaconda3/lib/python3.5/site-packages/distkeras/trainers.py in train(self, dataframe, shuffle)
636 self.record_training_start()
637 # Iterate through the epochs.
--> 638 self.history = dataset.rdd.mapPartitionsWithIndex(worker.train).collect()
639 # End the training procedure.
640 self.record_training_end()
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/rdd.py in collect(self)
807 """
808 with SCCallSiteSync(self.context) as css:
--> 809 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
810 return list(_load_from_socket(port, self._jrdd_deserializer))
811
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/rdd.py in _jrdd(self)
2453
2454 wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
-> 2455 self._jrdd_deserializer, profiler)
2456 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
2457 self.preservesPartitioning)
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
2386 assert serializer, "serializer should not be empty"
2387 command = (func, profiler, deserializer, serializer)
-> 2388 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
2389 return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
2390 sc.pythonVer, broadcast_vars, sc._javaAccumulator)
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
2372 # the serialized command will be compressed by broadcast
2373 ser = CloudPickleSerializer()
-> 2374 pickled_command = ser.dumps(command)
2375 if len(pickled_command) > (1 << 20): # 1M
2376 # The broadcast will have same life cycle as created PythonRDD
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/serializers.py in dumps(self, obj)
458
459 def dumps(self, obj):
--> 460 return cloudpickle.dumps(obj, 2)
461
462
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in dumps(obj, protocol)
702
703 cp = CloudPickler(file,protocol)
--> 704 cp.dump(obj)
705
706 return file.getvalue()
/HOME/rayjang/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in dump(self, obj)
160 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
161 print_exec(sys.stderr)
--> 162 raise pickle.PicklingError(msg)
163
164 def save_memoryview(self, obj):
PicklingError: Could not serialize object: AttributeError: 'NotImplementedType' object has no attribute '__module__'
Hi,
That's a possibility as well, but this simply won't scale. How do you read the data from disk? And in what format is it?
Furthermore, when you increase the number of workers, don't use DOWNPOUR; use ADAG (communication window ~15) with the Adam optimizer (ADAG is actually Accumulated Gradient Normalization, see https://arxiv.org/abs/1710.02368). It has a better convergence rate while using the bandwidth more optimally.
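For reference, a minimal sketch of what that could look like (assuming the same `model`, `dataset_train`, and `num_workers` as in your snippets; the keyword arguments here are the same ones used in your DOWNPOUR call):

from distkeras.trainers import ADAG

# Sketch only: ADAG trainer with a larger communication window and Adam on the workers.
trainer = ADAG(keras_model=model, worker_optimizer='adam', loss='categorical_crossentropy',
               num_workers=num_workers, batch_size=4, communication_window=15, num_epoch=1,
               features_col="feature", label_col="label_encoded")
trained_model = trainer.train(dataset_train)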
Joeri
My code loads the image files and converts the .png files to ndarrays, and I have labels prepared manually in ndarray format. Their format is essentially the same as what "(X_train, Y_train), (X_valid, Y_valid) = cifar10.load_data()" from "from keras.datasets import cifar10" returns.
img = image.load_img(img_path, target_size=(224, 224))
x = image.img_to_array(img)
x = np.expand_dims(x, axis=0)
x = preprocess_input(x)
images.append(x)
labels.append(np.array(i, ndmin=1, copy=False))
My problem is how data like the output of "(X_train, Y_train), (X_valid, Y_valid) = cifar10.load_data()" from "from keras.datasets import cifar10" can be converted directly, in ndarray format, into the input format of the dist-keras trainer.
I got a pickle error, so it might come from serialization. I am quite new to Spark, though; my input is probably not being converted to an RDD correctly, right?
I don't know what this error is: TypeError: can't pickle _cffi_backend.CTypeDescr objects
Traceback (most recent call last): File "/HOME/rayjang/cnn_finetune/./pdDF.py", line 199, in <module> trained_model = trainer.train(training_set) File "/usr/local/lib/python2.7/dist-packages/distkeras/trainers.py", line 638, in train self.history = dataset.rdd.mapPartitionsWithIndex(worker.train).collect() File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 776, in collect File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2403, in _jrdd File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2336, in _wrap_function File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2315, in _prepare_for_python_RDD File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 657, in dumps File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump File "/usr/lib/python2.7/pickle.py", line 224, in dump self.save(obj) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple save(element) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 401, in save_instancemethod File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 535, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 554, in save_tuple save(element) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce 
File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 606, in save_list self._batch_appends(iter(obj)) File "/usr/lib/python2.7/pickle.py", line 639, in _batch_appends save(x) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 606, in save_list self._batch_appends(iter(obj)) File "/usr/lib/python2.7/pickle.py", line 639, in _batch_appends save(x) File 
"/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple save(element) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 606, in save_list self._batch_appends(iter(obj)) File "/usr/lib/python2.7/pickle.py", line 639, in _batch_appends save(x) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple save(element) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 655, in save_dict self._batch_setitems(obj.iteritems()) File "/usr/lib/python2.7/pickle.py", line 692, in _batch_setitems save(v) File "/usr/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple save(element) File "/usr/lib/python2.7/pickle.py", line 306, in save rv = reduce(self.proto) TypeError: can't pickle _cffi_backend.CTypeDescr objects
The output of dataset_train.printSchema() and the output of your MNIST example's printSchema() look the same. I am still struggling to find out where the pickle (serialization) problem comes from. Given the TypeError, I guess my input type triggers this problem...
Yes, but I load the images in a different manner.
What you could do (which is more memory efficient as well): get a list of the paths of all your images, parallelize that list (thus making an RDD of those paths), and then do something like:
def load_images(iterator):
    # 'iterator' yields image paths; read each image and emit one row dict.
    for path in iterator:
        row = {}
        image = read_image(path)
        row['image'] = image
        row['label'] = get_label(image)
        yield row

new_df = df.mapPartitions(load_images)
This should prevent any memory errors and serialization issues.
Joeri
In your first code snippet, try changing this:
dataset_train = df.mapPartitions(load_images)
to:
dataset_train = df.mapPartitions(load_images).toDF()
Joeri
I got this error: TypeError: Can not infer schema for type: <class 'numpy.float32'>
from keras import backend as K
import cv2
from keras.utils import np_utils
classes = 100
path_list=[]
i=0
raw_path = '../data/ImageData/'
while i < classes:
    path = raw_path + str(i)
    for img_path in glob.glob(os.path.join(path, '*.png')):
        path_list.append(img_path)
    i += 1

df = sc.parallelize(path_list)

def load_images(iterator):
    for path in iterator:
        # Derive the label from the directory name in the path.
        a, b, c, image_label, d = path.split("/")
        row = {}
        img = image.load_img(path, target_size=(224, 224))
        x = image.img_to_array(img)
        x = np.expand_dims(x, axis=0)
        x = preprocess_input(x)
        x = x.flatten()
        x_list = list(x)
        row['feature'] = x_list
        row['label'] = int(image_label)
        yield row
dataset_train = df.mapPartitions(load_images).toDF()
Update!! I added the code below, so that error is gone:
_x_list = list(x)
x_list = [float(v) for v in _x_list]
I got another error. I will keep working on it and update soon.
When I run "dense_transformer = DenseTransformer(input_col="features", output_col="features_dense")", I still get error
def load_images(iterator):
    for path in iterator:
        # Derive the label from the directory name in the path.
        a, b, c, image_label, d = path.split("/")
        row = {}
        img = image.load_img(path, target_size=(224, 224))
        x = image.img_to_array(img)
        x = np.expand_dims(x, axis=0)
        x = preprocess_input(x)
        x = x.flatten()
        _x_list = list(x)
        x_list = [float(v) for v in _x_list]
        #x_list = DenseVector(x_list)
        row['features'] = x_list
        row['label'] = int(image_label)
        yield row
dataset_train = df.mapPartitions(load_images).toDF()
This produced an error. And when I trained the model, I got an error like "PicklingError: Could not pickle object as excessively deep recursion required."
So I compared the types in my Spark DataFrame with the types in your MNIST DataFrame and found that your 'features' column is a Vector. So I tried to change my feature list to a DenseVector, but I got a different error. This error comes from:
trainer = ADAG(keras_model=model, worker_optimizer=sgd, loss=loss_mlp, num_workers=num_workers,
batch_size=4, communication_window=15, num_epoch=1,
features_col="matrix", label_col="label_encoded")
This is error "PicklingError: Could not serialize object: TypeError: can't pickle _hashlib.HASH objects"
I am getting confused more about rdd..
What is the type of output of VectorAssembler?? I try to understand vectorAssembler source and find the difference between my input's type and VA's output type.
Could it be due to the fact that your model is too big? How many trainable parameters does it have?
I am using a MobileNet. I will try to do fine-tuning with the convolution layers frozen. According to the paper, there are 3.3 million parameters, plus 1000 more (one additional FC layer for fine-tuning).
Should be doable; I have trained models with 200M+ parameters, so that should be fine. Do you have the code for the Keras model?
This is the test Keras model I have tried. I will try to fine-tune the pre-trained model. When I use your distributed training, model.compile is not needed, right? I tested both ways, though, and got the error either way.
img_rows, img_cols = 224, 224 # Resolution of inputs
channel = 3
classes = 100
batch_size = 10
nb_epoch = 20
# Load our model
model = MobileNet(input_shape=(img_rows, img_cols, channel))
model.outputs = model.layers[-3].output
#x = Dropout(0.2)(model.outputs)
x = Conv2D(1000, (1, 1), padding='same', name='features')(model.outputs)
x = Conv2D(classes, (1, 1), padding='same', name='conv_preds2')(x)
x = Activation('softmax', name='act_softmax')(x)
x = Reshape((classes,), name='reshape_2')(x)
model = Model(model.inputs, x)
# Freeze layers
for layer in model.layers[:-7]:
    layer.trainable = False
# Start Fine-tuning
sgd = SGD(lr=1e-3, decay=1e-6, momentum=0.9, nesterov=True)
I ran the training with a different model (your convnet model from mnist.py) and got the same pickle error. I suspect my input features are the problem...
# Construct the model.
convnet = Sequential()
convnet.add(Convolution2D(nb_filters, kernel_size[0], kernel_size[1],
                          border_mode='valid',
                          input_shape=input_shape))
convnet.add(Activation('relu'))
convnet.add(Convolution2D(nb_filters, kernel_size[0], kernel_size[1]))
convnet.add(Activation('relu'))
convnet.add(MaxPooling2D(pool_size=pool_size))
convnet.add(Flatten())
convnet.add(Dense(225))
convnet.add(Activation('relu'))
convnet.add(Dense(nb_classes))
convnet.add(Activation('softmax'))
Could you give me the full stack trace of the error, so I know where exactly it originates from?
This is the code for making the Spark DataFrame:
from keras import backend as K
import cv2
from keras.utils import np_utils
from pyspark.mllib.linalg import Vectors
classes = 100
path_list=[]
i=0
raw_path = '../data/cifar100/'
while i < classes:
    path = raw_path + str(i)
    for img_path in glob.glob(os.path.join(path, '*.png')):
        path_list.append(img_path)
    i += 1

df = sc.parallelize(path_list)

def load_images(iterator):
    for path in iterator:
        # Derive the label from the directory name in the path.
        a, b, c, image_label, d = path.split("/")
        row = {}
        img = image.load_img(path, target_size=(224, 224))
        x = image.img_to_array(img)
        x = np.expand_dims(x, axis=0)
        x = preprocess_input(x)
        x = x.flatten()
        _x_list = list(x)
        x_list = [float(v) for v in _x_list]
        #x_list = DenseVector(x_list)
        row['features'] = x_list
        row['label'] = int(image_label)
        yield row
dataset_train = df.mapPartitions(load_images)
This is the full stack trace of the error:
root@user-node2:/HOME/rayjang/cnn_finetune# ../spark-2.0.0-bin-hadoop2.7/bin/spark-submit --master spark://143.248.80.197:7077 ./1029.py
Using TensorFlow backend.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/10/29 23:57:17 INFO SparkContext: Running Spark version 2.0.0
17/10/29 23:57:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/10/29 23:57:17 INFO SecurityManager: Changing view acls to: root
17/10/29 23:57:17 INFO SecurityManager: Changing modify acls to: root
17/10/29 23:57:17 INFO SecurityManager: Changing view acls groups to:
17/10/29 23:57:17 INFO SecurityManager: Changing modify acls groups to:
17/10/29 23:57:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
17/10/29 23:57:17 INFO Utils: Successfully started service 'sparkDriver' on port 44338.
17/10/29 23:57:17 INFO SparkEnv: Registering MapOutputTracker
17/10/29 23:57:17 INFO SparkEnv: Registering BlockManagerMaster
17/10/29 23:57:17 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8a2b1aef-1606-49e6-9c39-70b70127e6bd
17/10/29 23:57:17 INFO MemoryStore: MemoryStore started with capacity 3.0 GB
17/10/29 23:57:18 INFO SparkEnv: Registering OutputCommitCoordinator
17/10/29 23:57:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
17/10/29 23:57:18 INFO Utils: Successfully started service 'SparkUI' on port 4041.
17/10/29 23:57:18 INFO SparkUI: Bound SparkUI to 143.248.80.197, and started at http://143.248.80.197:4041
17/10/29 23:57:18 INFO Utils: Copying /HOME/rayjang/cnn_finetune/./1029.py to /tmp/spark-0f3568e8-18bb-4631-8b76-fc6439c697cd/userFiles-2be57920-b807-40f5-838a-dd4ede4c3d85/1029.py
17/10/29 23:57:18 INFO SparkContext: Added file file:/HOME/rayjang/cnn_finetune/./1029.py at file:/HOME/rayjang/cnn_finetune/./1029.py with timestamp 1509289038226
17/10/29 23:57:18 INFO Executor: Starting executor ID driver on host localhost
17/10/29 23:57:18 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37483.
17/10/29 23:57:18 INFO NettyBlockTransferService: Server created on 143.248.80.197:37483
17/10/29 23:57:18 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 143.248.80.197, 37483)
17/10/29 23:57:18 INFO BlockManagerMasterEndpoint: Registering block manager 143.248.80.197:37483 with 3.0 GB RAM, BlockManagerId(driver, 143.248.80.197, 37483)
17/10/29 23:57:18 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 143.248.80.197, 37483)
17/10/29 23:57:18 INFO SparkContext: Starting job: runJob at PythonRDD.scala:441
17/10/29 23:57:18 INFO DAGScheduler: Got job 0 (runJob at PythonRDD.scala:441) with 1 output partitions
17/10/29 23:57:18 INFO DAGScheduler: Final stage: ResultStage 0 (runJob at PythonRDD.scala:441)
17/10/29 23:57:18 INFO DAGScheduler: Parents of final stage: List()
17/10/29 23:57:18 INFO DAGScheduler: Missing parents: List()
17/10/29 23:57:18 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[1] at RDD at PythonRDD.scala:48), which has no missing parents
17/10/29 23:57:18 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 5.3 KB, free 3.0 GB)
17/10/29 23:57:18 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 3.6 KB, free 3.0 GB)
17/10/29 23:57:18 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 143.248.80.197:37483 (size: 3.6 KB, free: 3.0 GB)
17/10/29 23:57:18 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1012
17/10/29 23:57:18 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (PythonRDD[1] at RDD at PythonRDD.scala:48)
17/10/29 23:57:18 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/10/29 23:57:18 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0, PROCESS_LOCAL, 8752 bytes)
17/10/29 23:57:18 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/10/29 23:57:18 INFO Executor: Fetching file:/HOME/rayjang/cnn_finetune/./1029.py with timestamp 1509289038226
17/10/29 23:57:18 INFO Utils: /HOME/rayjang/cnn_finetune/./1029.py has been previously copied to /tmp/spark-0f3568e8-18bb-4631-8b76-fc6439c697cd/userFiles-2be57920-b807-40f5-838a-dd4ede4c3d85/1029.py
Using TensorFlow backend.
17/10/29 23:57:19 INFO PythonRunner: Times: total = 813, boot = 138, init = 652, finish = 23
17/10/29 23:57:19 INFO MemoryStore: Block taskresult_0 stored as bytes in memory (estimated size 1331.1 KB, free 3.0 GB)
17/10/29 23:57:19 INFO BlockManagerInfo: Added taskresult_0 in memory on 143.248.80.197:37483 (size: 1331.1 KB, free: 3.0 GB)
17/10/29 23:57:19 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1363049 bytes result sent via BlockManager)
17/10/29 23:57:19 INFO TransportClientFactory: Successfully created connection to /143.248.80.197:37483 after 14 ms (0 ms spent in bootstraps)
17/10/29 23:57:19 INFO BlockManagerInfo: Removed taskresult_0 on 143.248.80.197:37483 in memory (size: 1331.1 KB, free: 3.0 GB)
17/10/29 23:57:19 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1004 ms on localhost (1/1)
17/10/29 23:57:19 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/10/29 23:57:19 INFO DAGScheduler: ResultStage 0 (runJob at PythonRDD.scala:441) finished in 1.017 s
17/10/29 23:57:19 INFO DAGScheduler: Job 0 finished: runJob at PythonRDD.scala:441, took 1.256326 s
/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/session.py:336: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
17/10/29 23:57:20 INFO SharedState: Warehouse path is 'file:/HOME/rayjang/cnn_finetune/spark-warehouse'.
17/10/29 23:57:20 INFO SparkContext: Starting job: runJob at PythonRDD.scala:441
17/10/29 23:57:20 INFO DAGScheduler: Got job 1 (runJob at PythonRDD.scala:441) with 1 output partitions
17/10/29 23:57:20 INFO DAGScheduler: Final stage: ResultStage 1 (runJob at PythonRDD.scala:441)
17/10/29 23:57:20 INFO DAGScheduler: Parents of final stage: List()
17/10/29 23:57:20 INFO DAGScheduler: Missing parents: List()
17/10/29 23:57:20 INFO DAGScheduler: Submitting ResultStage 1 (PythonRDD[9] at RDD at PythonRDD.scala:48), which has no missing parents
17/10/29 23:57:20 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 14.9 KB, free 3.0 GB)
17/10/29 23:57:20 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 8.3 KB, free 3.0 GB)
17/10/29 23:57:20 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 143.248.80.197:37483 (size: 8.3 KB, free: 3.0 GB)
17/10/29 23:57:20 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1012
17/10/29 23:57:20 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (PythonRDD[9] at RDD at PythonRDD.scala:48)
17/10/29 23:57:20 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
17/10/29 23:57:20 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0, PROCESS_LOCAL, 8752 bytes)
17/10/29 23:57:20 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
Using TensorFlow backend.
17/10/29 23:57:21 INFO CodeGenerator: Code generated in 110.023497 ms
17/10/29 23:57:21 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 143.248.80.197:37483 in memory (size: 3.6 KB, free: 3.0 GB)
Using TensorFlow backend.
17/10/29 23:57:22 INFO PythonRunner: Times: total = 921, boot = 1, init = 910, finish = 10
17/10/29 23:57:22 INFO MemoryStore: Block taskresult_1 stored as bytes in memory (estimated size 1332.4 KB, free 3.0 GB)
17/10/29 23:57:22 INFO BlockManagerInfo: Added taskresult_1 in memory on 143.248.80.197:37483 (size: 1332.4 KB, free: 3.0 GB)
17/10/29 23:57:22 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1364372 bytes result sent via BlockManager)
17/10/29 23:57:22 INFO BlockManagerInfo: Removed taskresult_1 on 143.248.80.197:37483 in memory (size: 1332.4 KB, free: 3.0 GB)
17/10/29 23:57:22 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 1785 ms on localhost (1/1)
17/10/29 23:57:22 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
17/10/29 23:57:22 INFO DAGScheduler: ResultStage 1 (runJob at PythonRDD.scala:441) finished in 1.788 s
17/10/29 23:57:22 INFO DAGScheduler: Job 1 finished: runJob at PythonRDD.scala:441, took 1.811145 s
17/10/29 23:57:22 INFO SparkContext: Starting job: runJob at PythonRDD.scala:441
17/10/29 23:57:22 INFO DAGScheduler: Got job 2 (runJob at PythonRDD.scala:441) with 1 output partitions
17/10/29 23:57:22 INFO DAGScheduler: Final stage: ResultStage 2 (runJob at PythonRDD.scala:441)
17/10/29 23:57:22 INFO DAGScheduler: Parents of final stage: List()
17/10/29 23:57:22 INFO DAGScheduler: Missing parents: List()
17/10/29 23:57:22 INFO DAGScheduler: Submitting ResultStage 2 (PythonRDD[17] at RDD at PythonRDD.scala:48), which has no missing parents
17/10/29 23:57:22 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 21.3 KB, free 3.0 GB)
17/10/29 23:57:22 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 9.7 KB, free 3.0 GB)
17/10/29 23:57:22 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 143.248.80.197:37483 (size: 9.7 KB, free: 3.0 GB)
17/10/29 23:57:22 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1012
17/10/29 23:57:22 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (PythonRDD[17] at RDD at PythonRDD.scala:48)
17/10/29 23:57:22 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
17/10/29 23:57:22 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, partition 0, PROCESS_LOCAL, 8752 bytes)
17/10/29 23:57:22 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
Using TensorFlow backend.
Using TensorFlow backend.
17/10/29 23:57:24 INFO CodeGenerator: Code generated in 14.185637 ms
Using TensorFlow backend.
17/10/29 23:57:25 INFO PythonRunner: Times: total = 950, boot = 1, init = 865, finish = 84
17/10/29 23:57:25 INFO MemoryStore: Block taskresult_2 stored as bytes in memory (estimated size 3.0 MB, free 3.0 GB)
17/10/29 23:57:25 INFO BlockManagerInfo: Added taskresult_2 in memory on 143.248.80.197:37483 (size: 3.0 MB, free: 3.0 GB)
17/10/29 23:57:25 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 3130261 bytes result sent via BlockManager)
17/10/29 23:57:25 INFO BlockManagerInfo: Removed taskresult_2 on 143.248.80.197:37483 in memory (size: 3.0 MB, free: 3.0 GB)
17/10/29 23:57:25 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 2482 ms on localhost (1/1)
17/10/29 23:57:25 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
17/10/29 23:57:25 INFO DAGScheduler: ResultStage 2 (runJob at PythonRDD.scala:441) finished in 2.485 s
17/10/29 23:57:25 INFO DAGScheduler: Job 2 finished: runJob at PythonRDD.scala:441, took 2.492799 s
/HOME/rayjang/cnn_finetune/./1029.py:172: UserWarning: Update your `Conv2D` call to the Keras 2 API: `Conv2D(3, (3, 3), padding="valid", input_shape=(224, 224,...)`
input_shape=input_shape))
/HOME/rayjang/cnn_finetune/./1029.py:174: UserWarning: Update your `Conv2D` call to the Keras 2 API: `Conv2D(3, (3, 3))`
convnet.add(Convolution2D(nb_filters, kernel_size[0], kernel_size[1]))
root
|-- features: array (nullable = true)
| |-- element: double (containsNull = true)
|-- matrix: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: array (containsNull = true)
| | | |-- element: double (containsNull = true)
|-- label: long (nullable = true)
|-- label_encoded: array (nullable = true)
| |-- element: double (containsNull = true)
17/10/29 23:57:25 INFO CodeGenerator: Code generated in 22.765937 ms
2017-10-29 23:57:25.519880: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
2017-10-29 23:57:25.519900: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
2017-10-29 23:57:25.519904: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations.
2017-10-29 23:57:25.519907: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations.
2017-10-29 23:57:25.519910: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations.
2017-10-29 23:57:25.619291: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:893] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2017-10-29 23:57:25.619505: I tensorflow/core/common_runtime/gpu/gpu_device.cc:955] Found device 0 with properties:
name: TITAN X (Pascal)
major: 6 minor: 1 memoryClockRate (GHz) 1.531
pciBusID 0000:01:00.0
Total memory: 11.90GiB
Free memory: 11.69GiB
2017-10-29 23:57:25.619517: I tensorflow/core/common_runtime/gpu/gpu_device.cc:976] DMA: 0
2017-10-29 23:57:25.619521: I tensorflow/core/common_runtime/gpu/gpu_device.cc:986] 0: Y
2017-10-29 23:57:25.619526: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1045] Creating TensorFlow device (/gpu:0) -> (device: 0, name: TITAN X (Pascal), pci bus id: 0000:01:00.0)
Traceback (most recent call last):
File "/HOME/rayjang/cnn_finetune/./1029.py", line 210, in <module>
trained_model = trainer.train(training_set)
File "/usr/local/lib/python2.7/dist-packages/distkeras/trainers.py", line 638, in train
self.history = dataset.rdd.mapPartitionsWithIndex(worker.train).collect()
File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 776, in collect
File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2403, in _jrdd
File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2336, in _wrap_function
File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2315, in _prepare_for_python_RDD
File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps
File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 657, in dumps
File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
File "/usr/lib/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple
save(element)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 401, in save_instancemethod
File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 535, in save_reduce
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 554, in save_tuple
save(element)
File "/usr/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/HOME/rayjang/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 553, in save_reduce
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
(... the same save / save_reduce / save_dict / save_list / save_tuple frames repeat many times while cloudpickle recurses through the object graph ...)
File "/usr/lib/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: can't pickle _cffi_backend.CTypeDescr objects
I deleted what I posted a minute ago because I made some simple mistakes while testing... I am still lost.
This one is from your mnist example. I made the 'features' column with VectorAssembler to check the type of its cells; it is DenseVector. Should I convert the numpy array to a DenseVector?
Update! (I added "x_list = DenseVector(x_list)" when I built the 'dataset_train' dataframe, but I still got a pickle error, just a different one.)
--------------------------------------------------------------------------- AttributeError Traceback (most recent call last)
When I checked the type of the 'features' column produced by VectorAssembler in mnist.py, it is SparseVector, so I changed my code accordingly.
BUT I got a different error...
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
Why encode it as a SparseVector? Can't you just convert it to a DenseVector? If I recall correctly, I wrote some utility methods which do just this: to_dense_vector.
Joeri
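For reference, a minimal sketch of what such a conversion could look like with a plain PySpark UDF. The column names are illustrative, and the to_dense_vector utility mentioned above may work differently; depending on the dist-keras version, the pyspark.mllib.linalg variants of these imports may be needed instead.

from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector, VectorUDT

# Convert whatever vector type sits in 'features' into a DenseVector column.
# 'features' and 'features_dense' are illustrative names, not required ones.
to_dense = udf(lambda v: DenseVector(v.toArray()), VectorUDT())
dataset_train = dataset_train.withColumn("features_dense", to_dense("features"))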
My model is a CNN based on MobileNet. In your code, a 'matrix' column is needed to train the convnet with ADAG. As I understand it, the 'matrix' column is built from the output of the 'features' column of VectorAssembler, and the type of 'features' is SparseVector.
I have one more question. What I need are the 'matrix' and encoded-label columns of the PySpark dataframe. Is it possible to make the 'matrix' column by using to_dense_vector? I looked at the code of transformer.py; if I understand correctly, the 'matrix' column is a DenseVector, right? Sorry for confusing you.
Where is the 'to_dense_vector' method? I looked through every file you wrote, but I can't find it.
AND I tried again to encode it as a DenseVector and run it. I got the same error:
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
I decided to follow the approach from your mnist example and make my data smaller. I changed my csv: originally I normalized before saving to csv, so every output cell was a float.
I changed the place of normalization to after loading the data as a Spark dataframe, as you did. (Also, I decreased the image dimensions from 224x224 to 128x128.)
I guess VectorAssembler is working now because all the data is int (less memory is needed).
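For reference, a minimal sketch of that normalize-after-loading step with a UDF on the assembled column. The column names are illustrative, and the ml-vs-mllib import depends on the dist-keras version.

from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector, VectorUDT

# Scale the assembled integer pixel vector to [0, 1] after loading,
# so the csv itself can stay integer-valued.
normalize = udf(lambda v: DenseVector(v.toArray() / 255.0), VectorUDT())
dataset_train = dataset_train.withColumn("features_normalized", normalize("features"))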
BUT my code gets stuck when I run the one-hot encoding (label -> encoded_label).
When I run it in Jupyter, the notebook stops and shuts down.
If I run it as a *.py from the command line, I get a weird error...
Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 7, localhost): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: java.lang.NegativeArraySizeException
java.lang.NegativeArraySizeException
Euhm, how?
That error came from
encoder = OneHotTransformer(nb_classes, input_col="label", output_col="label_encoded")
dataset_train = encoder.transform(raw_dataset_train)
I followed your example with my own data. My tentative conclusion is that your code does not work with big images or data with many columns. I ran your code on a toy example with a few columns and it worked, BUT with my own data of 128*128*3 columns it does not. I guess the problem comes from the slowness of the dataframe compared to an array or list. I am trying to find out how to solve this issue.
How can this OneHotEncoder not work under your setting? It basically takes an integer input, computes the max value, and makes a vector out of those values.
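For what it's worth, a plain-numpy sketch of what that encoding computes, assuming integer labels in the range 0 .. nb_classes-1 (the actual OneHotTransformer may differ in details such as the output vector type):

import numpy as np

# Build a one-hot vector for a single integer label.
def one_hot(label, nb_classes=100):
    vector = np.zeros(nb_classes)
    vector[int(label)] = 1.0
    return vector

print(one_hot(3))  # 1.0 at index 3, zeros everywhere else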
The label type is also int. I can't understand why it doesn't work. When I check the source code of the transformer, it is not that heavy (one-hot encoding). Can I send you my data? Can you test it (just load it and run it)? I don't think it will take long.
Can you provide me a shareable link? Or is the data not public? I have time to check it in detail on Tuesday (I'm working on a project with a deadline atm).
Joeri
This is my data. If you download it, please let me know. After you download it, I will delete the link.
Thanks. I need to train my own model in a distributed way, BUT I am still stuck on this problem... I will keep trying to find a way to solve it.
I got the data. Could you send me your code to joeri.hermans@doct.ulg.ac.be? So I can inspect the code myself.
Joeri
https://github.com/cerndb/dist-keras/blob/master/examples/mnist.ipynb
I use my data as the training data in your mnist.ipynb.
I tried to modify my data so it can be used as input for the trainer without VectorAssembler and your custom transformer, but I can't find a way. I decided to follow your mnist example approach, making my data smaller and making its type integer.
Ok, I'll keep you posted. But you are also using the same model?
I will use a different model after I test my own data, but for now I use your simple convnet model to test it. I got the error before training the model, while I was preparing the input data. I will test my model too and send it to you later, after I clean up my messy model code.
The only difference from your mnist example code is the number of classes: mnist has 10 classes, mine has 100.
I will send my code soon. I realize I added some configuration since my data has so many dimensions.
I e-mailed you.
After the VectorAssembler job was done, I tried to save the dataframe to csv.
It also caused an error like the one below:
java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: java.io.EOFException
When I did it with a toy example (just 20 columns), it worked well.
I still suspect the problem is that a dataframe cannot handle this many columns.
I have my own data as numpy arrays in the same format as the return value of mnist.load_data(): (X_train, y_train), (X_test, y_test) = myOwnData.load_data(). The shape of x_train is (number of samples, 224, 224, 3), and the shape of y_train is (number of samples, number of classes).
My data has the same format: the shape of the features (x) is (number of samples, 224, 224, 3), and the shape of the labels (y) is (number of samples, number of classes). (I read the image files and convert them to an ndarray for the features, and I also build the label ndarray manually.)
I tested my code on a single machine using model.fit(x_train, y_train, ...) in plain Keras. Now I am trying to test it in dist-keras, but I am struggling to use my code there because of the input type.
One solution I thought of is to combine the two ndarrays into a csv, then read the csv file like in the mnist example you made.
The other way I thought of is to use the numpy parsing example you made, BUT I don't know how to combine two Spark dataframes after converting the two ndarrays to Spark dataframes. (I also checked one of the closed issues on a similar topic, but I cannot apply it in my case since I am not used to numpy and pyspark.)
Also, I am wondering which of these approaches is more efficient for training; a rough sketch of a third option follows.
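A minimal sketch of that third option: build a single Spark dataframe directly from the paired numpy arrays, so no csv round-trip and no join of two dataframes is needed. The names 'spark', 'x_train', and 'y_train' are assumed to exist, y_train is assumed to already be one-hot (so argmax recovers the integer label expected by the one-hot transformer), and with 800 images of 224*224*3 floats the list of rows still has to fit in driver memory.

from pyspark.ml.linalg import DenseVector

# Flatten each image into one DenseVector and pair it with its integer label.
rows = [(DenseVector(x.reshape(-1)), int(y.argmax())) for x, y in zip(x_train, y_train)]
dataset_train = spark.createDataFrame(rows, ["features", "label"])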