pxaris opened this issue 1 year ago
!git clone https://github.com/slp-ntua/patrec-labs.git
Add the repo path to the system path and install the requirements:
import sys
sys.path.append('/kaggle/working/patrec-labs/lab3')
!ls
!pip install -r patrec-labs/lab3/requirements.txt
# step 0
import os
os.listdir('../input/patreco3-multitask-affective-music/data/')
# Step 4: Data loading and analysis
from dataset import CLASS_MAPPING, SpectrogramDataset
PARENT_DATA_DIR = '../input/patreco3-multitask-affective-music/data/'
train_dataset = SpectrogramDataset(
os.path.join(PARENT_DATA_DIR, 'fma_genre_spectrograms'), class_mapping=CLASS_MAPPING, train=True
)
print(train_dataset[10])
print(f"Input: {train_dataset[10][0].shape}")
print(f"Label: {train_dataset[10][1]}")
print(f"Original length: {train_dataset[10][2]}")
import matplotlib.pyplot as plt

def plot_class_histogram(dataset, title):
plt.figure()
plt.hist(dataset.labels, rwidth=0.5, align='mid')
plt.xticks(dataset.labels, dataset.labels.astype(str))
plt.title(title)
plt.xlabel('Class Labels')
plt.ylabel('Frequencies')
plot_class_histogram(train_dataset, title='Histogram of classes with merged classes')
train_dataset_all = SpectrogramDataset(
os.path.join(PARENT_DATA_DIR, 'fma_genre_spectrograms'), class_mapping=None, train=True
)
plot_class_histogram(train_dataset_all, title='Histogram of classes with all classes')
Customize read_spectrogram so that it can return only the mel or only the chroma features of the fused spectrogram:
def read_spectrogram(spectrogram_file, feat_type):
spectrogram = np.load(spectrogram_file)
    # the fused file contains the mel spectrogram (first 128 bands) and the chromagram stacked together
if feat_type=='mel':
return spectrogram[:128, :].T
elif feat_type=='chroma':
return spectrogram[128:, :].T
return spectrogram.T
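A quick way to verify the slicing (assuming read_spectrogram is defined or imported in the notebook; the 12 chroma bins are also an assumption, just for illustration) is to save a dummy fused array and read it back with each feat_type:
import numpy as np

dummy = np.random.rand(140, 1000).astype(np.float32)   # (128 mel bands + 12 chroma bins, time frames)
np.save('dummy_fused.npy', dummy)
print(read_spectrogram('dummy_fused.npy', 'mel').shape)     # (1000, 128)
print(read_spectrogram('dummy_fused.npy', 'chroma').shape)  # (1000, 12)
print(read_spectrogram('dummy_fused.npy', 'fused').shape)   # (1000, 140)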
The whole updated dataset.py file:
import copy
import os
import numpy as np
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader, Dataset, SubsetRandomSampler
CLASS_MAPPING = {
    "Rock": "Rock",
    "Psych-Rock": "Rock",
    "Indie-Rock": None,
    "Post-Rock": "Rock",
    "Psych-Folk": "Folk",
    "Folk": "Folk",
    "Metal": "Metal",
    "Punk": "Metal",
    "Post-Punk": None,
    "Trip-Hop": "Trip-Hop",
    "Pop": "Pop",
    "Electronic": "Electronic",
    "Hip-Hop": "Hip-Hop",
    "Classical": "Classical",
    "Blues": "Blues",
    "Chiptune": "Electronic",
    "Jazz": "Jazz",
    "Soundtrack": None,
    "International": None,
    "Old-Time": None,
}
def torch_train_val_split(
    dataset, batch_train, batch_eval, val_size=0.2, shuffle=True, seed=420
):
dataset_size = len(dataset)
indices = list(range(dataset_size))
val_split = int(np.floor(val_size * dataset_size))
if shuffle:
np.random.seed(seed)
np.random.shuffle(indices)
train_indices = indices[val_split:]
val_indices = indices[:val_split]
# Creating PT data samplers and loaders:
train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)
train_loader = DataLoader(dataset, batch_size=batch_train, sampler=train_sampler)
val_loader = DataLoader(dataset, batch_size=batch_eval, sampler=val_sampler)
return train_loader, val_loader
def read_spectrogram(spectrogram_file, feat_type):
    spectrogram = np.load(spectrogram_file)
if feat_type=='mel':
return spectrogram[:128, :].T
elif feat_type=='chroma':
return spectrogram[128:, :].T
return spectrogram.T
class LabelTransformer(LabelEncoder):
    def inverse(self, y):
        try:
            return super(LabelTransformer, self).inverse_transform(y)
        except:
            return super(LabelTransformer, self).inverse_transform([y])
def transform(self, y):
try:
return super(LabelTransformer, self).transform(y)
except:
return super(LabelTransformer, self).transform([y])
class PaddingTransform(object):
    def __init__(self, max_length, padding_value=0):
        self.max_length = max_length
        self.padding_value = padding_value
def __call__(self, s):
if len(s) == self.max_length:
return s
if len(s) > self.max_length:
return s[: self.max_length]
if len(s) < self.max_length:
s1 = copy.deepcopy(s)
pad = np.zeros((self.max_length - s.shape[0], s.shape[1]), dtype=np.float32)
s1 = np.vstack((s1, pad))
return s1
class SpectrogramDataset(Dataset):
    def __init__(
        self, path, class_mapping=None, train=True, feat_type='mel', max_length=-1, regression=None
    ):
        t = "train" if train else "test"
        p = os.path.join(path, t)
        self.regression = regression
self.full_path = p
self.index = os.path.join(path, "{}_labels.txt".format(t))
self.files, labels = self.get_files_labels(self.index, class_mapping)
self.feats = [read_spectrogram(os.path.join(p, f), feat_type) for f in self.files]
self.feat_dim = self.feats[0].shape[1]
self.lengths = [len(i) for i in self.feats]
self.max_length = max(self.lengths) if max_length <= 0 else max_length
self.zero_pad_and_stack = PaddingTransform(self.max_length)
self.label_transformer = LabelTransformer()
if isinstance(labels, (list, tuple)):
if not regression:
self.labels = np.array(
self.label_transformer.fit_transform(labels)
).astype("int64")
else:
self.labels = np.array(labels).astype("float64")
def get_files_labels(self, txt, class_mapping):
with open(txt, "r") as fd:
lines = [l.rstrip().split("\t") for l in fd.readlines()[1:]]
files, labels = [], []
for l in lines:
if self.regression:
l = l[0].split(",")
files.append(l[0] + ".fused.full.npy")
labels.append(l[self.regression])
continue
label = l[1]
if class_mapping:
label = class_mapping[l[1]]
if not label:
continue
fname = l[0]
if fname.endswith(".gz"):
fname = ".".join(fname.split(".")[:-1])
if 'fma_genre_spectrograms_beat' in self.full_path.split('/'): # necessary fix 1
fname = fname.replace('beatsync.fused', 'fused.full')
if 'test' in self.full_path.split('/'): # necessary fix 2
fname = fname.replace('full.fused', 'fused.full')
files.append(fname)
labels.append(label)
return files, labels
def __getitem__(self, item):
length = min(self.lengths[item], self.max_length)
return self.zero_pad_and_stack(self.feats[item]), self.labels[item], length
def __len__(self):
return len(self.labels)
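As a quick sanity check of the padding/cropping logic, the PaddingTransform can be applied to dummy spectrograms (the shapes below are arbitrary):
pad_or_crop = PaddingTransform(max_length=150)
short_spec = np.random.rand(100, 128).astype(np.float32)   # shorter than max_length -> zero-padded
long_spec = np.random.rand(200, 128).astype(np.float32)    # longer than max_length -> cropped
print(pad_or_crop(short_spec).shape)  # (150, 128)
print(pad_or_crop(long_spec).shape)   # (150, 128)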
# dataloaders for a specific feature type, e.g. 'mel'
BATCH_SIZE = 8
MAX_LENGTH = 150
DEVICE = 'cuda'
train_dataset = SpectrogramDataset(
os.path.join(PARENT_DATA_DIR, 'fma_genre_spectrograms'), class_mapping=CLASS_MAPPING,
train=True, feat_type='mel', max_length=MAX_LENGTH
)
train_loader, val_loader = torch_train_val_split(
train_dataset, BATCH_SIZE, BATCH_SIZE
)
# get the 1st batch values of the data loader
x_b1, y_b1, lengths_b1 = next(iter(train_loader))
# print the shape of the 1st item of the 1st batch of the data loader
input_shape = x_b1[0].shape
print(input_shape)
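# with feat_type='mel' and MAX_LENGTH=150, this is expected to be torch.Size([150, 128])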
In train.py, add the implementation to the following functions:
import torch
import torch.nn as nn
def overfit_with_a_couple_of_batches(model, train_loader, optimizer, device):
    print('Training in overfitting mode...')
    epochs = 400
# get only the 1st batch
x_b1, y_b1, lengths_b1 = next(iter(train_loader))
model.train()
for epoch in range(epochs):
loss, logits = model(x_b1.float().to(device), y_b1.to(device), lengths_b1.to(device))
# prepare
optimizer.zero_grad()
# backward
loss.backward()
# optimizer step
optimizer.step()
if epoch == 0 or (epoch+1)%20 == 0:
print(f'Epoch {epoch+1}, Loss at training set: {loss.item()}')
def train(model, train_loader, val_loader, optimizer, epochs, device="cuda", overfit_batch=False):
    if overfit_batch:
        overfit_with_a_couple_of_batches(model, train_loader, optimizer, device)
    else:
        pass
class Classifier(nn.Module):
def __init__(self, num_classes, backbone, load_from_checkpoint=None):
"""
backbone (nn.Module): The nn.Module to use for spectrogram parsing
num_classes (int): The number of classes
load_from_checkpoint (Optional[str]): Use a pretrained checkpoint to initialize the model
"""
super(Classifier, self).__init__()
self.backbone = backbone # An LSTMBackbone or CNNBackbone
if load_from_checkpoint is not None:
self.backbone = load_backbone_from_checkpoint(
self.backbone, load_from_checkpoint
)
self.is_lstm = isinstance(self.backbone, LSTMBackbone)
self.output_layer = nn.Linear(self.backbone.feature_size, num_classes)
self.criterion = nn.CrossEntropyLoss() # Loss function for classification
def forward(self, x, targets, lengths):
feats = self.backbone(x) if not self.is_lstm else self.backbone(x, lengths)
logits = self.output_layer(feats)
loss = self.criterion(logits, targets)
return loss, logits
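Classifier (and the Regressor further down) call load_backbone_from_checkpoint, which lives in the lab's train.py and is not shown here; a minimal sketch of what it has to do, assuming the checkpoint is a plain state_dict saved from a Classifier, could be:
def load_backbone_from_checkpoint(model, checkpoint_path):
    # sketch, not the lab's exact helper: keep only the backbone weights
    # of the saved model and load them into the new backbone module
    state = torch.load(checkpoint_path, map_location='cpu')
    backbone_state = {
        k.replace('backbone.', '', 1): v for k, v in state.items() if k.startswith('backbone.')
    }
    model.load_state_dict(backbone_state)
    return model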
# run Training in overfitting mode
from lstm import LSTMBackbone
DEVICE = 'cuda'
RNN_HIDDEN_SIZE = 64
NUM_CATEGORIES = 10
LR = 1e-4
epochs = 10
backbone = LSTMBackbone(input_shape[1], rnn_size=RNN_HIDDEN_SIZE, num_layers=2, bidirectional=True)
model = Classifier(NUM_CATEGORIES, backbone)
model.to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
train(model, train_loader, val_loader, optimizer, epochs, device=DEVICE, overfit_batch=True)
# we need EarlyStopping, which we'll adapt from https://stackoverflow.com/a/73704579/19306080 with some customizations
class EarlyStopper:
def __init__(self, model, save_path, patience=1, min_delta=0):
self.model = model
self.save_path = save_path
self.patience = patience
self.min_delta = min_delta
self.counter = 0
self.min_validation_loss = np.inf
def early_stop(self, validation_loss):
if validation_loss < self.min_validation_loss:
self.min_validation_loss = validation_loss
self.counter = 0
torch.save(self.model.state_dict(), self.save_path)
elif validation_loss > (self.min_validation_loss + self.min_delta):
self.counter += 1
if self.counter >= self.patience:
return True
return False
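A tiny illustration of the stopping logic with made-up validation losses (the nn.Linear and the checkpoint name are only there so that a state_dict can be saved):
toy_model = nn.Linear(4, 2)
stopper = EarlyStopper(toy_model, 'toy_checkpoint.pth', patience=2)
for val_loss in [1.0, 0.8, 0.9, 0.95, 1.1]:   # made-up losses
    if stopper.early_stop(val_loss):
        print('stopped after two epochs without improvement over the best loss (0.8)')
        break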
# implementation of the training process
def train_one_epoch(model, train_loader, optimizer, device=DEVICE):
model.train()
total_loss = 0
for x, y, lengths in train_loader:
loss, logits = model(x.float().to(device), y.to(device), lengths.to(device))
# prepare
optimizer.zero_grad()
# backward
loss.backward()
# optimizer step
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(train_loader)
return avg_loss
def validate_one_epoch(model, val_loader, device=DEVICE):
model.eval()
total_loss = 0
with torch.no_grad():
for x, y, lengths in val_loader:
loss, logits = model(x.float().to(device), y.to(device), lengths.to(device))
total_loss += loss.item()
avg_loss = total_loss / len(val_loader)
return avg_loss
def train(model, train_loader, val_loader, optimizer, epochs, save_path='checkpoint.pth', device="cuda", overfit_batch=False):
if overfit_batch:
overfit_with_a_couple_of_batches(model, train_loader, optimizer, device)
else:
print(f'Training started for model {save_path.replace(".pth", "")}...')
early_stopper = EarlyStopper(model, save_path, patience=5)
for epoch in range(epochs):
train_loss = train_one_epoch(model, train_loader, optimizer)
validation_loss = validate_one_epoch(model, val_loader)
if epoch== 0 or (epoch+1)%5==0:
print(f'Epoch {epoch+1}/{epochs}, Loss at training set: {train_loss}\n\tLoss at validation set: {validation_loss}')
if early_stopper.early_stop(validation_loss):
print('Early Stopping was activated.')
print(f'Epoch {epoch+1}/{epochs}, Loss at training set: {train_loss}\n\tLoss at validation set: {validation_loss}')
print('Training has been completed.\n')
break
# run Training
DEVICE = 'cuda'
RNN_HIDDEN_SIZE = 128
NUM_CATEGORIES = 10
LR = 1e-4
epochs = 40
save_path='lstm_genre_mel.pth'
backbone = LSTMBackbone(input_shape[1], rnn_size=RNN_HIDDEN_SIZE, num_layers=2, bidirectional=True)
model = Classifier(NUM_CATEGORIES, backbone)
model.to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
train(model, train_loader, val_loader, optimizer, epochs, save_path=save_path, device=DEVICE, overfit_batch=False)
# 2D CNN
# copy paste from convolution.py and add the implementation
import torch
import torch.nn as nn
class CNNBackbone(nn.Module):
def __init__(self, input_dims, in_channels, filters, feature_size):
super(CNNBackbone, self).__init__()
self.input_dims = input_dims
self.in_channels = in_channels
self.filters = filters
self.feature_size = feature_size
        # the num_features of each BatchNorm2d must match the out_channels of the preceding conv
        self.conv1 = nn.Sequential(
            nn.Conv2d(self.in_channels, filters[0], kernel_size=(5,5), stride=1, padding=2),
            nn.BatchNorm2d(filters[0]),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.conv2 = nn.Sequential(
            nn.Conv2d(filters[0], filters[1], kernel_size=(5,5), stride=1, padding=2),
            nn.BatchNorm2d(filters[1]),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.conv3 = nn.Sequential(
            nn.Conv2d(filters[1], filters[2], kernel_size=(3,3), stride=1, padding=1),
            nn.BatchNorm2d(filters[2]),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.conv4 = nn.Sequential(
            nn.Conv2d(filters[2], filters[3], kernel_size=(3,3), stride=1, padding=1),
            nn.BatchNorm2d(filters[3]),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
shape_after_convs = [input_dims[0]//2**(len(filters)), input_dims[1]//2**(len(filters))]
self.fc1 = nn.Linear(filters[3] * shape_after_convs[0] * shape_after_convs[1], self.feature_size)
def forward(self, x):
x = x.view(x.shape[0], self.in_channels, x.shape[1], x.shape[2])
out = self.conv1(x)
out = self.conv2(out)
out = self.conv3(out)
out = self.conv4(out)
out = out.reshape(out.size(0), -1)
out = self.fc1(out)
return out
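A quick shape check with a dummy batch, using the same dimensions as the mel setup above (150 frames x 128 bands):
dummy_backbone = CNNBackbone([150, 128], in_channels=1, filters=[32, 64, 128, 256], feature_size=1000)
dummy_out = dummy_backbone(torch.zeros(8, 150, 128))
print(dummy_out.shape)  # expected: torch.Size([8, 1000])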
# run Training in overfitting mode for the CNN
DEVICE = 'cuda'
NUM_CATEGORIES = 10
cnn_in_channels = 1
cnn_filters = [32, 64, 128, 256]
cnn_out_feature_size = 1000
LR = 1e-4
epochs = 10
# get the input shape
x_b1, y_b1, lengths_b1 = next(iter(train_loader))
input_shape = x_b1[0].shape
backbone = CNNBackbone(input_shape, cnn_in_channels, cnn_filters, cnn_out_feature_size)
model = Classifier(NUM_CATEGORIES, backbone)
model.to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
train(model, train_loader, val_loader, optimizer, epochs, device=DEVICE, overfit_batch=True)
# add implementation of Regressor
class Regressor(nn.Module):
def __init__(self, backbone, load_from_checkpoint=None):
"""
backbone (nn.Module): The nn.Module to use for spectrogram parsing
load_from_checkpoint (Optional[str]): Use a pretrained checkpoint to initialize the model
"""
super(Regressor, self).__init__()
self.backbone = backbone # An LSTMBackbone or CNNBackbone
if load_from_checkpoint is not None:
self.backbone = load_backbone_from_checkpoint(
self.backbone, load_from_checkpoint
)
self.is_lstm = isinstance(self.backbone, LSTMBackbone)
self.output_layer = nn.Linear(self.backbone.feature_size, 1)
self.criterion = nn.MSELoss() # Loss function for regression
def forward(self, x, targets, lengths):
feats = self.backbone(x) if not self.is_lstm else self.backbone(x, lengths)
out = self.output_layer(feats)
loss = self.criterion(out.float(), targets.float())
return loss, out
Re-opening the issue to be used during tomorrow's lab.
example code:
tasks = ['valence', 'energy', 'danceability']
task = 'valence'
label_index = tasks.index(task) + 1
train_dataset = SpectrogramDataset(
os.path.join(PARENT_DATA_DIR, data_dir),
train=True, feat_type=feat_type, max_length=MAX_LENGTH, regression=label_index
)
train_loader, val_loader = torch_train_val_split(
train_dataset, BATCH_SIZE, BATCH_SIZE
)
def get_labels_predictions(model, data_loader, device):
model.eval()
y = []
y_ = []
with torch.no_grad():
for x, labels, lengths in data_loader:
loss, logits = model(x.float().to(device), labels.to(device), lengths.to(device))
y.append(labels)
y_.append(logits.argmax(dim=-1).detach().cpu().numpy())
return y, y_
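get_labels_predictions takes the argmax of the model output, which fits the genre classifiers; for the Regressor, the raw outputs are what we want to collect, so a variant along these lines can be used (a sketch, with a hypothetical name):
def get_targets_outputs_regression(model, data_loader, device):
    model.eval()
    targets, outputs = [], []
    with torch.no_grad():
        for x, y, lengths in data_loader:
            _, out = model(x.float().to(device), y.to(device), lengths.to(device))
            targets.append(y.numpy())
            outputs.append(out.squeeze(-1).detach().cpu().numpy())
    return np.hstack(targets), np.hstack(outputs)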
source_model_path = 'cnn_genre_fused_beat.pth'
# target model
backbone = CNNBackbone(input_shape, CNN_IN_CHANNELS, CNN_FILTERS, CNN_OUT_FEATURE_SIZE)
model = Regressor(backbone)
model.to(DEVICE)
# load source model
source_model = torch.load(source_model_path)
# remove weights of final layer
del source_model['output_layer.weight']
del source_model['output_layer.bias']
# initialize target model with the state of source model
model.load_state_dict(source_model, strict=False)
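Optionally, the value returned by load_state_dict can be inspected to confirm that only the freshly initialized regression head was left unmatched (re-running the load here just for the check):
result = model.load_state_dict(source_model, strict=False)
print('missing keys:', result.missing_keys)        # expected: the output_layer weight and bias
print('unexpected keys:', result.unexpected_keys)  # expected: none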
# train the model, initialized with genre's model weights, for valence (regression)
save_path = 'cnn_valence_transfer_fused_beat.pth'
model.to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
train(model, train_loader, val_loader, optimizer, epochs, save_path=save_path, device=DEVICE, overfit_batch=False)
Here we will write helper code for today's lab.
from dataset import CLASS_MAPPING, torch_train_val_split
# dataloaders for a specific feature type, e.g. 'mel'
BATCH_SIZE = 8
MAX_LENGTH = 150
DEVICE = 'cuda'
train_dataset = SpectrogramDataset(
os.path.join(PARENT_DATA_DIR, 'fma_genre_spectrograms'), class_mapping=CLASS_MAPPING,
train=True, feat_type='mel', max_length=MAX_LENGTH
)
train_loader, val_loader = torch_train_val_split(
train_dataset, BATCH_SIZE, BATCH_SIZE
)
# get the 1st batch values of the data loader
x_b1, y_b1, lengths_b1 = next(iter(train_loader))
# print the shape of the 1st item of the 1st batch of the data loader
input_shape = x_b1[0].shape
print(input_shape)
# run Training in overfitting mode
from lstm import LSTMBackbone
DEVICE = 'cuda'
RNN_HIDDEN_SIZE = 64
NUM_CATEGORIES = 10
LR = 1e-4
epochs = 10
backbone = LSTMBackbone(input_shape[1], rnn_size=RNN_HIDDEN_SIZE, num_layers=2, bidirectional=True)
model = Classifier(NUM_CATEGORIES, backbone)
model.to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
train(model, train_loader, val_loader, optimizer, epochs, device=DEVICE, overfit_batch=True)
class EarlyStopper:
def __init__(self, model, save_path, patience=1, min_delta=0):
self.model = model
self.save_path = save_path
self.patience = patience
self.min_delta = min_delta
self.counter = 0
self.min_validation_loss = np.inf
def early_stop(self, validation_loss):
if validation_loss < self.min_validation_loss:
self.min_validation_loss = validation_loss
self.counter = 0
torch.save(self.model.state_dict(), self.save_path)
elif validation_loss > (self.min_validation_loss + self.min_delta):
self.counter += 1
if self.counter >= self.patience:
return True
return False
# implementation of the training process
def train_one_epoch(model, train_loader, optimizer, device=DEVICE):
model.train()
total_loss = 0
for x, y, lengths in train_loader:
loss, logits = model(x.float().to(device), y.to(device), lengths.to(device))
# prepare
optimizer.zero_grad()
# backward
loss.backward()
# optimizer step
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(train_loader)
return avg_loss
def validate_one_epoch(model, val_loader, device=DEVICE):
model.eval()
total_loss = 0
with torch.no_grad():
for x, y, lengths in val_loader:
loss, logits = model(x.float().to(device), y.to(device), lengths.to(device))
total_loss += loss.item()
avg_loss = total_loss / len(val_loader)
return avg_loss
def train(model, train_loader, val_loader, optimizer, epochs, save_path='checkpoint.pth', device="cuda", overfit_batch=False):
if overfit_batch:
overfit_with_a_couple_of_batches(model, train_loader, optimizer, device)
else:
print(f'Training started for model {save_path.replace(".pth", "")}...')
early_stopper = EarlyStopper(model, save_path, patience=5)
for epoch in range(epochs):
train_loss = train_one_epoch(model, train_loader, optimizer)
validation_loss = validate_one_epoch(model, val_loader)
if epoch== 0 or (epoch+1)%5==0:
print(f'Epoch {epoch+1}/{epochs}, Loss at training set: {train_loss}\n\tLoss at validation set: {validation_loss}')
if early_stopper.early_stop(validation_loss):
print('Early Stopping was activated.')
print(f'Epoch {epoch+1}/{epochs}, Loss at training set: {train_loss}\n\tLoss at validation set: {validation_loss}')
print('Training has been completed.\n')
break
# run Training
DEVICE = 'cuda'
RNN_HIDDEN_SIZE = 128
NUM_CATEGORIES = 10
LR = 1e-4
epochs = 40
save_path='lstm_genre_mel.pth'
backbone = LSTMBackbone(input_shape[1], rnn_size=RNN_HIDDEN_SIZE, num_layers=2, bidirectional=True)
model = Classifier(NUM_CATEGORIES, backbone)
model.to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
train(model, train_loader, val_loader, optimizer, epochs, save_path=save_path, device=DEVICE, overfit_batch=False)
from sklearn.metrics import classification_report
def get_labels_predictions(model, data_loader, device):
model.eval()
y = []
y_ = []
with torch.no_grad():
for x, labels, lengths in data_loader:
loss, logits = model(x.float().to(device), labels.to(device), lengths.to(device))
y.append(labels)
y_.append(logits.argmax(dim=-1).detach().cpu().numpy())
return y, y_
data_dir = 'fma_genre_spectrograms'
saved_model_path = 'lstm_genre_mel.pth'
print(f'Evaluation of model {saved_model_path.replace(".pth", "")} on the test set...')
test_dataset = SpectrogramDataset(
os.path.join(PARENT_DATA_DIR, data_dir), class_mapping=CLASS_MAPPING,
train=False, feat_type='mel', max_length=MAX_LENGTH
)
test_loader, _ = torch_train_val_split(
test_dataset, BATCH_SIZE, BATCH_SIZE, val_size=0.0
)
# get the input shape
x_b1, y_b1, lengths_b1 = next(iter(test_loader))
input_shape = x_b1[0].shape
backbone = LSTMBackbone(input_shape[1], rnn_size=RNN_HIDDEN_SIZE, num_layers=2, bidirectional=True)
model = Classifier(NUM_CATEGORIES, backbone)
model.to(DEVICE)
model.load_state_dict(torch.load(saved_model_path))
y, y_ = get_labels_predictions(model, test_loader, DEVICE)
print(classification_report(np.hstack(y), np.hstack(y_)))
cnn_in_channels = 1
cnn_filters = [32, 64, 128, 256]
cnn_out_feature_size = 1000
LR = 1e-4
epochs = 10
backbone = CNNBackbone(input_shape, cnn_in_channels, cnn_filters, cnn_out_feature_size)
Transfer Learning: https://github.com/slp-ntua/patrec-labs/issues/180#issuecomment-1824005869
Issue to be used during the 2nd Lab for keeping notes of helper code, instructions, etc.