ccappetta / bidirectional_streaming_ai_voice

Python scripts to handle a two way voice conversation with Anthropic Claude, using ElevenLabs, Faster-Whisper, and Pygame.
MIT License
118 stars 28 forks source link

Event loop error on launch #11

Closed RealJD2020 closed 4 months ago

RealJD2020 commented 4 months ago

Hi Chris,

Great work here (from your videos). I haven't been able to get it running myself. Ran into the keyboard permission issue on my Mac as well, had to run the script with elevated privileges. But then still ran into a persistent event loop error:

async_tasks.py:83>> got Future <Future pending> attached to a different loop")>
Traceback (most recent call last):

I ran it through GPT with the source code, along with more complete traceback, and it identified the following:


The repeated issue you're facing with the asyncio tasks being attached to a different loop is symptomatic of the way your event loop is managed and tasks are handled across different parts of your application. Given the traceback and the nature of the errors, it seems like there is a conflict between threads and the asyncio event loops they are supposed to manage.

Problem Diagnosis:

  1. Mixing of Threads with Asyncio:

    • Your script uses a combination of threading and asyncio, which can lead to complex interactions especially when managing event loops across different threads. Each thread in Python can only have one associated event loop.
    • Your main async tasks are being initiated inside a thread (Thread-1 runs run_async_tasks). This approach requires careful management of the event loop, particularly making sure the right event loop is set and used for all asyncio operations within that thread.
  2. Event Loop Handling:

    • The errors indicate that the Future objects are being attached to a different event loop than the one currently running the await expression. This typically happens when Future or coroutine is created in one loop and awaited in another.

Solution Suggestions:

To resolve these issues, you can take the following approaches:

  1. Proper Event Loop Initialization in Threads:

    • Ensure each thread that runs asyncio operations initializes its own event loop and sets it as the current event loop at the start of the thread. Here’s how you can modify your thread's target function to handle this properly:

      def run_async_tasks():
      loop = asyncio.new_event_loop()  # Create a new event loop for the thread
      asyncio.set_event_loop(loop)     # Set the new event loop as the current event loop in this thread
      
      loop.run_until_complete(start_async_tasks(text_to_speech_queue))
      
      try:
         loop.run_until_complete(shutdown_event.wait())
      finally:
         loop.close()  # Properly close the loop when done
  2. Error Handling in Async Tasks:

    • Modify your async tasks to properly handle and log exceptions to prevent unhandled task exceptions:
      async def text_to_speech_consumer(text_to_speech_queue):
      while True:
         try:
             text = await text_to_speech_queue.get()
             await process_text_to_speech(text)
         except Exception as e:
             print(f"Error in text_to_speech_consumer: {e}")
         finally:
             text_to_speech_queue.task_done()
  3. Handling Future Objects Correctly:

    • Ensure that any Future or coroutine you create is attached and awaited in the same event loop. Use debug mode in asyncio to help identify where the mismatches occur:
      import asyncio
      asyncio.get_event_loop().set_debug(True)

These steps should help stabilize your application's concurrency model by ensuring that each thread manages its own event loop correctly, and that tasks and futures are not crossing these loop boundaries improperly. Additionally, running with debug mode enabled can provide more insight into where and why the event loop mismatches occur, leading to more targeted fixes.

RealJD2020 commented 4 months ago

This didn't immediately resolve the issue, this is where I'm currently working on, the following advice GPT gave me. But if you have any advice, I'd be welcome to hearing it!


Given that the application successfully reaches the prompt "Press spacebar to capture your audio and begin the conversation." before crashing, the issue likely originates in the code that handles the audio capture or processing immediately following this prompt. Here are the key parts to consider based on the sequence of operations that should be happening right after you see this message:

  1. Keyboard Event Listening: The application is likely setting up to listen for the spacebar press to start audio recording. Problems in this part of the code can stem from the keyboard event handling itself or permissions issues, but since you’re running with sudo, permissions are likely not the issue anymore.

  2. Audio Recording Setup: Once the spacebar is pressed, the application should start recording audio. The actual audio capture is done through the callback mechanism within the sounddevice module, which is sensitive to exceptions and errors in the callback function.

  3. Threading and Async Event Loop Interaction: The audio recording and processing logic might be interacting with other parts of your system (such as the asyncio event loop or other threads). If there are unhandled exceptions within these interactions or misconfigurations (like attaching tasks to the wrong event loop), this could lead to crashes.

Steps to Investigate and Potential Solutions:

