Open MarcoRavich opened 3 months ago
Ok, I've played (again) a bit with ChatGPT and generated this Jupiter Notebook trainer:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, TimeDistributed, LSTM, Conv1D
from tensorflow.keras.mixed_precision import set_global_policy
import soundfile as sf
from sklearn.metrics import mean_absolute_error
import ipywidgets as widgets
from IPython.display import display
from pydub import AudioSegment
from tqdm import tqdm
import re
# Enable mixed precision for performance on GPUs
set_global_policy('mixed_float16')
# Create necessary directories
os.makedirs('/training-data/compressed', exist_ok=True)
os.makedirs('/training-data/uncompressed', exist_ok=True)
os.makedirs('/aligned-data', exist_ok=True)
# Function to normalize filenames
def normalize_filename(filename):
"""Normalize filenames by removing extensions and standardizing separators."""
filename = re.sub(r'[\W_]+', '', filename.lower()) # Remove non-alphanumeric chars
return filename
# Function to read and convert audio files using soundfile or pydub
def read_audiofile(filepath):
print(f"Reading file '{filepath}'")
try:
# Use soundfile for lossless formats
data, samplerate = sf.read(filepath)
except RuntimeError:
# Use pydub for other compressed formats
audio = AudioSegment.from_file(filepath)
data = np.array(audio.get_array_of_samples()).astype(np.float32)
samplerate = audio.frame_rate
if audio.channels == 2:
data = data.reshape((-1, 2))
# Display audio details
print(f"File: {filepath}")
print(f"Sample Rate: {samplerate} Hz")
print(f"Channels: {data.shape[1] if len(data.shape) > 1 else 1}")
print(f"Duration: {len(data) / samplerate:.2f} seconds")
return samplerate, data
# Function to convert to 32-bit float if necessary
def convert_to_32bit_fp(filepath, output_path):
samplerate, data = read_audiofile(filepath)
if data.dtype != np.float32:
print(f"Converting '{filepath}' to 32-bit float WAV")
data = data.astype(np.float32) / np.max(np.abs(data)) # Normalize to [-1, 1]
sf.write(output_path, data, samplerate, subtype='FLOAT')
return output_path
# File upload handling
def upload_and_save_files(upload_widget, target_dir):
"""Save uploaded files to the target directory."""
uploaded_files = upload_widget.value
for filename in tqdm(uploaded_files.keys(), desc=f"Saving files to {target_dir}"):
content = uploaded_files[filename]['content']
file_path = os.path.join(target_dir, filename)
with open(file_path, 'wb') as f:
f.write(content)
print(f"Files saved successfully to {target_dir}")
# Function to check and upload files
def check_and_upload_files(directory, file_type, message_widget):
"""Check if the directory exists and is not empty; if not, prompt the user to upload files."""
if not os.path.exists(directory) or not os.listdir(directory):
message_widget.value = f"{file_type} files are missing or the directory is empty. Please upload the {file_type} files."
upload_widget = widgets.FileUpload(accept='.wav,.mp3,.ogg,.flac,.m4a', multiple=True)
upload_button = widgets.Button(description="Upload Files", button_style="info")
def on_upload_clicked(b):
upload_and_save_files(upload_widget, directory)
message_widget.value = f"{file_type} files have been uploaded successfully."
upload_button.on_click(on_upload_clicked)
# Organize layout for uploading section
upload_section = widgets.VBox([
widgets.HTML(f"<h3>Upload {file_type} Files</h3>"),
upload_widget,
upload_button
])
display(upload_section)
else:
message_widget.value = f"{file_type} files are already present."
# Function to align samples between two audio files
def auto_align_samples(data1, data2, num_samples=44100, center_percent=0.5):
center = int(len(data1) * center_percent)
offset = 0
best_diff = float("inf")
for i in range(0, -num_samples // 2, -1):
td1 = data1[center: center + num_samples]
td2 = data2[center + i: center + i + num_samples]
difference = mean_absolute_error(td1, td2)
if difference < best_diff:
best_diff = difference
offset = i
for i in range(0, num_samples // 2):
td1 = data1[center: center + num_samples]
td2 = data2[center + i: center + i + num_samples]
difference = mean_absolute_error(td1, td2)
if difference < best_diff:
best_diff = difference
offset = i
min_len = min(len(data1), len(data2)) - abs(offset)
if offset < 0:
td1 = data1[-offset:min_len-offset]
td2 = data2[0:min_len]
else:
td1 = data1[0:min_len]
td2 = data2[offset:min_len+offset]
assert len(td1) == len(td2)
return td1, td2, offset
# Preprocessing datasets
def preprocess_dataset(compressed_audio, uncompressed_audio, input_size, num_channels):
x_train = np.zeros([0, input_size, num_channels], dtype=np.float32)
y_train = np.zeros([0, input_size, num_channels], dtype=np.float32)
# Normalize filenames for matching
compressed_keys = {normalize_filename(k): k for k in compressed_audio.keys()}
uncompressed_keys = {normalize_filename(k): k for k in uncompressed_audio.keys()}
common_files = set(compressed_keys.keys()).intersection(set(uncompressed_keys.keys()))
if not common_files:
raise ValueError("No matching files between compressed and uncompressed datasets.")
for norm_name in common_files:
odata = compressed_audio[compressed_keys[norm_name]]
mdata = uncompressed_audio[uncompressed_keys[norm_name]]
odata, mdata, _ = auto_align_samples(odata, mdata)
sl = len(odata) // input_size * input_size
x_train = np.concatenate([x_train, np.stack(np.split(odata[:sl], input_size), axis=1)], 0)
y_train = np.concatenate([y_train, np.stack(np.split(mdata[:sl], input_size), axis=1)], 0)
return x_train, y_train
# Model building and training functions
def build_model(input_size, num_channels, hidden_size):
model = Sequential()
model.add(tf.keras.layers.Input(shape=(input_size, num_channels)))
model.add(Conv1D(hidden_size, kernel_size=3, padding='causal', activation='linear'))
model.add(LSTM(hidden_size, return_sequences=True, activation='linear'))
model.add(TimeDistributed(Dense(num_channels, activation='linear')))
return model
def compile_model(model, learning_rate):
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
model.compile(optimizer=optimizer, loss='mae', metrics=['accuracy'])
return model
def train_model(x_train, y_train, batch_size, epochs, learning_rate, hidden_size):
input_size = x_train.shape[1]
num_channels = x_train.shape[2]
model = build_model(input_size, num_channels, hidden_size)
model = compile_model(model, learning_rate)
train_len = len(x_train) // batch_size * batch_size
x_train = x_train[:train_len]
y_train = y_train[:train_len]
# Print model summary
model.summary()
# Train the model
model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs)
return model
# UI Layout
message_compressed = widgets.HTML(value="")
message_uncompressed = widgets.HTML(value="")
# Adding a Header and Description
header = widgets.HTML("<h2>Audio Compression Training Tool</h2>")
description = widgets.HTML("""
<p>This tool helps in aligning, uploading, and processing audio files for training a model on compressed and uncompressed audio data. Please follow the steps below:</p>
<ol>
<li>Upload your compressed and uncompressed audio files.</li>
<li>Once the files are uploaded, the system will automatically process and align them.</li>
<li>Train the model using the aligned data.</li>
</ol>
""")
# Checking and uploading files
check_and_upload_files('/training-data/compressed', 'Compressed', message_compressed)
check_and_upload_files('/training-data/uncompressed', 'Uncompressed', message_uncompressed)
# UI for setting hyperparameters
input_size_widget = widgets.IntText(description="Input Size:", value=64)
hidden_size_widget = widgets.IntText(description="Hidden Size:", value=128)
batch_size_widget = widgets.IntText(description="Batch Size:", value=32)
epochs_widget = widgets.IntText(description="Epochs:", value=10)
learning_rate_widget = widgets.FloatText(description="Learning Rate:", value=0.001)
# Start training button
start_button = widgets.Button(description="Start Training", button_style="success")
# Function to be called on button click
def on_train_button_clicked(b):
compressed_dir = '/training-data/compressed'
uncompressed_dir = '/training-data/uncompressed'
compressed_audio = {name: read_audiofile(os.path.join(compressed_dir, name))[1]
for name in os.listdir(compressed_dir) if name.endswith(('wav', 'mp3', 'ogg', 'flac', 'm4a'))}
uncompressed_audio = {name: read_audiofile(os.path.join(uncompressed_dir, name))[1]
for name in os.listdir(uncompressed_dir) if name.endswith(('wav', 'flac'))}
input_size = input_size_widget.value # Allow user to set input size
num_channels = 2 # Assuming stereo audio
try:
x_train, y_train = preprocess_dataset(compressed_audio, uncompressed_audio, input_size, num_channels)
trained_model = train_model(
x_train, y_train,
batch_size_widget.value,
epochs_widget.value,
learning_rate_widget.value,
hidden_size_widget.value
)
print("Training complete!")
except ValueError as ve:
print(f"Error during training: {ve}")
start_button.on_click(on_train_button_clicked)
# UI layout for the training parameters
training_layout = widgets.VBox([
widgets.HTML("<h3>Model Training:</h3>"),
widgets.HBox([input_size_widget, hidden_size_widget]),
widgets.HBox([batch_size_widget, epochs_widget, learning_rate_widget]),
start_button
])
# Display UI
display(header, description, message_compressed, message_uncompressed, training_layout)
Unfortunally files uploading does not work (it allows to select local files, but don't do anything on "Upload Files" button click): btw it detects files correctly if manually uploaded into right directories but - when pressing the "Start Training" button - it returns Error during training: No matching files between compressed and uncompressed datasets.
Hope someone can help to fix this.
Hi there, this project sounds really cool !
I've started a discussion @ HydrogenAudio about the similar @bkraad47's Fat Llama project (where I've cited this repo too) that is getting interesting... could you join it to directly explain better (than me) how "neural networks' delossification" works ?
Last but not least, as the discussion revealed too, many users would like to experiment directly but don't have the necessary hardware or aren't able to configure it: can you setup a a "Jupyter Notebook" to let anyone test AudioDelossifier easily ? Check out Google Colab/HF Spaces/Paperspace/Kaggle/Jupiter/Deepnote/etc.
Hope that inspires !