Closed: AliceSky closed this issue 3 years ago
Many thanks for your comments, I'm glad you like it.
You can find the answers to your questions, and more information, in the blog post I published about this app, available at this link: https://riccardo-cantini.netlify.app/post/speech_emotion_detection/
Have a nice day :)
Hi~ I have read your wonderful blog post. Now I want to extract features separately from the training data and the test data, so I revised the data-reading method in preprocess.py. Unfortunately, I always get the error "ValueError: Shapes (None, 6) and (None, 7) are incompatible". Can you help me?
The code and the traceback are here:
Total params: 759,047
Trainable params: 759,047
Non-trainable params: 0
Epoch 1/30
Traceback (most recent call last):
File "model0518.py", line 219, in
/home/alice/anaconda3/envs/ser3/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py:1233 test_function  *
    return step_function(self, iterator)
/home/alice/anaconda3/envs/ser3/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py:1224 step_function
    outputs = model.distribute_strategy.run(run_step, args=(data,))
/home/alice/anaconda3/envs/ser3/lib/python3.6/site-packages/tensorflow/python/distribute/distribute_lib.py:1259 run
    return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
/home/alice/anaconda3/envs/ser3/lib/python3.6/site-packages/tensorflow/python/distribute/distribute_lib.py:2730 call_for_each_replica
    return self._call_for_each_replica(fn, args, kwargs)
/home/alice/anaconda3/envs/ser3/lib/python3.6/site-packages/tensorflow/python/distribute/distribute_lib.py:3417 _call_for_each_replica
    return fn(*args, **kwargs)
/home/alice/anaconda3/envs/ser3/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py:1217 run_step  **
    outputs = model.test_step(data)
/home/alice/anaconda3/envs/ser3/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py:1186 test_step
    y, y_pred, sample_weight, regularization_losses=self.losses)
/home/alice/anaconda3/envs/ser3/lib/python3.6/site-packages/tensorflow/python/keras/engine/compile_utils.py:203 __call__
    loss_value = loss_obj(y_t, y_p, sample_weight=sw)
/home/alice/anaconda3/envs/ser3/lib/python3.6/site-packages/tensorflow/python/keras/losses.py:152 __call__
    losses = call_fn(y_true, y_pred)
/home/alice/anaconda3/envs/ser3/lib/python3.6/site-packages/tensorflow/python/keras/losses.py:256 call
    return ag_fn(y_true, y_pred, **self._fn_kwargs)
/home/alice/anaconda3/envs/ser3/lib/python3.6/site-packages/tensorflow/python/util/dispatch.py:201 wrapper
    return target(*args, **kwargs)
/home/alice/anaconda3/envs/ser3/lib/python3.6/site-packages/tensorflow/python/keras/losses.py:1537 categorical_crossentropy
    return K.categorical_crossentropy(y_true, y_pred, from_logits=from_logits)
/home/alice/anaconda3/envs/ser3/lib/python3.6/site-packages/tensorflow/python/util/dispatch.py:201 wrapper
    return target(*args, **kwargs)
/home/alice/anaconda3/envs/ser3/lib/python3.6/site-packages/tensorflow/python/keras/backend.py:4833 categorical_crossentropy
    target.shape.assert_is_compatible_with(output.shape)
/home/alice/anaconda3/envs/ser3/lib/python3.6/site-packages/tensorflow/python/framework/tensor_shape.py:1134 assert_is_compatible_with
    raise ValueError("Shapes %s and %s are incompatible" % (self, other))
ValueError: Shapes (None, 6) and (None, 7) are incompatible
preprocess.py
seed_value = 42
import os
os.environ['PYTHONHASHSEED'] = str(seed_value)
import random
random.seed(seed_value)
import numpy as np
np.random.seed(seed_value)
import tensorflow as tf
tf.random.set_seed(seed_value)
import math
import librosa
import os
import matplotlib.pyplot as plt
from keras_preprocessing.sequence import pad_sequences
import numpy as np
from sklearn.preprocessing import StandardScaler as std, OneHotEncoder as enc
from imblearn.over_sampling import SMOTE
import pickle
sr = 16000
duration = 5
frame_length = 512
N_FRAMES = math.ceil(sr * duration / frame_length)
N_FEATURES = 46
N_EMOTIONS = 7
emo_codes = {"W": 0, "L": 1, "E": 2, "A": 3, "F": 4, "T": 5, "N": 6}
emo_labels_deu = ["wut", "langeweile", "ekel", "angst", "freude", "trauer", "neutral"]
emo_labels_en = ["anger", "boredom", "disgust", "fear", "happiness", "sadness", "neutral"]
emo_labels_ita = ["rabbia", "noia", "disgusto", "paura", "felicità", "tristrezza", "neutro"]
path_train = "/home/alice/dat/Corpus/newberlin/TRAIN"
path_test = "/home/alice/dat/Corpus/newberlin/TEST"
def get_emotion_label(file_name):
    emo_code = file_name[5]
    return emo_codes[emo_code]
def get_emotion_name(file_name, lang="en"):
    emo_code = file_name[5]
    if lang == "deu":
        return emo_labels_deu[emo_codes[emo_code]]
    elif lang == "en":
        return emo_labels_en[emo_codes[emo_code]]
    elif lang == "ita":
        return emo_labels_ita[emo_codes[emo_code]]
    else:
        raise Exception("wrong language")
def feature_extraction_train():
    wavs = []
    for file in os.listdir(path_train):
        y, _ = librosa.load(path_train + "/" + file, sr=sr, mono=True, duration=duration)
        wavs.append(y)
    # pad to fixed length (zero, 'pre')
    wavs_padded = pad_sequences(wavs, maxlen=sr * duration, dtype="float32")
    features_train = []  # (N_SAMPLES, N_FRAMES, N_FEATURES)
    emotions_train = []
    for y, name in zip(wavs_padded, os.listdir(path_train)):
        frames = []
        spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=sr, hop_length=frame_length)[0]
        spectral_contrast = librosa.feature.spectral_contrast(y=y, sr=sr, hop_length=frame_length)[0]
        spectral_bandwidth = librosa.feature.spectral_bandwidth(y=y, sr=sr, hop_length=frame_length)[0]
        spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr, hop_length=frame_length)[0]
        zero_crossing_rate = librosa.feature.zero_crossing_rate(y, hop_length=frame_length)[0]
        S, phase = librosa.magphase(librosa.stft(y=y, hop_length=frame_length))
        rms = librosa.feature.rms(y=y, hop_length=frame_length, S=S)[0]
        mfcc = librosa.feature.mfcc(y=y, sr=sr, hop_length=frame_length)
        mfcc_der = librosa.feature.delta(mfcc)
        for i in range(N_FRAMES):
            f = []
            f.append(spectral_centroid[i])
            f.append(spectral_contrast[i])
            f.append(spectral_bandwidth[i])
            f.append(spectral_rolloff[i])
            f.append(zero_crossing_rate[i])
            f.append(rms[i])
            for m_coeff in mfcc[:, i]:
                f.append(m_coeff)
            for m_coeff_der in mfcc_der[:, i]:
                f.append(m_coeff_der)
            frames.append(f)
        features_train.append(frames)
        emotions_train.append(get_emotion_label(name))
    features_train = np.array(features_train)
    emotions_train = np.array(emotions_train)
    print(str(features_train.shape))
    pickle.dump(features_train, open("features_train.p", "wb"))
    pickle.dump(emotions_train, open("emotions_train.p", "wb"))
def get_train():
    features_train = pickle.load(open("features_train.p", "rb"))
    emotions_train = pickle.load(open("emotions_train.p", "rb"))
    # flatten
    N_SAMPLES = len(features_train)
    features_train.shape = (N_SAMPLES, N_FRAMES * N_FEATURES)
    # standardize data
    scaler = std()
    features_train = scaler.fit_transform(features_train)
    # shuffle
    perm = np.random.permutation(N_SAMPLES)
    features_train = features_train[perm]
    emotions_train = emotions_train[perm]
    # get balanced test set of real samples
    X_train = []
    y_train = []
    for f, e in zip(features_train, emotions_train):
        X_train.append(f)
        y_train.append(e)
    X_train = np.asarray(X_train)
    y_train = np.asarray(y_train)
    sm = SMOTE()
    X_train, y_train = sm.fit_resample(X_train, y_train)
    # restore 3D shape
    X_train.shape = (len(X_train), N_FRAMES, N_FEATURES)
    # encode labels in one-hot vectors
    encoder = enc(sparse=False)
    y_train = np.array(y_train).reshape(-1, 1)
    y_train = encoder.fit_transform(y_train)
    return X_train, y_train
def feature_extraction_test():
    wavs = []
    for file in os.listdir(path_test):
        y, _ = librosa.load(path_test + "/" + file, sr=sr, mono=True, duration=duration)
        wavs.append(y)
    # pad to fixed length (zero, 'pre')
    wavs_padded = pad_sequences(wavs, maxlen=sr * duration, dtype="float32")
    features_test = []  # (N_SAMPLES, N_FRAMES, N_FEATURES)
    emotions_test = []
    for y, name in zip(wavs_padded, os.listdir(path_test)):
        frames = []
        spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=sr, hop_length=frame_length)[0]
        spectral_contrast = librosa.feature.spectral_contrast(y=y, sr=sr, hop_length=frame_length)[0]
        spectral_bandwidth = librosa.feature.spectral_bandwidth(y=y, sr=sr, hop_length=frame_length)[0]
        spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr, hop_length=frame_length)[0]
        zero_crossing_rate = librosa.feature.zero_crossing_rate(y, hop_length=frame_length)[0]
        S, phase = librosa.magphase(librosa.stft(y=y, hop_length=frame_length))
        rms = librosa.feature.rms(y=y, hop_length=frame_length, S=S)[0]
        mfcc = librosa.feature.mfcc(y=y, sr=sr, hop_length=frame_length)
        mfcc_der = librosa.feature.delta(mfcc)
        for i in range(N_FRAMES):
            f = []
            f.append(spectral_centroid[i])
            f.append(spectral_contrast[i])
            f.append(spectral_bandwidth[i])
            f.append(spectral_rolloff[i])
            f.append(zero_crossing_rate[i])
            f.append(rms[i])
            for m_coeff in mfcc[:, i]:
                f.append(m_coeff)
            for m_coeff_der in mfcc_der[:, i]:
                f.append(m_coeff_der)
            frames.append(f)
        features_test.append(frames)
        emotions_test.append(get_emotion_label(name))
    features_test = np.array(features_test)
    emotions_test = np.array(emotions_test)
    print(str(features_test.shape))
    pickle.dump(features_test, open("features_test.p", "wb"))
    pickle.dump(emotions_test, open("emotions_test.p", "wb"))
def get_test():
    features_test = pickle.load(open("features_test.p", "rb"))
    emotions_test = pickle.load(open("emotions_test.p", "rb"))
    # flatten
    N_SAMPLES = len(features_test)
    features_test.shape = (N_SAMPLES, N_FRAMES * N_FEATURES)
    # standardize data
    scaler = std()
    features_test = scaler.fit_transform(features_test)
    # shuffle
    perm = np.random.permutation(N_SAMPLES)
    features_test = features_test[perm]
    emotions_test = emotions_test[perm]
    # get balanced test set of real samples
    X_test = []
    y_test = []
    for f, e in zip(features_test, emotions_test):
        X_test.append(f)
        y_test.append(e)
    X_test = np.asarray(X_test)
    y_test = np.asarray(y_test)
    # restore 3D shape
    X_test.shape = (len(X_test), N_FRAMES, N_FEATURES)
    # encode labels in one-hot vectors
    encoder = enc(sparse=False)
    y_test = np.array(y_test).reshape(-1, 1)
    y_test = encoder.fit_transform(y_test)
    return X_test, y_test
model0518.py
seed_value = 42
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
os.environ['PYTHONHASHSEED'] = str(seed_value)
import random
random.seed(seed_value)
import numpy as np
np.random.seed(seed_value)
import tensorflow as tf
tf.random.set_seed(seed_value)
physical_devices = tf.config.experimental.list_physical_devices('GPU')
assert len(physical_devices) > 0, "Not enough GPU hardware devices available"
tf.config.experimental.set_memory_growth(physical_devices[0], True)
import keras
from keras import layers
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.engine.saving import model_from_json
from sklearn.metrics import classification_report, confusion_matrix
from keras.utils.vis_utils import plot_model
import seaborn
import preprocessing as pre_proc
import matplotlib.pyplot as plt
ENABLE_ATTENTION = True
def create_model(units=256):
    input = keras.Input(shape=(pre_proc.N_FRAMES, pre_proc.N_FEATURES))
    if MODEL == "Attention_BLSTM":
        states, forward_h, _, backward_h, _ = layers.Bidirectional(
            layers.LSTM(units, return_sequences=True, return_state=True)
        )(input)
        last_state = layers.Concatenate()([forward_h, backward_h])
        hidden = layers.Dense(units, activation="tanh", use_bias=False,
                              kernel_initializer=keras.initializers.RandomNormal(mean=0., stddev=1.)
                              )(states)
        out = layers.Dense(1, activation='linear', use_bias=False,
                           kernel_initializer=keras.initializers.RandomNormal(mean=0., stddev=1.)
                           )(hidden)
        flat = layers.Flatten()(out)
        energy = layers.Lambda(lambda x: x / np.sqrt(units))(flat)
        normalize = layers.Softmax()
        normalize._init_set_name("alpha")
        alpha = normalize(energy)
        context_vector = layers.Dot(axes=1)([states, alpha])
        context_vector = layers.Concatenate()([context_vector, last_state])
    elif MODEL == "BLSTM":
        context_vector = layers.Bidirectional(layers.LSTM(units, return_sequences=False))(input)
    else:
        raise Exception("Unknown model architecture!")
    pred = layers.Dense(pre_proc.N_EMOTIONS, activation="softmax")(context_vector)
    model = keras.Model(inputs=[input], outputs=[pred])
    model._init_set_name(MODEL)
    print(str(model.summary()))
    return model
def train_and_test_model(model):
    #X_train, X_test, y_train, y_test = pre_proc.get_train_test()
    X_train, y_train = pre_proc.get_train()
    X_test, y_test = pre_proc.get_test()
    model.compile(loss='categorical_crossentropy', optimizer="adam", metrics=['accuracy'])
    plot_model(model, MODEL + "_model.png", show_shapes=True)
    best_weights_file = MODEL + "_weights.h5"
    es = EarlyStopping(monitor='val_loss', mode='min', verbose=2, patience=10)
    mc = ModelCheckpoint(best_weights_file, monitor='val_loss', mode='min', verbose=2,
                         save_best_only=True)
    history = model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        epochs=30,
        batch_size=32,
        callbacks=[es, mc],
        verbose=2
    )
    save(model)
    # model testing
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    if MODEL == "Attention_BLSTM":
        plt.title('model accuracy - BLSTM with attention')
    else:
        plt.title('model accuracy - BLSTM without attention')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.savefig(MODEL + "_accuracy.png")
    plt.gcf().clear()  # clear
    # loss on validation
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    if MODEL == "Attention_BLSTM":
        plt.title('model loss - BLSTM with attention')
    else:
        plt.title('model loss - BLSTM without attention')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.savefig(MODEL + "_loss.png")
    plt.gcf().clear()  # clear
    # test acc and loss
    model.load_weights(best_weights_file)  # load the best saved model
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    test_metrics = model.evaluate(X_test, y_test, batch_size=32)
    print("\n%s: %.2f%%" % ("test " + model.metrics_names[1], test_metrics[1] * 100))
    print("%s: %.2f" % ("test " + model.metrics_names[0], test_metrics[0]))
    print("test accuracy: " + str(format(test_metrics[1], '.3f')) + "\n")
    print("test loss: " + str(format(test_metrics[0], '.3f')) + "\n")
    # test acc and loss per class
    real_class = np.argmax(y_test, axis=1)
    pred_class_probs = model.predict(X_test)
    pred_class = np.argmax(pred_class_probs, axis=1)
    report = classification_report(real_class, pred_class)
    print("classification report:\n" + str(report) + "\n")
    cm = confusion_matrix(real_class, pred_class)
    print("confusion_matrix:\n" + str(cm) + "\n")
    data = np.array([value for value in cm.flatten()]).reshape(7, 7)
    #data = np.array([value for value in cm.flatten()])
    if MODEL == "Attention_BLSTM":
        plt.title('BLSTM with attention')
    else:
        plt.title('BLSTM without attention')
    seaborn.heatmap(cm, xticklabels=pre_proc.emo_labels_en, yticklabels=pre_proc.emo_labels_en, annot=data, cmap="Reds")
    plt.savefig(MODEL + "_conf_matrix.png")
def visualize_attention(model):
    best_weights_file = MODEL + "_weights.h5"
    model.load_weights(best_weights_file)
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    X_test, _ = pre_proc.get_test()
    predictions = model.predict(X_test)
    labels = np.argmax(predictions, axis=1)
    attention = model.get_layer(name="alpha")
    weigth_model = keras.Model(inputs=model.input, outputs=attention.output)
    attention_weights = weigth_model.predict(X_test)
    d = {}
    for w, l in zip(attention_weights, labels):
        if l not in d:
            d[l] = w
        else:
            d[l] += w
    data = []
    for x, y in d.items():
        norm_w = y / np.sum(y)
        data.append(norm_w)
    # reshape and trim
    bins = 10
    bin_c = pre_proc.N_FRAMES // bins
    trim = pre_proc.N_FRAMES % bins
    data = np.asarray(data).reshape(pre_proc.N_EMOTIONS, pre_proc.N_FRAMES)[:, trim:]
    data = np.sum(data.reshape([7, bins, bin_c]), axis=2).reshape(pre_proc.N_EMOTIONS, bins)
    plt.clf()
    seaborn.heatmap(data, yticklabels=pre_proc.emo_labels_en, cmap="Reds")
    plt.savefig("visualize_attention.png")
def load():
    with open("model.json", 'r') as f:
        model = model_from_json(f.read())
    best_weights_file = MODEL + "_weights.h5"
    model.load_weights(best_weights_file)
    return model
def save(model):
    model_json = model.to_json()
    with open(MODEL + "_model.json", "w") as json_file:
        json_file.write(model_json)
    print("model saved")
######### SPEECH EMOTION RECOGNITION #########
pre_proc.feature_extraction_train()
pre_proc.feature_extraction_test()
if ENABLE_ATTENTION:
    MODEL = "Attention_BLSTM"
else:
    MODEL = "BLSTM"
model = create_model()
train_and_test_model(model)
if ENABLE_ATTENTION:
    visualize_attention(model)
Hi, the error comes from "train_and_test_model(model)". Did you extract the features correctly? The error refers to an incompatibility between the shape the model expects and the shape of the tensors it receives. If you made modifications, check (before starting training) that this shape, "keras.Input(shape=(pre_proc.N_FRAMES, pre_proc.N_FEATURES))", is compatible with the shape of the training set (you can load the training set from the pickle generated by the preprocessing method); a sketch of such a check follows below.
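For example, a minimal sketch of that check, assuming the get_train()/get_test() functions and the "preprocessing as pre_proc" import used in the code posted above (the import name may need to match the actual file name):

import numpy as np
import preprocessing as pre_proc  # the revised preprocess module shown above

X_train, y_train = pre_proc.get_train()
X_test, y_test = pre_proc.get_test()

# The model is built with keras.Input(shape=(N_FRAMES, N_FEATURES)) and ends in
# Dense(N_EMOTIONS, activation="softmax"), so before calling model.fit() these
# should print (n_samples, N_FRAMES, N_FEATURES) and (n_samples, N_EMOTIONS):
print("train:", X_train.shape, y_train.shape)
print("test: ", X_test.shape, y_test.shape)

# A mismatch such as (None, 6) vs (None, 7) means the one-hot labels have fewer
# columns than the softmax output, e.g. because one emotion class is missing from
# the folder whose labels were used to fit the OneHotEncoder.
if y_test.shape[1] != pre_proc.N_EMOTIONS:
    print("warning: test labels cover only %d of %d classes" % (y_test.shape[1], pre_proc.N_EMOTIONS))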
I'm sorry, I was careless: I revised the code, but I accidentally ran the original code, which is why it ran successfully... I extracted the features following preprocess.py, simply putting the files used for training in the "TRAIN" folder and the files used for testing in the "TEST" folder, and then extracting features from each folder separately. I want a dedicated test folder so that I can easily add training or test files later. I can't find a solution by myself; my two revised Python files are in my second comment. Could you please look at my code and suggest a specific modification? Thanks in advance!!!
Hi, if you want to keep train and test data in two separate folders (I don't know whether that is useful in general; maybe it is for your application), it is pretty simple. You can pass the path as a parameter to the "feature_extraction" method and invoke it with the training and testing paths to generate the training and testing pickles, respectively (you must rename the pickles so that they don't get overwritten in the process); see the sketch below. Afterwards, you can adapt the "get_train_test" method to work with the training/testing pickles by removing the loop that creates the train/test partitions. Of course, the scaling operation in such a scenario is biased, since the scaler is fitted on a subset of the data. Also pay attention to the shapes of the NumPy arrays/tensors you generate, checking that everything is consistent before starting training. Have a nice day!
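For illustration only, a rough sketch of that parameterization: the function name, the "out_prefix" argument, and the MFCC-only feature set are placeholders, not the repository's actual API; the full extraction body from preprocess.py above can be dropped in unchanged (this assumes the module is importable as "preprocessing", matching the import in the training script).

import os
import pickle
import numpy as np
import librosa
from keras_preprocessing.sequence import pad_sequences
from preprocessing import sr, duration, frame_length, path_train, path_test, get_emotion_label

def feature_extraction(path, out_prefix):
    # extract frame-level features for every file in `path` and save them under
    # a distinct prefix, so the train and test pickles do not overwrite each other
    files = sorted(os.listdir(path))
    wavs = [librosa.load(os.path.join(path, f), sr=sr, mono=True, duration=duration)[0]
            for f in files]
    wavs = pad_sequences(wavs, maxlen=sr * duration, dtype="float32")
    features, emotions = [], []
    for y, name in zip(wavs, files):
        mfcc = librosa.feature.mfcc(y=y, sr=sr, hop_length=frame_length)  # simplified feature set
        features.append(mfcc.T)                   # (n_frames, n_mfcc)
        emotions.append(get_emotion_label(name))
    pickle.dump(np.array(features), open(out_prefix + "_features.p", "wb"))
    pickle.dump(np.array(emotions), open(out_prefix + "_emotions.p", "wb"))

feature_extraction(path_train, "train")  # -> train_features.p, train_emotions.p
feature_extraction(path_test, "test")    # -> test_features.p, test_emotions.p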
I have already solved this problem and thank you very much !!! Have a nice day! :)
You're welcome. Enjoy the code! 💪🏻😁
Hi~ It's wonderful code, thank you for sharing :) 1) I notice that the train set and test set are split randomly, and I want to replace EmoDB with my own corpus. Can I get the result of the split? That is, can I find out exactly which recordings belong to the train set and which to the test set?
2) Can I get the accuracy for a specific test recording? Have a nice day; I'm looking forward to your reply.