A. Inspect the on_space_press Function: This is likely the function handling the spacebar event. Ensure it's correctly managing state changes and starting the recording without issues.

B. Review Audio Callback Function: If the application crashes during audio recording, the callback function passed to sounddevice.InputStream might be the source of trouble. Check this function for any operations that might raise exceptions or behave unexpectedly.

def callback(indata, frames, time, status):
    if status:
        print(status)  # This can help identify if there are buffer overflows or other issues
    if is_recording:
        audio_data.append(indata.copy())  # Ensure that this operation is thread-safe and error-free

C. Ensure Thread-Safe Operations: If the callback function or other parts of your audio handling logic interact with shared resources (like appending to audio_data), ensure these interactions are thread-safe.

D. Validate Event Loop Management in Threads: Since the main crash involves asyncio and event loop errors, review how you initiate and manage these event loops in any threads that interact with asyncio.

def run_async_tasks():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    asyncio.ensure_future(start_async_tasks())
    try:
        loop.run_forever()
    finally:
        loop.close()

E. Debugging and Logs: Add more logging before, during, and after the audio recording steps to capture the state and any errors that occur right before the crash. This can help pinpoint the exact location and cause of the failure.

print("Ready to record. Press spacebar.")
keyboard.on_press(on_space_press)
print("Recording setup complete.")

F. Consider Exception Handling in Key Areas: Wrap sensitive operations in try-except blocks to handle potential exceptions gracefully and prevent them from crashing the entire application.

try:
    # potentially problematic code
except Exception as e:
    print(f"An error occurred: {e}")

By carefully checking these areas and adding robust error handling and logging, you should be able to identify and correct the underlying cause of the crash. If the issue persists, consider isolating parts of the functionality (like audio recording) in a minimal, separate script to verify they work independently without the rest of the application's complexity.

RealJD2020 commented 4 months ago

@ccappetta I'm still getting this error when running the program (with sudo, even) -- attempting all the suggestions GPT had for me did nothing to help solve this threading and asyncio loop management problems. Are you sure this program runs for anyone else? I'm perplexed as to how it is so badly not working on my machine haha:

[The transcript history loads just fine here]

Press spacebar to capture your audio and begin the conversation.
Error in text_to_speech_consumer: Task <Task pending name='Task-2' coro=<text_to_speech_consumer() running at .../bidirectional_streaming_ai_voice/async_tasks.py:86>> got Future <Future pending created at .../.pyenv/versions/3.9.7/lib/python3.9/asyncio/base_events.py:424> attached to a different loop
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<text_to_speech_consumer() done, defined at .../bidirectional_streaming_ai_voice/async_tasks.py:83> exception=ValueError('task_done() called too many times')>
Traceback (most recent call last):
  File ".../bidirectional_streaming_ai_voice/async_tasks.py", line 91, in text_to_speech_consumer
    text_to_speech_queue.task_done()
  File ".../.pyenv/versions/3.9.7/lib/python3.9/asyncio/queues.py", line 209, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times
Exception in thread Thread-1:
Traceback (most recent call last):
  File ".../.pyenv/versions/3.9.7/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File ".../.pyenv/versions/3.9.7/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File ".../bidirectional_streaming_ai_voice/main.py", line 208, in run_async_tasks
    loop.run_until_complete(shutdown_event.wait())
  File ".../.pyenv/versions/3.9.7/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File ".../.pyenv/versions/3.9.7/lib/python3.9/asyncio/locks.py", line 226, in wait
    await fut
RuntimeError: Task <Task pending name='Task-4' coro=<Event.wait() running at .../.pyenv/versions/3.9.7/lib/python3.9/asyncio/locks.py:226> cb=[_run_until_complete_cb() at .../.pyenv/versions/3.9.7/lib/python3.9/asyncio/base_events.py:184]> got Future <Future pending created at .../.pyenv/versions/3.9.7/lib/python3.9/asyncio/base_events.py:424> attached to a different loop
zsh: segmentation fault  sudo python3 main.py
RealJD2020 commented 4 months ago

Take note of the seg fault at the end (Python actually just crashes when trying to run this on a Mac).

RealJD2020 commented 4 months ago

Alright, I made some significant progress. Few things here of important note.

  1. For Mac users, you to go to to Settings --> Privacy & Security --> Accessibility --> And you need to enable Terminal (or VSCode) in order to use the Keyboard spacebar key. This was the cause of the segfaults.
  2. The Thread <> asyncio interactions your original script was using were super duper not working on Mac. I'm not sure why it ever worked for you on Windows @ccappetta but it is possible the system was just slow enough it wasn't causing all sorts of race conditions or deadlocks. But I had to rewrite a significant amount of the threading (as Thread Executors) and make more of the functions asynchronous with asyncio to get it to actually run. And here are the revised files:

