bastibe / SoundCard

A Pure-Python Real-Time Audio Library
https://soundcard.readthedocs.io
BSD 3-Clause "New" or "Revised" License
680 stars 69 forks source link

Why won't it save my audio when I use this with SoundFile? #153

Open ughstudios opened 2 years ago

ughstudios commented 2 years ago

I have the following code. The concept here is that as long as get_is_recording() is returning true, then we call "capture_frames" which simply parses the data, and adds it to a list. But when I try to write it, it's always being written to a file with 0 kb.

What's wrong here?

def save_to_disk(self) -> None:
        error = 0
        if isinstance(self.is_recording, ValueProxy):
            error = self.is_recording.value
        else:
            error = self.is_recording

        if error:
            raise RuntimeError('Cannot save to disk while still recording.')

        soundfile.write(self.filename, self.audio_frames, self.bitrate, format='WAV', subtype='PCM_16')

    def stop_recording(self) -> None:
        self.set_recording(0)
        self.save_to_disk()

    def update(self) -> None:
        while self.get_is_recording():
            self.capture_frames()
        self.stop_recording()

def capture_frames(self):
        data = self.microphone.record(self.bitrate, self.bitrate, 2)
        print(data)
        self.audio_frames.append(data)
ughstudios commented 2 years ago

Anyone?

Chum4k3r commented 2 years ago

You should provide a minimally working example, with the class definition, as well as an entry point section, e.g. if __name__ == "__main__": where you show How everything is being called.

The code you provided is barely useless for anyone who isn't you.

Is ir multiprocessing? async? multithreading? all sync? are you sure the list you are appending is the same you are passing as arg to soundfile?

Please, help us help you

Em seg, 1 de ago de 2022 20:03, ughstudios @.***> escreveu:

Anyone?

— Reply to this email directly, view it on GitHub https://github.com/bastibe/SoundCard/issues/153#issuecomment-1201819914, or unsubscribe https://github.com/notifications/unsubscribe-auth/AJPYUKGKDOCIVA263VQYGI3VXBJU7ANCNFSM55I6QKTA . You are receiving this because you are subscribed to this thread.Message ID: @.***>

ughstudios commented 2 years ago
""" provides functionality for recording the screen """
import os
import time
from typing import Optional, Union
from datetime import datetime
from loguru import logger

from multiprocess import Process, Manager  # pylint: disable=no-name-in-module
from multiprocess.managers import Value, ValueProxy
import soundcard
import soundfile
import cv2
import numpy
import pyautogui
import ffmpeg

from league_helper.decorators import function_group_register, memoize
from league_helper.qt_settings_generated import show_live_recording
from league_helper import paths
from league_helper import youtube

def get_timestamp_str() -> str:
    return datetime.now().strftime("%Y-%m-%d_%I-%M-%S_%p")

def get_video_recording_file() -> str:
    return os.path.join(paths.get_recordings_directory(), f'LOL_{get_timestamp_str()}.avi')

def get_audio_recording_file() -> str:
    return os.path.join(paths.get_recordings_directory(), f'LOL_{get_timestamp_str()}.wav')

class VideoRecorder:
    """ Do not instance directly, call get_match_recorder singleton instead """
    def __init__(self, filename: Optional[str] = get_video_recording_file()) -> None:
        self.screen_size: tuple[int, int] = pyautogui.size()
        self.codec = cv2.VideoWriter_fourcc(*'XVID')
        self.is_recording: Union[int, Value] = 0
        self.fps: int = 12
        self.filename = filename
        self.output = cv2.VideoWriter(self.filename, self.codec, self.fps, (self.screen_size))

    def set_recording(self, new_value: int) -> None:
        if isinstance(self.is_recording, ValueProxy):
            self.is_recording.value = new_value
        else:
            self.is_recording = new_value

    def get_is_recording(self) -> int:
        if isinstance(self.is_recording, ValueProxy):
            return self.is_recording.value
        return self.is_recording

    def capture_frames(self) -> None:
        screenshot = pyautogui.screenshot()
        frame = numpy.array(screenshot)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        self.output.write(frame)
        if show_live_recording.get():
            cv2.imshow('League of Legends live recording', frame)
            cv2.waitKey(1)

    def update(self) -> None:
        while self.get_is_recording():
            self.capture_frames()
        self.stop_recording()

    def start_recording(self) -> None:
        self.set_recording(1)
        self.update()

    def stop_recording(self) -> None:
        self.set_recording(0)

