deepgram / deepgram-python-sdk

Official Python SDK for Deepgram's automated speech recognition APIs.
https://developers.deepgram.com
MIT License

on_message doesn't return after adding supabase client call #454

Closed antyser closed 2 months ago

antyser commented 2 months ago

What is the current behavior?

I have a FastAPI websocket server that receives audio and forwards it over the Deepgram async websocket for transcription. After I added a supabase client call, on_message stopped receiving anything.

When I enable debug logging, I get the following output:

Version.v ENTER
version: 1
path: deepgram.clients.listen.v1.websocket.async_client
class_name: AsyncListenWebSocketClient
Version.v succeeded
Version.v LEAVE
event subscribed: Results
event subscribed: Error
AsyncListenWebSocketClient.start ENTER
options: {
    "diarize": true,
    "endpointing": 500,
    "interim_results": true,
    "language": "en-US",
    "model": "nova-2",
    "punctuate": true,
    "smart_format": true,
    "utterance_end_ms": "1000"
}
addons: None
headers: None
members: None
kwargs: {}
ListenWebSocketOptions switching class -> dict
combined_options: {'diarize': True, 'endpointing': 500, 'interim_results': True, 'language': 'en-US', 'model': 'nova-2', 'punctuate': True, 'smart_format': True, 'utterance_end_ms': '1000'}
combined_headers: {'Accept': 'application/json', 'Authorization': 'Token 1ac0e10fbc87ec4039fee910aa0bdddf8994bab7', 'User-Agent': '@deepgram/sdk/v3.5.0 python/12.4'}
after running thread: MainThread
number of active threads: 1
keepalive is enabled
autoflush is disabled
after running thread: MainThread
number of active threads: 1
AsyncListenWebSocketClient._emit ENTER
callback handlers for: Open
after running thread: MainThread
number of active threads: 1
after running thread: MainThread
number of active threads: 1
AsyncListenWebSocketClient._emit LEAVE
start succeeded
AsyncListenWebSocketClient.start LEAVE
AsyncListenWebSocketClient._listening ENTER
AsyncListenWebSocketClient._keep_alive ENTER
Sending KeepAlive...
keep_alive succeeded
response_type: Metadata, data: {'type': 'Metadata', 'transaction_key': 'deprecated', 'request_id': '878fdd3e-f72f-47e8-84e9-a95ac54ca727', 'sha256': '0cf6eef985cd21746e2617ebe9f4d6add90d8cc2878f6509bf064a60868018c1', 'created': '2024-08-22T07:29:25.428Z', 'duration': 0.0, 'channels': 0}
MetadataResponse: MetadataResponse(type='Metadata', transaction_key='deprecated', request_id='878fdd3e-f72f-47e8-84e9-a95ac54ca727', sha256='0cf6eef985cd21746e2617ebe9f4d6add90d8cc2878f6509bf064a60868018c1', created='2024-08-22T07:29:25.428Z', duration=0.0, channels=0, models=None, model_info=None, extra=None)
AsyncListenWebSocketClient._emit ENTER
callback handlers for: Metadata
after running thread: MainThread
number of active threads: 1
after running thread: MainThread
number of active threads: 1
AsyncListenWebSocketClient._emit LEAVE
_listening(1000) exiting gracefully
AsyncListenWebSocketClient._listening LEAVE
send() exiting gracefully: 1000
AsyncListenWebSocketClient.send LEAVE
send() exiting gracefully: 1000
AsyncListenWebSocketClient.send LEAVE
send() exiting gracefully: 1000
AsyncListenWebSocketClient.send LEAVE

I think this means the connection has been established, but I'm not sure whether the audio is actually being transferred.

Steps to reproduce

Some code snippets:

async def get_interview(interview_id: str) -> Optional[Dict[str, Any]]:
    supabase = await get_supabase_client()
    response = await supabase.table(INTERVIEW_TABLE).select("*").eq("id", interview_id).maybe_single().execute()
    return response.data if response.data else None

@app.websocket("/listen/{interview_id}")
async def websocket_endpoint(websocket: WebSocket, interview_id: str):
    interview = await get_interview(interview_id)
    if not interview:
        logger.error(f"Interview not found: {interview_id}")
        raise HTTPException(status_code=404, detail="Interview not found")
    await websocket.accept()
    logger.info(f"WebSocket connection accepted at /listen/{interview_id}")

    try:
        await handle_socket(websocket, interview_id)
    except WebSocketDisconnect:
        logger.info(f"WebSocket connection disconnected at /listen/{interview_id}")
    except Exception as e:
        logger.exception("Could not process audio")
        raise Exception(f"Could not process audio: {e}")
    finally:
        if not websocket.client_state == WebSocketState.DISCONNECTED:
            await websocket.close()
        logger.info(f"WebSocket connection closed at /listen/{interview_id}")

async def handle_socket(websocket: WebSocket, interview_id: str):
    messages = await get_messages(interview_id)
    state = ConversationState(messages=messages)
    message_queue = Queue()
    logger.info(f"Initial messages: {state.messages}")
    deepgram_socket = await connect_to_deepgram(message_queue)
    processing_task = asyncio.create_task(process_messages(state, websocket, message_queue))

    try:
        while True:
            data = await websocket.receive_text()
            d = json.loads(data)
            if d["type"] == "audio":
                audio_bytes = base64_to_bytes(d["data"])
                await deepgram_socket.send(audio_bytes)
            elif d["type"] == "image":
                await message_queue.put(("image", d["data"]))
            elif d["type"] == "text":
                await message_queue.put(("text", d["data"]))
            elif "video" in d:
                pass  # Handle video if needed
    except asyncio.CancelledError:
        logger.info("WebSocket connection closed")
    finally:
        processing_task.cancel()
        await deepgram_socket.finish()
        await websocket.close()

async def connect_to_deepgram(message_queue: Queue):
    try:
        config = DeepgramClientOptions(verbose=verboselogs.DEBUG, options={"keepalive": "true"})
        dg_client: DeepgramClient = DeepgramClient(os.getenv("DEEPGRAM_API_KEY"), config)
        dg_connection = dg_client.listen.asyncwebsocket.v("1")

        async def on_message(self, result, **kwargs):
            try:
                if result.is_final and result.channel.alternatives[0].transcript:
                    await message_queue.put(("transcript", result))
            except Exception as e:
                logger.exception("Could not process deepgram result")

        async def on_error(self, error, **kwargs):
            logger.error(f"Deepgram error: {error}")

        dg_connection.on(LiveTranscriptionEvents.Transcript, on_message)
        dg_connection.on(LiveTranscriptionEvents.Error, on_error)

        options = LiveOptions(
            punctuate=True,
            language="en-US",
            diarize=True,
            interim_results=True,
            utterance_end_ms="1000",
            smart_format=True,
            endpointing=500,
        )
        await dg_connection.start(options)

        return dg_connection
    except Exception as e:
        logger.exception("Could not open socket")
        raise Exception(f"Could not open socket: {e}")

Expected behavior

I expect on_message to be called with the transcription messages.

Please tell us about your environment

macOS 14.5 (23F79), Python 3.12

davidvonthenen commented 2 months ago

Hi @antyser

Based on what you have given here, it looks like either:

  1. you aren't pushing the audio to DG, or
  2. you haven't specified the encoding and sample_rate values in the options, so DG is unable to determine what kind of audio stream it is (i.e., you might be pushing audio bytes, but it doesn't know what kind of audio).

antyser commented 2 months ago

    interview = await get_interview(interview_id)
    if not interview:
        logger.error(f"Interview not found: {interview_id}")
        raise HTTPException(status_code=404, detail="Interview not found")

When this code is enabled, no transcription is produced. Do you know what could cause the issue?

I think it is pushing the audio to DG, because when I disable this code I can see the transcription.

Regarding the settings, I added encoding="opus" and it works. I am not sure how to find the sample_rate, but it seems to work fine if I leave it empty. It doesn't work with 16000 or 4800.

davidvonthenen commented 2 months ago

I haven't used the supabase APIs before, but based on that function, none of our code runs inside get_interview.

I would first put debug/trace statements around those two supabase calls to see which of them is causing the hang. Then I would probably ask someone in the supabase community whether they have any suggestions.
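
A minimal sketch of that kind of tracing, using only the standard library. The `traced` helper and the stand-in coroutines `fake_get_client` and `fake_query` are hypothetical names for illustration; in the real handler the wrapped awaitables would be `get_supabase_client()` and the `.execute()` call. The timeout is an added assumption that turns a silent hang into a visible error.

```python
import asyncio
import logging
import time

logger = logging.getLogger("trace")


async def traced(label, awaitable, timeout=5.0):
    """Await `awaitable`, log how long it took, and raise on timeout."""
    start = time.perf_counter()
    logger.debug("%s: start", label)
    try:
        return await asyncio.wait_for(awaitable, timeout)
    finally:
        elapsed = time.perf_counter() - start
        logger.debug("%s: finished after %.3fs", label, elapsed)


# Hypothetical stand-ins for the two supabase calls in get_interview().
async def fake_get_client():
    await asyncio.sleep(0.01)
    return "client"


async def fake_query(client):
    await asyncio.sleep(0.01)
    return {"id": "abc"}


async def get_interview_traced():
    # Each call is timed separately, so the log shows which one stalls.
    client = await traced("get_supabase_client", fake_get_client())
    return await traced("select/execute", fake_query(client))
```

With logging at DEBUG level, each call prints a start and a finish line, so a call that never finishes stands out immediately.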

davidvonthenen commented 2 months ago

If you are using containerized audio, which you are, then you don't need to specify either of those parameters. Most people use a different encoding (not opus), which is why it caught my eye initially.

antyser commented 2 months ago

send() succeeded
AsyncListenWebSocketClient.send LEAVE
AsyncListenWebSocketClient.keep_alive ENTER
Sending KeepAlive...
AsyncListenWebSocketClient.send ENTER
send() succeeded
AsyncListenWebSocketClient.send LEAVE
keep_alive succeeded
AsyncListenWebSocketClient.keep_alive LEAVE
AsyncListenWebSocketClient.send ENTER
send() succeeded
AsyncListenWebSocketClient.send LEAVE
[... the three send lines above repeat many more times ...]
response_type: Metadata, data: {'type': 'Metadata', 'transaction_key': 'deprecated', 'request_id': 'ef0834ab-b8dc-42b4-a1f0-a965767744cd', 'sha256': '5d44a359a732cbba9b6308d1116e8cc8282237d7de5173ba3c581fac3258a4ad', 'created': '2024-08-22T22:50:09.437Z', 'duration': 0.0, 'channels': 0}
MetadataResponse: MetadataResponse(type='Metadata', transaction_key='deprecated', request_id='ef0834ab-b8dc-42b4-a1f0-a965767744cd', sha256='5d44a359a732cbba9b6308d1116e8cc8282237d7de5173ba3c581fac3258a4ad', created='2024-08-22T22:50:09.437Z', duration=0.0, channels=0, models=None, model_info=None, extra=None)
AsyncListenWebSocketClient._emit ENTER
callback handlers for: Metadata
after running thread: MainThread
number of active threads: 1
after running thread: MainThread
number of active threads: 1
AsyncListenWebSocketClient._emit LEAVE
_listening(1000) exiting gracefully
AsyncListenWebSocketClient._listening LEAVE
AsyncListenWebSocketClient.send ENTER
send() exiting gracefully: 1000
AsyncListenWebSocketClient.send LEAVE
[... the three send lines above repeat many more times ...]
AsyncListenWebSocketClient.keep_alive ENTER
Sending KeepAlive...
AsyncListenWebSocketClient.send ENTER
send() exiting gracefully: 1000
AsyncListenWebSocketClient.send LEAVE
keep_alive succeeded
AsyncListenWebSocketClient.keep_alive LEAVE
AsyncListenWebSocketClient.send ENTER
send() exiting gracefully: 1000
AsyncListenWebSocketClient.send LEAVE
[... the three send lines above repeat many more times ...]
AsyncListenWebSocketClient.finish ENTER
closing socket...

This is the log with spam-level logging turned on. It seems the sends are not succeeding. Is there a way to find out why?

davidvonthenen commented 2 months ago

A 1000 code means the websocket is no longer connected. Something terminated the connection.

Usually when you get the Metadata message, it means the connection has closed. Here it reports that no usable audio went through to DG ('duration': 0.0):

Metadata, data: {'type': 'Metadata', 'transaction_key': 'deprecated', 'request_id': 'ef0834ab-b8dc-42b4-a1f0-a965767744cd', 'sha256': '5d44a359a732cbba9b6308d1116e8cc8282237d7de5173ba3c581fac3258a4ad', 'created': '2024-08-22T22:50:09.437Z', 'duration': 0.0, 'channels': 0}

davidvonthenen commented 2 months ago

Check the return value of this call, because currently you aren't: await dg_connection.start(options)

I would also recommend subscribing to the Close event to see when the connection closes. Usually a 1000 means a graceful connection close. If you didn't trigger the close, then the server did because it received something it didn't expect (for example, non-audio bytes or audio with the incorrect encoding).

Dump out the bytes you are passing to send() and verify that they are audio. Another way to do this: save the bytes to a file, and you should be able to play that file using VLC.
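
One way to do that check, as a sketch: append every chunk to a file just before it goes to send(), then look at the first bytes of the file. The function names here are made up for illustration. The signatures are the standard ones: WebM/Matroska data begins with the EBML header bytes 1A 45 DF A3, Ogg streams begin with "OggS", and WAV files begin with "RIFF". If the dump starts with none of these, the stream is possibly missing its first chunk, or it is raw audio without a container.

```python
from pathlib import Path

# Well-known container signatures (the first four bytes of the stream).
MAGIC = {
    b"\x1a\x45\xdf\xa3": "webm/matroska (EBML header)",
    b"OggS": "ogg",
    b"RIFF": "wav (RIFF)",
}


def dump_chunk(path: Path, chunk: bytes) -> None:
    """Append one audio chunk to the dump file, exactly as it would be sent."""
    with path.open("ab") as f:
        f.write(chunk)


def sniff_container(path: Path) -> str:
    """Guess the container from the first four bytes of the dump file."""
    head = path.read_bytes()[:4]
    return MAGIC.get(head, "unknown (header missing or raw audio?)")
```

In the handler from the original post, `dump_chunk(Path("debug_audio.bin"), audio_bytes)` would go right before `await deepgram_socket.send(audio_bytes)`; the file name is arbitrary.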

antyser commented 2 months ago

Yeah, I found something funny: if I enable interview = await get_interview(interview_id), the saved audio file is broken, but if I disable it, the file is playable. I still don't understand the connection, but I think it at least confirms that DG closes the connection because of the malformed bytes.

davidvonthenen commented 2 months ago

Glad you figured it out!

antyser commented 2 months ago

The issue is that the frontend may send data without waiting for the socket to connect. Because of the extra query delay, the websocket is not accepted immediately, and the first few chunks of data are lost. My understanding is that opus encoding should tolerate some data loss, but I'm not sure whether the first chunk matters.

davidvonthenen commented 2 months ago

It does matter. The first frame contains the header information for the audio. Open the connection first, then send the audio. If you need to buffer and retain the audio frames until the connection is ready, you have to do that for everything to work properly.
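
A sketch of that ordering, using only asyncio so it runs on its own: start receiving chunks into a queue right away, do the slow setup (the database lookup and the Deepgram connection) while they pile up, then flush the buffer in arrival order so the header chunk goes out first. `FakeUpstream`, `slow_setup`, `client`, and `relay` are illustrative names, not SDK APIs; in the real handler the queue would be fed from `websocket.receive_text()` after `websocket.accept()` and drained into `deepgram_socket.send()`.

```python
import asyncio


class FakeUpstream:
    """Stand-in for the Deepgram connection; records what it receives."""

    def __init__(self):
        self.received = []

    async def send(self, chunk: bytes) -> None:
        self.received.append(chunk)


async def slow_setup() -> FakeUpstream:
    """Stand-in for the database query plus connect_to_deepgram()."""
    await asyncio.sleep(0.05)
    return FakeUpstream()


async def client(buffer: asyncio.Queue, chunks) -> None:
    """Simulates a frontend that starts sending immediately."""
    for chunk in chunks:
        await buffer.put(chunk)
        await asyncio.sleep(0.01)
    await buffer.put(None)  # end-of-stream marker


async def relay(chunks) -> list:
    buffer: asyncio.Queue = asyncio.Queue()
    # 1. Start receiving right away, before any slow work.
    producer = asyncio.create_task(client(buffer, chunks))
    # 2. Do the slow setup while chunks accumulate in the buffer.
    upstream = await slow_setup()
    # 3. Flush in arrival order, so the header chunk is sent first.
    while (chunk := await buffer.get()) is not None:
        await upstream.send(chunk)
    await producer
    return upstream.received
```

For example, `asyncio.run(relay([b"header", b"a", b"b"]))` delivers all three chunks to the upstream in the order they arrived, even though the first ones were produced before the upstream existed. The frontend should still wait for its socket's open event before sending; buffering on the server only covers the window after the socket is accepted.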

If you are looking for help, we have a Discord community of very knowledgeable people, and we also have an AI bot in the #ask-ai channel where you can ask it these questions.

Here is a link to join Discord: https://dpgr.am/discord