heinhtet14 opened this issue 1 year ago
Are you looking for the GAN training code?
Sir, if possible, I would like to get the resources for the GAN training code.
The code has not been maintained for a while. I hope it's straightforward enough to follow. Check out the fit function.
import numpy as np
from keras.models import Sequential, Model
from keras.layers import Dense, Dropout, Activation, LeakyReLU, PReLU, Input, Concatenate, Add, BatchNormalization  # BatchNormalization is importable from keras.layers in both old and new Keras
from keras.callbacks import EarlyStopping
from keras.optimizers import Adam
from keras.initializers import RandomNormal
from sklearn.preprocessing import StandardScaler
import matplotlib as mpl
mpl.use('Agg')  # select the non-interactive backend before pyplot is imported
import matplotlib.pyplot as plt
import pickle
from pathlib import Path
from timeit import default_timer as timer
import os
import random
from pysig import dtw
import logging
class DNN_handle():
def __init__(self,
vae_dir: Path, fbank_dir: Path, spk: str, out_dim: int, seed: int):
self.vae_dir = vae_dir
self.fbank_dir = fbank_dir
self.spk = spk
self.seed = seed
self.generator = None
self.discriminator = None
self.output_dim = out_dim
self.d_losses = []
self.g_losses = []
self.d_valid_loss = []
self.g_valid_loss = []
self.train_mse = []
self.valid_mse = []
self.weight_init = RandomNormal(mean=0., stddev=0.02)
self._build_discriminator()
self._build_generator()
self._build_adversarial()
def get_context(self, X, w=5):
N, D = X.shape
# edge padding: repeat the first/last frame w times (not zeros)
X = np.r_[np.zeros((w, D)) + X[0], X, np.zeros((w, D)) + X[-1]]
X = np.array([X[i:i + 2 * w + 1].flatten() for i in range(N)])
return X
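# e.g. with D=12 and w=5, get_context maps (N, 12) -> (N, 132):
# columns [:60] hold the 5 left-context frames, [60:72] the centre frame,
# and [72:] the 5 right-context frames -- exactly the slicing used below.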
def _build_generator(self):
main_input = Input(shape=(self.output_dim,))
cl_input = Input(shape=(self.output_dim*5,)) # left context
cr_input = Input(shape=(self.output_dim*5,)) # right context
x = Concatenate()([cl_input, main_input, cr_input])
shortcut_1 = x
x = Dense(512, kernel_initializer=self.weight_init)(x)
x = BatchNormalization(scale=False)(x)
x = LeakyReLU(0.2)(x)
x = Dropout(0.2)(x)
x = Dense(512, kernel_initializer=self.weight_init)(x)
x = BatchNormalization(scale=False)(x)
x = LeakyReLU(0.2)(x)
x = Dropout(0.2)(x)
x = Concatenate()([x, shortcut_1])
x = Dense(512, kernel_initializer=self.weight_init)(x)
x = BatchNormalization(scale=False)(x)
x = LeakyReLU(0.2)(x)
x = Dropout(0.2)(x)
x = Dense(512, kernel_initializer=self.weight_init)(x)
x = BatchNormalization(scale=False)(x)
x = LeakyReLU(0.2)(x)
x = Dropout(0.2)(x)
x = Dense(self.output_dim, kernel_initializer=self.weight_init)(x)
x = Add()([x, main_input])
self.generator = Model(inputs=[cl_input, main_input, cr_input], outputs=x)
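# NOTE: the generator is residual -- the final Add() makes it learn a
# correction to the centre input frame rather than the target frame itself,
# while the Concatenate() shortcut reinjects the raw inputs mid-network.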
def _build_discriminator(self):
x = Input(shape=(self.output_dim,))
y = Input(shape=(self.output_dim,))
combined_imgs = Concatenate()([x, y])
validity = Dense(units=256, kernel_initializer=self.weight_init)(combined_imgs)
validity = LeakyReLU(0.2)(validity)
validity = Dropout(0.5)(validity)
validity = Dense(units=256, kernel_initializer=self.weight_init)(validity)
validity = LeakyReLU(0.2)(validity)
validity = Dropout(0.5)(validity)
validity = Dense(units=1, activation='sigmoid', kernel_initializer=self.weight_init)(validity)
self.discriminator = Model([x, y], validity)
def _build_adversarial(self):
# compile generator for pretraining with MSE
if 1: # use Adam
optimizer_1 = Adam()
optimizer_2 = Adam(learning_rate=0.0001, beta_1=0.5)  # 'lr=' in older Keras
optimizer_3 = Adam(learning_rate=0.0002, beta_1=0.5)
else: # use RAdam
from keras_radam import RAdam
optimizer_1 = RAdam()
optimizer_2 = RAdam(lr=0.0001, beta_1=0.5)
optimizer_3 = RAdam(lr=0.0002, beta_1=0.5)
self.generator.compile(
optimizer=optimizer_1,
loss = 'mse'
)
# compile discriminator
self.discriminator.compile(
optimizer=optimizer_2,
loss = 'mse',
metrics = ['accuracy']
)
# compile the full GAN with the discriminator frozen, so generator updates
# do not touch discriminator weights (trainability is captured at compile time)
self.set_trainable(self.discriminator, False)
main_input = Input(shape=(self.output_dim,))
cl_input = Input(shape=(self.output_dim*5,)) # left context
cr_input = Input(shape=(self.output_dim*5,)) # right context
y_fake = self.generator([cl_input, main_input, cr_input])
valid = self.discriminator([main_input, y_fake])
# the target y is not a model input; it is only needed as the MAE target in fit
self.model = Model(inputs=[cl_input, main_input, cr_input], outputs=[valid, y_fake])
self.model.compile(
optimizer=optimizer_3,
loss=['mse', 'mae'],
loss_weights=[1., 1.],
metrics=['accuracy']
)
self.set_trainable(self.discriminator, True)  # unfreeze again; the discriminator's own compile (above) was done while trainable
def set_trainable(self, m, val):
m.trainable = val
for l in m.layers:
l.trainable = val
def train_discriminator(self, x_train_batch, y_train_batch, x_test_batch, y_test_batch):
# one discriminator update: real pairs labelled 1, generated pairs labelled 0
batch_size = x_train_batch.shape[0]
valid = np.ones((batch_size,1))
fake = np.zeros((batch_size,1))
train_true_imgs = y_train_batch[:]
train_gen_imgs = self.generator.predict([
x_train_batch[:,:5*self.output_dim],
x_train_batch[:,5*self.output_dim:6*self.output_dim],
x_train_batch[:,6*self.output_dim:]])
valid_true_imgs = y_test_batch[:]
valid_gen_imgs = self.generator.predict([
x_test_batch[:,:5*self.output_dim],
x_test_batch[:,5*self.output_dim:6*self.output_dim],
x_test_batch[:,6*self.output_dim:]])
# validation_data must be a tuple in newer Keras
history_real = self.discriminator.fit([x_train_batch[:,5*self.output_dim:6*self.output_dim], train_true_imgs], valid,
batch_size=batch_size,
epochs=1,
validation_data=([x_test_batch[:,5*self.output_dim:6*self.output_dim], valid_true_imgs], np.ones((x_test_batch.shape[0],1))))
history_fake = self.discriminator.fit([x_train_batch[:,5*self.output_dim:6*self.output_dim], train_gen_imgs], fake,
batch_size=batch_size,
epochs=1,
validation_data=([x_test_batch[:,5*self.output_dim:6*self.output_dim], valid_gen_imgs], np.zeros((x_test_batch.shape[0],1))))
d_loss_real = history_real.history['loss'][0]
d_loss_fake = history_fake.history['loss'][0]
d_acc_real = history_real.history['accuracy'][0]  # 'acc' in older Keras
d_acc_fake = history_fake.history['accuracy'][0]
d_loss = 0.5 * (d_loss_real + d_loss_fake)
d_acc = 0.5 * (d_acc_real + d_acc_fake)
d_val_loss_real = history_real.history['val_loss'][0]
d_val_loss_fake = history_fake.history['val_loss'][0]
d_val_acc_real = history_real.history['val_accuracy'][0]  # 'val_acc' in older Keras
d_val_acc_fake = history_fake.history['val_accuracy'][0]
d_val_loss = 0.5 * (d_val_loss_real + d_val_loss_fake)
d_val_acc = 0.5 * (d_val_acc_real + d_val_acc_fake)
return [d_loss, d_loss_real, d_loss_fake, d_acc, d_acc_real, d_acc_fake, d_val_loss, d_val_acc]
def train_generator(self, x_train_batch, y_train_batch, x_test_batch, y_test_batch):
# one combined-model update: fool the discriminator (MSE on 'valid') and match the target (MAE)
batch_size = x_train_batch.shape[0]
valid = np.ones((batch_size,1))
history = self.model.fit([
x_train_batch[:,:5*self.output_dim],
x_train_batch[:,5*self.output_dim:6*self.output_dim],
x_train_batch[:,6*self.output_dim:]],
[valid, y_train_batch],
batch_size=batch_size,
epochs=1,
validation_data=([
x_test_batch[:,:5*self.output_dim],
x_test_batch[:,5*self.output_dim:6*self.output_dim],
x_test_batch[:,6*self.output_dim:]],
[np.ones((x_test_batch.shape[0],1)), y_test_batch]))
ks = list(history.history.keys())
# print('ks=', ks)
# ks[7] / ks[2] were chosen by inspecting the printed keys above; the key
# ordering is Keras-version dependent, so re-check it after upgrading
return history.history['loss'][0], history.history[ks[7]][0], history.history['val_loss'][0], history.history[ks[2]][0]
def prep_vae_scaler(self):
src_lst = list(self.vae_dir.glob(f'{self.spk}*_H.npy'))
vae = np.concatenate([np.load(p) for p in src_lst])
assert vae.shape[1] == 12
self.scaler_vae_src = StandardScaler().fit(vae)
# pickle.dump(self.scaler_ppt_src, open(f"intel_scaler_{self.spk}_ppt_src", "wb"))
tgt_lst = [self.vae_dir.joinpath(p.name.replace('_H', '_C')) for p in src_lst]
vae = np.concatenate([np.load(p) for p in tgt_lst])
assert vae.shape[1] == 12
self.scaler_vae_tgt = StandardScaler().fit(vae)
def prep_vae_data(self):
lst = list(self.vae_dir.glob(f'{self.spk}*_H.npy'))
test_file = lst[self.seed]
del lst[self.seed]
random.Random(self.seed).shuffle(lst)
train_lst = lst[:22]
valid_lst = lst[22:]
assert len(train_lst) == 22
assert len(valid_lst) == 2
for i, p in enumerate(train_lst):
# do DTW
fbank_src = np.load(self.fbank_dir.joinpath(p.name))
fbank_tgt = np.load(self.fbank_dir.joinpath(p.name.replace('_H','_C')))
_, cost, _, path = dtw.dtw(fbank_tgt, fbank_src, 'euclidean')
fx, _ = dtw.path2fun(path, cost)  # fx maps each target frame to a source frame, so vae_src[fx] is frame-aligned with vae_tgt
vae_src = np.load(p)
vae_tgt = np.load(self.vae_dir.joinpath(p.name.replace('_H','_C')))
vae_src = self.scaler_vae_src.transform(vae_src[fx])
vae_tgt = self.scaler_vae_tgt.transform(vae_tgt)
if i == 0:
train_dat_src = self.get_context(vae_src)
train_dat_tgt = vae_tgt
else:
train_dat_src = np.r_[train_dat_src, self.get_context(vae_src)]
train_dat_tgt = np.r_[train_dat_tgt, vae_tgt]
for i, p in enumerate(valid_lst):
# do DTW
fbank_src = np.load(f"{self.fbank_dir}/{p.name}")
fbank_tgt = np.load(f"{self.fbank_dir}/{p.name.replace('_H','_C')}")
_, cost, _, path = dtw.dtw(fbank_tgt, fbank_src, 'euclidean')
fx, _ = dtw.path2fun(path, cost)
vae_src = np.load(p)
vae_tgt = np.load(self.vae_dir.joinpath(p.name.replace('_H','_C')))
vae_src = self.scaler_vae_src.transform(vae_src[fx])
vae_tgt = self.scaler_vae_tgt.transform(vae_tgt)
if i == 0:
valid_dat_src = self.get_context(vae_src)
valid_dat_tgt = vae_tgt
else:
valid_dat_src = np.r_[valid_dat_src, self.get_context(vae_src)]
valid_dat_tgt = np.r_[valid_dat_tgt, vae_tgt]
logging.warning(f'{train_dat_src.shape} {valid_dat_src.shape}')
logging.warning(f'{train_dat_tgt.shape} {valid_dat_tgt.shape}')
return train_dat_src, valid_dat_src, train_dat_tgt, valid_dat_tgt
def predict_vae(self, vae_src):
dat_dim = vae_src.shape[1]
vae_qry = self.scaler_vae_src.transform(vae_src)
curr_qry = self.get_context(vae_qry)
curr_pred = self.generator.predict([
curr_qry[:,:5*dat_dim],
curr_qry[:,5*dat_dim:6*dat_dim],
curr_qry[:,6*dat_dim:]])
curr_pred = self.scaler_vae_tgt.inverse_transform(curr_pred)
return curr_pred
def fit_generator(self,
x_train,
y_train,
x_test,
y_test,
batch_size,
epochs):
'''
Pretrain the generator alone (MSE loss) before adversarial training.
'''
earlystopping = EarlyStopping(monitor='val_loss',
patience=2,
verbose=1)
self.generator.fit([x_train[:,:5*self.output_dim],
x_train[:,5*self.output_dim:6*self.output_dim], x_train[:,6*self.output_dim:]], y_train,
batch_size=batch_size,
epochs=epochs,
verbose=2,
shuffle=True,
validation_data=([x_test[:,:5*self.output_dim],
x_test[:,5*self.output_dim:6*self.output_dim],
x_test[:,6*self.output_dim:]], y_test),
callbacks=[earlystopping])
def fit_discriminator(self,
x_train,
y_train,
x_test,
y_test,
batch_size,
epochs):
'''
Pretrain discriminator
'''
for epoch in range(epochs):
nsteps = int(x_train.shape[0] // batch_size)
idx = np.arange(x_train.shape[0])
random.shuffle(idx)
for step in range(nsteps):
idx_batch = idx[step*batch_size:(step+1)*batch_size]
x_train_batch = x_train[idx_batch]
y_train_batch = y_train[idx_batch]
assert x_train_batch.shape[0] == batch_size
assert y_train_batch.shape[0] == batch_size
idx_valid = np.random.randint(0, x_test.shape[0], batch_size)
x_valid_batch = x_test[idx_valid]
y_valid_batch = y_test[idx_valid]
d = self.train_discriminator(x_train_batch, y_train_batch, x_valid_batch, y_valid_batch)
logging.warning(
"[%d / %d] [%d / %d] [D loss: (%.3f)(R %.3f, F %.3f)] [D acc: (%.3f)(%.3f, %.3f)] [D val: (%.3f, %.3f)]" % (epoch, epochs, step, nsteps, d[0], d[1], d[2], d[3], d[4], d[5], d[6], d[7]))
def validate(self, x_test, y_test, batch_size):
# n_test = x_test.shape[0]
# valid = np.ones((n_test,1))
# fake = np.zeros((n_test,1))
# true_imgs = y_test[:]
# gen_imgs = self.generator.predict([
# x_test[:,:5*self.output_dim],
# x_test[:,5*self.output_dim:6*self.output_dim],
# x_test[:,6*self.output_dim:]])
# d_loss_real, d_acc_real = self.discriminator.evaluate(true_imgs, valid, batch_size=batch_size)
# d_loss_fake, d_acc_fake = self.discriminator.evaluate(gen_imgs, fake, batch_size=batch_size)
# g_loss, _ = self.discriminator.evaluate(gen_imgs, valid, batch_size=batch_size)
mse = self.generator.evaluate([
x_test[:,:5*self.output_dim],
x_test[:,5*self.output_dim:6*self.output_dim],
x_test[:,6*self.output_dim:]], y_test, batch_size=batch_size)
return mse
def fit(self,
x_train,
y_train,
x_test,
y_test,
batch_size,
epochs,
run_folder,
save_every_n_epochs = 20):  # checkpoint interval in epochs
if not Path(run_folder).is_dir():
Path(run_folder).mkdir()
for epoch in range(epochs):
nsteps = int(x_train.shape[0] // batch_size)
step_d_losses = []
step_g_losses = []
step_d_val_losses = []
step_g_val_losses = []
step_g_trn_err = []
step_g_val_err = []
idx = np.arange(x_train.shape[0])
random.shuffle(idx)
for step in range(nsteps):
idx_train = idx[step*batch_size:(step+1)*batch_size]
x_train_batch = x_train[idx_train]
y_train_batch = y_train[idx_train]
assert x_train_batch.shape[0] == batch_size
assert y_train_batch.shape[0] == batch_size
idx_valid = np.random.randint(0, x_test.shape[0], batch_size)
x_valid_batch = x_test[idx_valid]
y_valid_batch = y_test[idx_valid]
d = self.train_discriminator(x_train_batch, y_train_batch, x_valid_batch, y_valid_batch)
g = self.train_generator(x_train_batch, y_train_batch, x_valid_batch, y_valid_batch)
logging.warning("[%d / %d] [%d / %d] [D loss: (%.3f)(R %.3f, F %.3f)] [D acc: (%.3f)(%.3f, %.3f)] [G loss: %.3f] [G acc: %.3f]" % (epoch, epochs, step, nsteps, d[0], d[1], d[2], d[3], d[4], d[5], g[0], g[1]))
step_d_losses.append(d[0])
step_g_losses.append(g[0])
step_d_val_losses.append(d[6])
step_g_val_losses.append(g[2])
step_g_trn_err.append(g[1])
step_g_val_err.append(g[3])
if epoch % save_every_n_epochs == 0 and epoch != 0:
self.generator.save_weights(os.path.join(run_folder, 'weights-%d-%d.h5' % (epoch, self.seed)))
# validation
# err = self.validate(x_test, y_test, batch_size)
# plot
# y_pred = self.predict_vae(x_test)
# fig, (ax1, ax2) = plt.subplots(2, 1)
# ax1.imshow(y_pred.T, aspect='auto')
# ax2.imshow(y_test.T, aspect='auto')
# plt.savefig(f'{run_folder}/epoch_{epoch}_{self.seed}.pdf')
# plt.close()
self.d_losses.append(np.mean(step_d_losses))
self.g_losses.append(np.mean(step_g_losses))
self.d_valid_loss.append(np.mean(step_d_val_losses))
self.g_valid_loss.append(np.mean(step_g_val_losses))
self.train_mse.append(np.mean(step_g_trn_err))
self.valid_mse.append(np.mean(step_g_val_err))
if (epoch != 0) and ((epoch+1)%5==0):
plt.figure()
plt.subplot(3,1,1)
plt.plot(self.d_losses, label='d_losses')
plt.plot(self.g_losses, label='g_losses')
plt.legend()
plt.subplot(3,1,2)
plt.plot(self.d_valid_loss, label='d_valid_losses')
plt.plot(self.g_valid_loss, label='g_valid_losses')
plt.legend()
plt.subplot(3,1,3)
plt.plot(self.valid_mse, label='G valid err')
plt.plot(self.train_mse, label='G train err')
plt.legend()
plt.savefig(os.path.join(run_folder, f'Losses-{self.seed}'))
plt.close()  # close the figure to avoid leaking memory across epochs
def load_weights(self, filepath):
self.generator.load_weights(filepath)
if __name__ == "__main__":
home_dir = Path(f"/Users/dintu/work_sp/data/")
#home_dir = Path(f"/root/Training/tdinh/formant-tracking/src/synthesis/")
spec_dir = home_dir.joinpath('intel_spec')
fbank_dir = home_dir.joinpath('intel_fbank') # for DTW
vae_dir = home_dir.joinpath('intel_vae')
BATCH_SIZE=64
# CSM7 PDF7 PDM6
spk = 'PD_F7'
run_folder = f'gan_models_{spk}'
feat_type = 'vae'
for seed in range(25):
logging.warning(f'Prepare feat_type={feat_type} seed={seed}')
dnn_handle = DNN_handle(vae_dir, fbank_dir, spk, out_dim=12, seed=seed)
st = timer()
dnn_handle.prep_vae_scaler()
train_dat_src, valid_dat_src, train_dat_tgt, valid_dat_tgt = dnn_handle.prep_vae_data()
en = timer()
logging.warning(f'Loading files in {en - st} s')
if 1:
logging.warning('Pretrain Generator')
dnn_handle.fit_generator(train_dat_src, train_dat_tgt, valid_dat_src, valid_dat_tgt, batch_size=BATCH_SIZE, epochs=10)
if 0:
logging.warning('Pretrain Discriminator')
dnn_handle.fit_discriminator(train_dat_src, train_dat_tgt, valid_dat_src, valid_dat_tgt, batch_size=BATCH_SIZE, epochs=5)
dnn_handle.fit(train_dat_src, train_dat_tgt, valid_dat_src, valid_dat_tgt, batch_size=BATCH_SIZE, epochs=151, run_folder=run_folder)
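After training, converting a new utterance takes only a few calls. A minimal sketch, assuming a checkpoint written by fit (the checkpoint and utterance filenames below are hypothetical):

dnn_handle = DNN_handle(vae_dir, fbank_dir, spk, out_dim=12, seed=0)
dnn_handle.prep_vae_scaler()  # the input/output scalers must be fitted before predict_vae
dnn_handle.load_weights(f'gan_models_{spk}/weights-140-0.h5')  # hypothetical checkpoint name
vae_src = np.load(vae_dir.joinpath(f'{spk}_example_H.npy'))  # hypothetical utterance, shape (N, 12)
vae_converted = dnn_handle.predict_vae(vae_src)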
For feature extraction, use this function:
https://github.com/tuanad121/Python-WORLD/blob/master/world/main.py#L367
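Roughly the same analysis is also available through the pyworld bindings. A minimal sketch, assuming a mono float64 signal (the wav path is hypothetical):

import soundfile as sf
import pyworld as pw
data, sample_rate = sf.read('utterance.wav')  # soundfile returns float64 by default
f0, timeaxis = pw.harvest(data, sample_rate)  # raw F0 contour
f0 = pw.stonemask(data, f0, timeaxis, sample_rate)  # refine F0
sp = pw.cheaptrick(data, f0, timeaxis, sample_rate)  # spectral envelope
ap = pw.d4c(data, f0, timeaxis, sample_rate)  # aperiodicity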
Thank you so much, sir. For MCEP-32 extraction, I used the pyworld.code_spectral_envelope function. Is that the correct way, sir?
You can use this:
https://github.com/tuanad121/Python-WORLD/blob/master/world/main.py#L324
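The pyworld bindings also expose the same coding step directly. A minimal round-trip sketch, reusing sp and sample_rate from the analysis sketch above:

coded_sp = pw.code_spectral_envelope(sp, sample_rate, number_of_dimensions=32)
fft_size = (sp.shape[1] - 1) * 2  # the cheaptrick envelope has fft_size/2 + 1 bins
sp_restored = pw.decode_spectral_envelope(coded_sp, sample_rate, fft_size)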
Dear sir, it seems that I have been using a different library for feature extraction and labels. I used PyWorld-Wrapper. For the VUV binary voicing, I coded like this:
f0_threshold = 70.0 # f0 threshold for voiced/unvoiced classification
f0, timeaxis = pw.harvest(data, sample_rate)
uv_labels = ['1' if f > f0_threshold else '0' for f in f0]
For the 2-band aperiodicity, I coded like this:
f0, sp = pw.dio(data, sample_rate)
sp = pw.stonemask(data, f0, sp, sample_rate)
# Compute the 2-band aperiodicity spectrum
ap = pw.cheaptrick(data, f0, sp, sample_rate)
freq_axis = np.linspace(0, sample_rate / 2, len(ap[0]))
ap_3000 = np.mean(ap[:, np.where(freq_axis <= 3000)[0]], axis=1)
ap_8000 = np.mean(ap[:, np.where((freq_axis > 3000) & (freq_axis <= 8000))[0]], axis=1)
# Calculate the degree of voicing scalar value
ap_ratio = ap_3000 / ap_8000
ap_scalar = np.log(ap_ratio)
# Convert ap_scalar to a list of strings
ap_str = [str(x) for x in ap_scalar]
For the MCEP-32 coefficient extraction, I coded like this:
f0, timeaxis = pw.harvest(data, sample_rate)
f0 = pw.stonemask(data, f0, timeaxis, sample_rate)
sp = pw.cheaptrick(data, f0, timeaxis, sample_rate)
mcep = pw.code_spectral_envelope(sp, sample_rate, number_of_dimensions=32)
mcep = mcep[:, 1:]
Do you have any suggestions, sir? Thank you so much in advance.
To check your features, you can do analysis-synthesis: Speech --> features --> Speech*. If the Speech sounds similar to the Speech*, you are good.
If you convert a source speech to a target speech and obtain a converted speech, the converted speech should be compared to the target Speech* instead of the original target speech during evaluation.
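A minimal pyworld sketch of this analysis-synthesis check (the wav paths are hypothetical):

import soundfile as sf
import pyworld as pw
data, sample_rate = sf.read('target.wav')  # mono float64 signal
f0, timeaxis = pw.harvest(data, sample_rate)
f0 = pw.stonemask(data, f0, timeaxis, sample_rate)
sp = pw.cheaptrick(data, f0, timeaxis, sample_rate)
ap = pw.d4c(data, f0, timeaxis, sample_rate)
resynth = pw.synthesize(f0, sp, ap, sample_rate)  # this is the Speech* to listen to
sf.write('target_resynth.wav', resynth, sample_rate)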
I'm implementing your paper as my senior project. I would like to know whether the source code is available.