main.py:

# fmt: off
import os
import sys
import asyncio
import keyboard
import time
import tempfile
import anthropic
import datetime
import assemblyai as aai
import sounddevice as sd
import numpy as np
from async_tasks import start_async_tasks, text_to_speech_queue, text_to_speech_consumer
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
from scipy.io.wavfile import write
from faster_whisper import WhisperModel
from dotenv import load_dotenv
from colorama import init, Fore, Style
# Redirect stdout to devnull while importing Pygame
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide"
import pygame
# fmt: on

# Credentials and API keys
load_dotenv()
aai.settings.api_key = os.getenv("ASSEMBLYAI_API_KEY")
anthropic_key = os.getenv('ANTHROPIC_KEY')

# Initialize colorama and Anthropic client
init(autoreset=True)
client = anthropic.Anthropic(api_key=anthropic_key)

# Initialize Pygame for audio playback
pygame.mixer.init()
pygame.init()

# Global variables
conversation_history = []
shutdown_event = asyncio.Event()
model_size = "small"
compute_type = "float16"
recording_finished = False
is_recording = False

class States:
    WAITING_FOR_USER = 1
    RECORDING_USER_INPUT = 2
    PROCESSING_USER_INPUT = 3
    GENERATING_RESPONSE = 4
    PLAYING_RESPONSE = 5

current_state = States.WAITING_FOR_USER

system_message = '''Claude, your responses in this conversation will be converted to speech in real-time for a voice-based interaction. Please optimize your replies for this format:
Be concise and focus on the most salient points. Aim for brevity over lengthy explanations.
Jump straight into your key ideas without summarizing or affirming the human's stance each time.
Make your points directly. Turn the conversation back to the human quickly. You don't need to re-state or summarize what you've already said. Think of this as me interviewing you, you do not need to wrap up your comments with open ended questions.

Feel free to be unexpected, witty, irreverent, and humorous when appropriate. Don't be afraid to make keen observations, ask thought-provoking questions, or leave some ideas incomplete for us to ponder. The goal is an engaging, unpredictable, interesting dialogue.
If a great insight, joke, or question comes to mind that doesn't fit the current topic, go ahead and say it anyway. We can always circle back to the main thread later. Serendipity and tangents are welcome.
The overall goal is to have a lively, natural conversation with plenty of back-and-forth. Prioritize concision, but also allow room for humor, improvisation, and provocative ideas. Efficiency is good, but so is keeping things fun and stimulating. We would prefer to cover a topic with many brief back and forth comments rather than a few long monologues. Do not emote using asterisksin your replies. Communicate in a natural spoken style.'''

print("\nClaude's instructions: " + system_message + Fore.YELLOW +
      "\n\nPress spacebar to capture your audio and begin the conversation." + Style.RESET_ALL)

def record_audio():
    global is_recording
    fs = 44100  # Sample rate
    duration = 90  # Maximum possible duration, but we can stop earlier
    block_duration = 0.1  # Duration of each audio block in seconds

    # Callback function to be called for each audio block. Frames, time, and status are used by library mechanisms outside this script so need to stay.
    def callback(indata, frames, time, status):
        nonlocal audio_data
        if is_recording:
            audio_data.append(indata.copy())

    # Set the flag to True to start recording
    is_recording = True

    audio_data = []
    # Start recording in a non-blocking manner
    with sd.InputStream(callback=callback, samplerate=fs, channels=1, blocksize=int(fs * block_duration)):
        while is_recording and len(audio_data) * block_duration < duration:
            sd.sleep(int(block_duration * 1000))

    # Convert the audio data to a NumPy array
    audio_data = np.concatenate(audio_data, axis=0)

    return audio_data, fs

def on_space_press(event):
    global is_recording, current_state
    if event.name == 'space':
        if current_state == States.WAITING_FOR_USER:
            is_recording = True
            current_state = States.RECORDING_USER_INPUT
            print(Fore.YELLOW + "Recording started. Press the space bar to stop.")
        elif current_state == States.RECORDING_USER_INPUT and is_recording:
            is_recording = False  # This will trigger the recording to stop
            print("Recording stopped. Processing input...")
            current_state = States.PROCESSING_USER_INPUT

