cerndb / dist-keras

Distributed Deep Learning, with a focus on distributed training, using Keras and Apache Spark.
http://joerihermans.com/work/distributed-keras/
GNU General Public License v3.0

Something wrong with my embedding test #7

Closed sodaling closed 7 years ago

sodaling commented 7 years ago

Here is my Python code, run on PySpark.

from keras.layers.embeddings import Embedding
from keras.layers.recurrent import LSTM, GRU
from pyspark.ml.feature import VectorAssembler
from keras.models import Sequential
from keras.layers.core import *
from distkeras.trainers import *
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
reader = sqlContext
raw_dataset = reader.read.format('com.databricks.spark.csv').options(header='false', inferSchema='true').load(
    '/home/hpcc/test/11.csv')
# raw_dataset = raw_dataset.repartition(4)
features = raw_dataset.columns
features.remove('C0')
vector_assembler = VectorAssembler(inputCols=features, outputCol="features")
dataset = vector_assembler.transform(raw_dataset)
dataset = dataset.select("features", "C0")
model1 = Sequential()
model1.add(Embedding(input_dim=52965, output_dim=256))
model1.add(LSTM(128))
model1.add(Dropout(0.5))
model1.add(Dense(1))
model1.add(Activation('sigmoid'))
model1.summary()
optimizer_mlp = 'adagrad'
loss_mlp = 'binary_crossentropy'
dataset.cache()
trainer = DOWNPOUR(keras_model=model1, worker_optimizer=optimizer_mlp, loss=loss_mlp, num_workers=4,
                   batch_size=16, communication_window=5, learning_rate=0.1, num_epoch=1,
                   features_col="features", label_col="C0")
trainer.set_parallelism_factor(1)
trained_model = trainer.train(dataset)

But it prints an error like this:

16/12/30 20:53:09 ERROR Executor: Exception in task 1.0 in stage 4.0 (TID 7)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "build/bdist.linux-x86_64/egg/distkeras/workers.py", line 193, in train
    X = np.asarray([x[self.features_column] for x in feature_iterator])
  File "/root/anaconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 769, in __getitem__
    raise ValueError("Index %d out of bounds." % index)
ValueError: Index 50 out of bounds.

My 11.csv looks like this:

1,23,5211,871,1223,11,5,355,9999,1,28032,33057,1259,5575,4,1,9604,23790,83,136,18,4695,3,121,1,151,521,2,130,24677,4,1,612,947,612,1645,4534,59,29008,1,21679,1,416,3474,3,4189,1,142,5,11323,3
........

Its C0 column is the label, and C1 to C50 are the features. Hope for your answer. Thanks a lot.

JoeriHermans commented 7 years ago

I guess the issue is that the Embedding dimension is equal to 52965 while the number of features is equal to 50. Could you provide the output of raw_dataset.printSchema()? This should confirm that the issue is not related to dist-keras.

sodaling commented 7 years ago

dataset.printSchema()

root
|-- features: vector (nullable = true)
|-- C0: integer (nullable = true)

dataset.show()


+--------------------+---+
|            features| C0|
+--------------------+---+
|[23.0,5211.0,871....|  1|
|[138.0,10992.0,4....|  1|
|[906.0,182.0,13.0...|  1|
|[1307.0,29.0,11.0...|  1|
|[836.0,1.0,46.0,2...|  1|
|[216.0,22425.0,43...|  1|
|[1208.0,1.0,146.0...|  1|
|[2.0,794.0,7053.0...|  1|
|[930.0,307.0,2681...|  1|
|[0.0,0.0,0.0,0.0,...|  1|
|[452.0,59.0,6237....|  1|
|[40.0,2.0,2705.0,...|  1|
|[5.0,21.0,228.0,1...|  1|
|[2982.0,2.0,2401....|  1|
|[2.0,819.0,364.0,...|  1|
|[405.0,28295.0,78...|  1|
|[714.0,278.0,2653...|  1|
|[21.0,129.0,5090....|  1|
|[908.0,4.0,120.0,...|  1|
|[321.0,944.0,1733...|  1|
+--------------------+---+
only showing top 20 rows

I know this issue is not related to dist-keras, but my dissertation is built on your project, and I have no idea how to deal with this, so I have to ask you.

JoeriHermans commented 7 years ago

Can you show the output of this:

num_features = len(dataset.select("features").take(1)[0]["features"])
print(num_features)

This should confirm that the input dimension of your embedding is wrong.
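
With hindsight from the resolution further down the thread, a related check is to also print the type of vector Spark materialised for the features column; a minimal sketch, assuming the same dataset variable as above:

row = dataset.select("features").take(1)[0]
vector = row["features"]
print(type(vector))  # SparseVector or DenseVector (from pyspark.mllib.linalg or pyspark.ml.linalg, depending on the Spark version)
print(len(vector))   # number of entries in the vector (here: 50)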

sodaling commented 7 years ago

num_features: 50

According to the Keras documentation, the embedding layer's input_dim is: int > 0, the size of the vocabulary, i.e. 1 + the maximum integer index occurring in the input data. So I set this parameter to 52965. But I can't figure out the meaning of 'ValueError: Index 50 out of bounds.'. I ran this Keras model successfully outside of Spark. Maybe I converted the data to a Spark DataFrame in the wrong way? Is it possible that the embedding layer behaves differently in a distributed environment? P.S.: Happy new year!

JoeriHermans commented 7 years ago

The problem here is that the input to your model has 50 values, while it expects 52965 inputs. That is why it is generating an out-of-bounds error. As far as I can deduce from your DataFrame, you are not specifying this. What do these features represent? They are clearly not one-hot-encoded vectors.

Happy new year!

sodaling commented 7 years ago

Did you run any successful embedding example, such as Keras's LSTM example? My 11.csv is a customer-reviews file whose words are replaced by their indices, and I ran it successfully on my own computer. It is just a simple LSTM example.

JoeriHermans commented 7 years ago

No, not really. Is this data supplied with the Keras example? Then I'll try running it myself.

sodaling commented 7 years ago

Can you run this Keras example once? My code is based on this example; only the dataset is changed. https://github.com/ogrisel/keras/blob/master/examples/imdb_lstm.py Thanks for your patience.

sodaling commented 7 years ago

As you said, the input to my model is 50 while it expects 52965 inputs, and that is why it is generating an out-of-bounds error. But the problem is that Keras's embedding input_dim accepts: int > 0, the size of the vocabulary, i.e. 1 + the maximum integer index occurring in the input data. My input shape is not equal to input_dim; input_dim is just 1 + the maximum integer index occurring in the input data. My input shape is 50, which equals the length of each sentence.
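
For reference, the model itself does accept integer sequences of length 50 as long as every index stays below input_dim; a minimal standalone Keras sketch with random data (hypothetical shapes, not the real 11.csv) that runs outside Spark:

import numpy as np
from keras.models import Sequential
from keras.layers import Embedding, LSTM, Dropout, Dense, Activation

model = Sequential()
model.add(Embedding(input_dim=52965, output_dim=256))  # input_dim = vocabulary size, not sequence length
model.add(LSTM(128))
model.add(Dropout(0.5))
model.add(Dense(1))
model.add(Activation('sigmoid'))
model.compile(optimizer='adagrad', loss='binary_crossentropy')

# 8 fake "sentences" of 50 word indices each, every index < 52965
X = np.random.randint(0, 52965, size=(8, 50))
y = np.random.randint(0, 2, size=(8, 1))
model.fit(X, y, batch_size=4)  # trains without any out-of-bounds error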

sodaling commented 7 years ago

Sorry to bother you, I have changed my code to this:

dataset = dataset.select("features", "C0")
model1 = Sequential()
model1.add(Dense(1000, input_shape=(50,)))
model1.add(LSTM(128))
model1.add(Dropout(0.5))
model1.add(Dense(1))
model1.add(Activation('sigmoid'))
model1.summary()
optimizer_mlp = 'adagrad'
loss_mlp = 'binary_crossentropy'
dataset.cache()

I have changed my Keras model's first layer from an Embedding to a simple Dense layer. The same error happened again:

ValueError: Index 40 out of bounds.

I think something is wrong with my dataset, but I still have no idea what.

JoeriHermans commented 7 years ago

Could you provide me a link to the dataset so I can test it?

sodaling commented 7 years ago

Oh my god! I have figured it out, by adding these lines:

from distkeras.transformers import DenseTransformer

dense_transformer = DenseTransformer(input_col="features", output_col="features_dense")
dataset = dense_transformer.transform(dataset)

But I don't know why it worked. I read your DenseTransformer code and I am still confused. Do I need to add this line of code for each dataset?

JoeriHermans commented 7 years ago

Ohhh yeah. That makes sense! Good job.

Basically, what happened is that Spark sometimes reads the data as a SparseVector. However, Keras requires the data to be a DenseVector. So in order to comply with those constraints, you first need to transform the SparseVector into a DenseVector, hence the name "DenseTransformer". I will add this to the README, thanks for notifying me.
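
A minimal sketch of the underlying mechanism, using the pyspark.mllib.linalg vectors that appear in the traceback above (the exact exception depends on the Spark and NumPy versions involved):

import numpy as np
from pyspark.mllib.linalg import SparseVector, DenseVector

# What the worker effectively does: np.asarray over the rows' feature vectors.
sparse_rows = [SparseVector(50, {0: 23.0, 1: 5211.0}), SparseVector(50, {0: 138.0})]
dense_rows = [DenseVector(v.toArray()) for v in sparse_rows]

print(np.asarray(dense_rows).shape)  # (2, 50): dense vectors convert cleanly
np.asarray(sparse_rows)              # with the PySpark version in the traceback, SparseVector.__getitem__
                                     # raises "ValueError: Index 50 out of bounds." here

Note that after the transformation the trainer presumably also needs features_col="features_dense" (or whatever the DenseTransformer's output column is named), so that it reads the dense column instead of the original sparse one.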

If this is sufficient information, you can close the issue.

sodaling commented 7 years ago

Thank you very much! I will acknowledge you at the end of my dissertation! Looking forward to your next great work!