class AudioRecorder:
    """ Do not instance directly, call get_match_recorder singleton instead """
    def __init__(self, filename: str = get_audio_recording_file()) -> None:
        self.bitrate = 44100
        self.audio_frames: list = []
        self.filename = filename
        self.is_recording: Union[ValueProxy, int] = 0
        self.default_speaker = soundcard.default_speaker()
        self.microphones = soundcard.all_microphones(include_loopback=True)
        for microphone in self.microphones:
            if microphone.name == self.default_speaker.name:
                self.microphone = microphone

    def set_recording(self, new_value: int) -> None:
        if isinstance(self.is_recording, ValueProxy):
            self.is_recording.value = new_value
        else:
            self.is_recording = new_value

    def get_is_recording(self) -> int:
        if isinstance(self.is_recording, ValueProxy):
            return self.is_recording.value
        return self.is_recording

    def save_to_disk(self) -> None:
        error = 0
        if isinstance(self.is_recording, ValueProxy):
            error = self.is_recording.value
        else:
            error = self.is_recording

        if error:
            raise RuntimeError('Cannot save to disk while still recording.')

        #soundfile.write(self.filename, *self.audio_frames, self.bitrate, format='WAV')

    def stop_recording(self) -> None:
        self.set_recording(0)
        self.save_to_disk()

    def update(self) -> None:
        while self.get_is_recording():
            self.capture_frames()
        self.stop_recording()

    def capture_frames(self):
        data = self.microphone.record(self.bitrate, self.bitrate, 2)
        print(data)
        soundfile.write(self.filename, data, self.bitrate, format='WAV')
        #self.audio_frames.append(data)

    def start_recording(self) -> None:
        self.set_recording(1)
        self.update()

class MatchRecorder:
    """ Do not instance directly, call get_match_recorder singleton instead """
    def __init__(self) -> None:
        self.manager = Manager()
        self.recording = self.manager.Value('i', 0)  # Set to 0 or 1 instead of True or False
        self.video_filename = self.manager.Array('b', get_video_recording_file().encode())
        self.audio_filename = self.manager.Array('b', get_audio_recording_file().encode())
        self.video_filename_length = self.manager.Value('i', len(get_video_recording_file()))
        self.audio_filename_length = self.manager.Value('i', len(get_audio_recording_file()))
        self.video_filename = get_video_recording_file()
        self.audio_filename = get_audio_recording_file()

        self.video_process = Process(target=self.start_video_recording, args=[self.video_filename, self.audio_filename_length, self.recording])  # pylint: disable=not-callable
        self.audio_process = Process(target=self.start_audio_recording, args=[self.audio_filename, self.video_filename_length, self.recording])  # pylint: disable=not-callable

    def start_recording(self) -> None:
        if self.recording.value:
            logger.info('Cannot start recording while a recording is in progress.')
            return
        self.recording.value = 1
        self.video_process.start()
        self.audio_process.start()

    def stop_recording(self) -> None:
        if not self.recording.value:
            logger.error('There is no recording to stop.')
            return
        self.recording.value = 0

    def start_video_recording(self, video_filename: str, video_filename_length: Value, is_recording: Value) -> None:
        filename = video_filename[0:video_filename_length.value]
        logger.info(f'Saving video file to: {filename}')
        video = VideoRecorder(filename)
        video.is_recording = is_recording
        video.start_recording()

    def start_audio_recording(self, audio_filename: str, audio_filename_length: Value, is_recording: Value) -> None:
        filename = audio_filename[0:audio_filename_length.value]
        logger.info(f'Saving audio file to: {filename}')
        audio = AudioRecorder(filename)
        audio.is_recording = is_recording
        audio.start_recording()

def merge_audio_and_video(video_path: str, audio_path: str) -> str:
    """ Combines the audio and video recordings into a single file
        https://ffmpeg.org/ffmpeg-filters.html#concat
        Returns the output file location
    """
    video = ffmpeg.input(video_path)
    audio = ffmpeg.input(audio_path)
    output = video_path.replace('.avi', '.mp4')
    ffmpeg.concat(video, audio, v=1, a=1).output(output).run(cmd=paths.get_ffmpeg_exe_path())
    logger.info(f'Files have been merged into a single file: {output}')
    return output

@memoize
def get_match_recorder() -> MatchRecorder:
    return MatchRecorder()

def delete_temp_recordings(recorder: MatchRecorder) -> None:
    """ After a recording has finished and has been merged with ffmpeg, we delete the separate (audio and video) recordings since we have a full recording with both merged.
        This is done to save on disk space.
    """
    filepaths = (recorder.audio_filename, recorder.video_filename)
    for filepath in filepaths:
        logger.info(f'Removing temporary recording file {filepath}')
        os.remove(filepath)

def start_recording_match() -> None:
    recorder = get_match_recorder()
    recorder.start_recording()

def stop_and_upload_match() -> None:
    recorder = get_match_recorder()
    recorder.stop_recording()
    output = merge_audio_and_video(recorder.video_filename, recorder.audio_filename)
    delete_temp_recordings(recorder)
    video_title = os.path.basename(recorder.video_filename)
    video = youtube.VideoMetaData(video_title, '', [], 'gaming', license='creativeCommon', filepath=output)
    youtube.upload_video(video)

def start():
    start_recording_match()
    time.sleep(5)
    stop_and_upload_match()

Here's an example directly from my code. Look at the function at the bottom, that's the entrypoint

bastibe commented 2 years ago

If you want help, put in some effort. Build a minimal example code, not a three-screen dump.

Your problem is, though, write expects a 1D or 2D numpy array, but you're providing a list of numpy arrays.