def transcribe_audio_to_text(audio_data, sample_rate):
    start_time = time.time()  # Record the start time
    temp_dir = './input/'
    os.makedirs(temp_dir, exist_ok=True)
    temp_file_path = tempfile.mktemp(suffix='.wav', dir=temp_dir)
    try:
        write(temp_file_path, sample_rate, audio_data)
        compute_type = "float32"

        segments, _ = WhisperModel(
            model_size, device="cpu", compute_type=compute_type).transcribe(temp_file_path)
        transcript = " ".join(segment.text for segment in segments)
        print(Fore.GREEN + "User:", transcript)
        end_time = time.time()  # Record the end time
        duration = end_time - start_time  # Calculate the duration
        print(f"[Transcription: {duration:.2f} seconds]")
        return transcript
    except Exception as e:
        print(Fore.RED + "Error during transcription:", e)
    finally:
        os.remove(temp_file_path)

def generate_and_process_text(user_input, transcript_file):
    # Trim whitespace and check if input is empty
    user_input = user_input.strip()
    if user_input:  # Proceed only if user_input is not empty
        conversation_history.append({"role": "user", "content": user_input})
        with open(transcript_file, "a") as file:
            file.write(f"~~~\n\nUser: {user_input}\n\n~~~\n")
    else:
        print(Fore.RED + "Received empty input, skipping...")
        return  # Skip processing for empty input

    claude_response = ""
    buffer = ""
    min_chunk_size = 150
    splitters = (".", "?", "!")

    # Before streaming, validate conversation_history
    validated_history = [
        msg for msg in conversation_history if msg.get("content").strip()]

    try:
        with client.messages.stream(
            max_tokens=1024,
            messages=validated_history,
            system=system_message,
            model="claude-3-opus-20240229",
        ) as stream:
            for text in stream.text_stream:
                print(Fore.CYAN + text + Style.RESET_ALL, end="", flush=True)
                claude_response += text
                buffer += text

                # Check if buffer is ready to be chunked
                last_splitter_pos = max(buffer.rfind(splitter)
                                        for splitter in splitters)
                if len(buffer) >= min_chunk_size and last_splitter_pos != -1:
                    chunk = buffer[:last_splitter_pos+1]
                    buffer = buffer[last_splitter_pos+1:]

                    # Queue this chunk for async TTS processing
                    queue_chunk_for_processing(chunk)
    except anthropic.APIError as e:
        print(Fore.RED + f"Anthropic API Error: {e}")
        return

    # Process any remaining text in buffer
    if buffer:
        queue_chunk_for_processing(buffer)

    # Append Claude's full response to the conversation history
    conversation_history.append(
        {"role": "assistant", "content": claude_response})

    # Write Claude's completed response to the transcript file
    with open(transcript_file, "a") as file:
        file.write(f"\nClaude: {claude_response}\n\n")

    print()  # Newline for separation

def queue_chunk_for_processing(chunk):
    # Queue the chunk for asynchronous processing
    # print(Fore.YELLOW + f"Adding chunk to the queue: {chunk}")  # Debug log statement
    text_to_speech_queue.put_nowait(chunk)

async def run_async_tasks():
    # Get the current event loop and run the async tasks
    try:
        await start_async_tasks(text_to_speech_queue)
        await shutdown_event.wait()
    finally:
        pass  # Clean-up not necessary as asyncio.run handles it

def parse_transcript(transcript_content):
    conversation_history = []
    lines = transcript_content.strip().split("\n")
    for line in lines:
        if line.startswith("User: "):
            user_input = line[6:]
            conversation_history.append(
                {"role": "user", "content": user_input})
        elif line.startswith("Claude: "):
            claude_response = line[8:]
            conversation_history.append(
                {"role": "assistant", "content": claude_response})

    # Check if the last message is from the user
    if conversation_history and conversation_history[-1]["role"] == "user":
        # Append a filler message from Claude
        conversation_history.append({"role": "assistant", "content": "..."})

    return conversation_history

def determine_current_state(conversation_history):
    if conversation_history[-1]["role"] == "assistant":
        return States.WAITING_FOR_USER
    else:
        return States.GENERATING_RESPONSE

def setup_new_transcript_file():
    transcripts_directory = "transcripts"
    os.makedirs(transcripts_directory, exist_ok=True)

    timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    transcript_file = f"{transcripts_directory}/transcript_{timestamp}.txt"
    with open(transcript_file, "w") as file:
        file.write(f"Transcription started at {timestamp}\n\n")

    return transcript_file

def setup_transcript_file(transcript_filename):
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    new_transcript_file = f"transcripts/transcript_{timestamp}.txt"

    # Copy the content of the original transcript into the new file
    with open(transcript_filename, "r") as original_file, open(new_transcript_file, "w") as new_file:
        original_content = original_file.read()
        new_file.write(original_content)
        new_file.write(f"\nTranscription resumed at {timestamp}\n\n")

    return new_transcript_file

async def main():
    global current_state, is_recording, conversation_history

    # Initialize previous state
    previous_state = current_state

    if len(sys.argv) == 2:
        transcript_filename = sys.argv[1]
        try:
            with open(transcript_filename, "r") as file:
                transcript_content = file.read()
            conversation_history = parse_transcript(transcript_content)
            current_state = determine_current_state(conversation_history)
            transcript_file = setup_transcript_file(transcript_filename)
            print(Fore.GREEN + f"Resuming conversation from {transcript_filename}")
        except FileNotFoundError:
            print(Fore.RED + f"Transcript file '{transcript_filename}' not found. Starting a new conversation.")
            transcript_file = setup_new_transcript_file()
    else:
        transcript_file = setup_new_transcript_file()

    executor = ThreadPoolExecutor(max_workers=100)
    loop = asyncio.get_event_loop()  # here's how you should get the loop

    # Start running pending async tasks in parallel.
    async_tasks = asyncio.create_task(run_async_tasks())
    keyboard.on_press(on_space_press)

    try:
        while True:
            if current_state != previous_state:
                previous_state = current_state  # Update previous_state

            if current_state == States.RECORDING_USER_INPUT:
                # Start recording
                recording, fs = await loop.run_in_executor(executor, record_audio)  # Add await here
                current_state = States.PROCESSING_USER_INPUT

            elif current_state == States.PROCESSING_USER_INPUT:
                # Transcribe and process input
                current_state = States.GENERATING_RESPONSE
                user_input = await loop.run_in_executor(executor, transcribe_audio_to_text, recording, fs)  # Add await here
                _ = await loop.run_in_executor(executor, generate_and_process_text, user_input, transcript_file)

            elif current_state == States.GENERATING_RESPONSE:
                if not pygame.mixer.music.get_busy():  # Check if pygame playback is completed
                    current_state = States.WAITING_FOR_USER

            # Add a short sleep to prevent the loop from hogging CPU
            time.sleep(0.1)

    except BaseException as e:
        print(Fore.RED + "\nShutting down gracefully...")
        shutdown_event.set()

        if not isinstance(e, KeyboardInterrupt):
            # If the exception isn't caused by user interrupt
            # Reraise the exception after the graceful shutdown
            shutdown_event.exception = e
        # Now wait for all tasks to finish
        await async_tasks

    if hasattr(shutdown_event, 'exception'):
        # If there was an exception, reraise it
        raise shutdown_event.exception

if __name__ == "__main__":
    # pygame.mixer.init()  # Initializes Pygame mixer for audio playback
    # pygame.init()  # Initializes all imported Pygame modules
    asyncio.run(main())

async_tasks.py:

# fmt: off
import os
import asyncio
from httpx import AsyncClient, Timeout
from collections import deque
from colorama import init, Fore, Back, Style
from dotenv import load_dotenv
from datetime import datetime
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide"
import pygame
# fmt: on

asyncio.get_event_loop().set_debug(True)

load_dotenv()
init(autoreset=True)

ELEVENLABS_API_KEY = os.getenv('ELEVENLABS_API_KEY')
VOICE_ID = '6TiI3I94tSLb3E80mEmw'
MODEL_ID = 'eleven_turbo_v2'
# MODEL_ID = 'eleven_multilingual_v2'

file_increment = 0
audio_queue = deque()

# Define the text_to_speech_queue as a global asyncio Queue that can be accessed by main script
text_to_speech_queue = asyncio.Queue()

shutdown_event = asyncio.Event()

directory = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
os.makedirs(f'output/{directory}', exist_ok=True)

async def process_text_to_speech(text):
    # print(f"Begin process_text_to_speech: {text}")
    global file_increment
    filename = f'output/{directory}/{file_increment}.mp3'
    file_increment += 1

    if ELEVENLABS_API_KEY is None:
        print(Fore.RED + "Error: ELEVENLABS_API_KEY is not set.")
        return

    url = f'https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}'
    headers = {
        "Accept": "audio/mpeg",
        "Content-Type": "application/json",
        "xi-api-key": ELEVENLABS_API_KEY
    }
    data = {
        "model_id": MODEL_ID,
        "text": text,
        "voice_settings": {"similarity_boost": 0.75, "stability": 0.3}
    }

    timeout = Timeout(30.0, connect=60.0)
    # print(Fore.YELLOW + f"Processing text for TTS: {text}")  # Debug: check text input
    try:
        async with AsyncClient(timeout=timeout) as client:
            response = await client.post(url, json=data, headers=headers)
            if response.status_code == 200:
                with open(filename, 'wb') as audio_file:
                    audio_file.write(response.content)
                audio_queue.append(filename)
    #            print(Fore.GREEN + f"Audio file saved and queued: {filename}")  # Debug: confirm file save
            else:
                print(Fore.RED + f"Error generating speech: {response.status_code} - {response.text}")
    except Exception as e:
        print(f'Error occurred in process_text_to_speech: {e}')
        raise  # This will allow the error to propagate so the consuming function is aware

async def play_audio():
    while True:
        if not pygame.mixer.music.get_busy() and audio_queue:
            pygame.mixer.music.load(audio_queue.popleft())
            pygame.mixer.music.play()
        await asyncio.sleep(0.1)

async def text_to_speech_consumer(text_to_speech_queue):
    # print(f'{datetime.now()} - Inside text_to_speech_consumer. Waiting for text in queue.')
    while True:
        try:
            text = text_to_speech_queue.get_nowait()
        #    print(f'{datetime.now()} - Got text: {text}')
        except asyncio.QueueEmpty:
        #    print(f'{datetime.now()} - Queue was empty when trying to fetch.')
            await asyncio.sleep(1)  # Sleep for a while before retrying
            continue
        except Exception as e:
            print(f'{datetime.now()} - Error in text_to_speech_consumer: {e}')
        else:
        #    print(f'{datetime.now()} - No error occurred. Proceeding to process text')
            await process_text_to_speech(text)
            text_to_speech_queue.task_done()
        #    print(f'{datetime.now()} - Done processing text')

async def start_async_tasks(text_to_speech_queue):
    """Starts asynchronous tasks without directly calling loop.run_forever()."""
    #print("Starting async tasks...")
    consumer_task = asyncio.create_task(text_to_speech_consumer(text_to_speech_queue))
    play_task = asyncio.create_task(play_audio())
    await asyncio.gather(*[consumer_task, play_task])

async def stop_async_tasks():
    # Cancel all running tasks
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    [task.cancel() for task in tasks]

    # Gather all tasks to let them finish with cancellation
    await asyncio.gather(*tasks, return_exceptions=True)

The only thing that does not work now is the audio playback. I am not sure yet why the tasks aren't running properly, still trying to narrow this last bit down. So far, what works is:

  1. I can press Spacebar to record.
  2. Press spacebar to stop.
  3. Audio transcription works, sends to Claude.
  4. Claude response streams back.
  5. The text_to_speech_consumer for whatever reason seems to start blocking right when the queue begins to fill up, and doesn't actually process anything until you either keyboard interrupt the main loop, or hit Spacebar again and begin your next recording -- and then it begins to actually process the queue of TTS text, and it works.

Very obnoxious asynchronous / blocking issue tripping me up at the last mile here.

RealJD2020 commented 4 months ago

@ccappetta So, I've narrowed the issue down. Here's what's going on (aside from the imprecise asyncio and Threading mixing and mingling that's going on).

  1. You've got two different queues that are important here -- a text_to_speech_queue and an audio_queue.
  2. At any time in the main program interaction flow, either side of this handoff may be moving quicker (or empty) than the other.
    • There may be a slowdown in the ElevenLabs encoding of audio, so a backlog in the text_to_speech_queue.
    • Or there may be a backlog of audio_queue files in line for playback by pygame.
  3. In main, you are managing state change here independent of the two queue states, however. Which, for me anyway (I don't see this in your YouTube videos, but man has it been a pain on my end), the audio playback either stops mid-way through a single response generation from Claude, or, the TTS encoding itself stops, which requires a keyboard interrupt (either ^C or the Spacebar) to un-stick. And I think this is the culprit:
            elif current_state == States.GENERATING_RESPONSE:
                if not pygame.mixer.music.get_busy():  # Check if pygame playback is completed
                    current_state = States.WAITING_FOR_USER
RealJD2020 commented 4 months ago

Python3.12

ccappetta commented 4 months ago

yo this looks like awesome progress from a quick skim, i'm buried this week with work and life stuff so wont have a chance to fully digest it until later; but wanted to let you know i've seen it