deepgram / deepgram-python-sdk

Official Python SDK for Deepgram's automated speech recognition APIs.
https://developers.deepgram.com
MIT License
208 stars 56 forks source link

Async client live transcribing on_message not called #442

Closed yurongchengather closed 1 month ago

yurongchengather commented 1 month ago

What is the current behavior?

I set up the callback dg_connection.on(LiveTranscriptionEvents.Transcript, on_message), but on_message is never called.

Steps to reproduce

my server handler:

import asyncio
import base64
from deepgram import (
    DeepgramClient,
    LiveOptions,
    LiveTranscriptionEvents,
)
import json

import websockets.protocol
import logs
import os
import websockets
from typing import Dict
from dotenv import load_dotenv

load_dotenv()
logger = logs.get_logger(__name__)

DEEPGRAM_API_KEY = os.getenv("DEEPGRAM_API_KEY", "")
MAX_SIZE_ASYNC_QUEUE = 2048000

# Dictionary to store WebSocket connections
websocket_connections: Dict[str, asyncio.Queue] = {}

# dg_config: DeepgramClientOptions = DeepgramClientOptions(options={"keepalive": "true"})
dg_options = LiveOptions(
    model="nova-2-conversationalai",
    interim_results=True,
    keywords=["Genie:5", "Ariel:-15", "Ario:80"],
    smart_format=True,
)

def parse_incoming_data(payload: str | bytes) -> dict | bytes:
    """
    Parses incoming data based on whether it is binary or not.
    1. If the payload is binary, return it as is.
    2. If the payload is a string and starts with "{", assume it is JSON and try to parse it and return a dict.
    3. If the payload is a string and does not start with "{", assume it is base64-encoded binary data, return the decoded bytes.

    Args:
        payload: The incoming data, either binary or string.

    Returns:
        Parsed data (either bytes or dict).
    """
    if isinstance(payload, bytes):
        return payload

    # Determine if the string is JSON by checking the first character
    if isinstance(payload, str) and payload[0] == "{":
        try:
            result = json.loads(payload)
            if isinstance(result, dict):
                return result
        except json.JSONDecodeError:
            return {}

    # If not JSON, assume base64-encoded binary data
    return base64.b64decode(payload)

# WebSocket endpoint for live transcription
async def transcribe_handler(request, ws):
    try:
        # Create a queue for each WebSocket connection
        data_queue = asyncio.Queue(MAX_SIZE_ASYNC_QUEUE)
        websocket_connections[id(ws)] = data_queue
        state = {
            "isReceivingAudioData": True,
            "isUpstreamInitialized": False,
        }

        # Create a deepgram client

        # deepgram: DeepgramClient = DeepgramClient(DEEPGRAM_API_KEY, dg_config)
        deepgram: DeepgramClient = DeepgramClient(DEEPGRAM_API_KEY)
        dg_connection = deepgram.listen.asyncwebsocket.v("1")

        async def flush_data_queue():
            while not data_queue.empty():
                message = await data_queue.get()
                await dg_connection.send(message)
                data_queue.task_done()

        async def send_to_dg(data):
            await data_queue.put(data)
            if (
                state["isUpstreamInitialized"]
                and ws.ws_proto.state == websockets.protocol.OPEN
            ):
                await flush_data_queue()

        async def on_open(self, open):
            state["isUpstreamInitialized"] = True
            await flush_data_queue()

        async def on_close(self, close):
            logger.info("deepgram on_close")
            state["isUpstreamInitialized"] = False

        async def on_error(self, error):
            logger.warning(f"Deepgram on error: {str(error)}")
            if ws.ws_proto.state == websockets.protocol.CLOSED:
                return
            await ws.close(1011, str(error))

        async def on_unhandled(self, unhandled, **kwargs):
            logger.warning(f"request unhandled by deepgram: {unhandled}\n\n")

        async def on_message(self, result):
            logger(f"deepgram on_message {result}")
            is_final = result.is_final
            if result.channel and result.channel.alternatives:
                for alternative in result.channel.alternatives:
                    transcript = alternative.transcript
                    if transcript:
                        text = transcript.strip()
                        await ws.send(
                            json.dumps(
                                {"type": "RESULT", "text": text, "isFinal": is_final}
                            )
                        )

        dg_connection.on(LiveTranscriptionEvents.Open, on_open)
        dg_connection.on(LiveTranscriptionEvents.Transcript, on_message)
        dg_connection.on(LiveTranscriptionEvents.Close, on_close)
        dg_connection.on(LiveTranscriptionEvents.Error, on_error)
        dg_connection.on(LiveTranscriptionEvents.Unhandled, on_unhandled)

        if await dg_connection.start(dg_options) is False:
            logger.error("Failed to start deepgram connection")
            return

        while True:
            # Receive data from the client, could be audio or control message
            data = await ws.recv()
            payload = parse_incoming_data(data)
            if isinstance(payload, bytes):
                await send_to_dg(payload)
            elif payload.get("type", None) == "DONE" and state["isReceivingAudioData"]:
                logger.info("received Done message")
                state["isReceivingAudioData"] = False
                break

    except Exception as e:
        logger.warning(f"WebSocket connection error: {e}")
        await ws.close(code=1011, reason="Internal server error")
    finally:
        await dg_connection.finish()
        # Clean up: Remove the WebSocket connection from the dictionary upon disconnect
        if id(ws) in websocket_connections:
            del websocket_connections[id(ws)]
        if not ws.closed:
            await ws.close(code=1000, reason="Normal closure")

My test client:

# DO NOT COMMIT
# this script is just for local testing
import asyncio
import websockets
import json
import io
from gtts import gTTS

def text_to_audio_bytes(text: str) -> bytes:
    """
    Converts text to audio data (bytes) using gTTS library.

    Args:
        text: The text string to convert.

    Returns:
        A bytes object containing the generated audio data.

    Raises:
        ValueError: If text is empty.
    """
    if not text:
        raise ValueError("Text string cannot be empty")

    # Set language and audio format
    tts = gTTS(text=text, lang="en", slow=False)

    # Save audio to a temporary buffer
    with io.BytesIO() as buffer:
        tts.write_to_fp(buffer)
        audio_data = buffer.getvalue()

    return audio_data

async def send_data(text):
    uri = "ws://localhost:8080/transcribe"
    audio_bytes = text_to_audio_bytes(text)
    async with websockets.connect(uri) as websocket:
        print("Connected to WebSocket server.")

        await websocket.send(audio_bytes)
        print(f"Sent audio data of length {len(audio_bytes)}.")

        # Send a control message to indicate the end of audio data transmission
        done_message = json.dumps({"type": "DONE"})
        await websocket.send(done_message)
        print("Sent DONE message:", done_message)

        # Listen for messages from the server
        async for message in websocket:
            data = json.loads(message)
            print("Message from server:", data)

text = "hello world"
asyncio.get_event_loop().run_until_complete(send_data(text))

Step 1: kick off the server Step 2: run the test client

Expected behavior

The client would receive the transcribe result and print it; also the server log should show that on_message is called.

Please tell us about your environment

Other information

I can see the server logging showing that the deepgram connection was successfully established and data was sent to deepgram (also dg_connection.finish and on_close were called). And the deepgram dashboard also shows status 101.

Screenshot 2024-07-29 at 4 09 47 PM

dvonthenen commented 1 month ago

Hi @yurongchengather

Your callback function signatures don't match what is expected. Example: https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/speech-to-text/websocket/async_microphone/main.py#L48-L95

For AsyncIO functions, this is what the callback functions look like:

async def on_open(self, open, **kwargs):
async def on_message(self, result, **kwargs):
async def on_metadata(self, metadata, **kwargs):
async def on_speech_started(self, speech_started, **kwargs):
async def on_utterance_end(self, utterance_end, **kwargs):
async def on_close(self, close, **kwargs):
async def on_error(self, error, **kwargs):
async def on_unhandled(self, unhandled, **kwargs):
yurongchengather commented 1 month ago

Thanks for the quick response! I've updated the callback signatures for the callbacks I have. But still got the same behavior. Do I have to define all the callbacks you listed?

my updated code:

import asyncio
import base64
from deepgram import (
    DeepgramClient,
    LiveOptions,
    LiveTranscriptionEvents,
)
import json

import websockets.protocol
import logs
import os
import websockets
from typing import Dict
from dotenv import load_dotenv

load_dotenv()
logger = logs.get_logger(__name__)

DEEPGRAM_API_KEY = os.getenv("DEEPGRAM_API_KEY", "")
MAX_SIZE_ASYNC_QUEUE = 2048000

# Dictionary to store WebSocket connections
websocket_connections: Dict[str, asyncio.Queue] = {}

dg_options = LiveOptions(
    model="nova-2-conversationalai",
    interim_results=True,
    keywords=["Genie:5", "Ariel:-15", "Ario:80"],
    smart_format=True,
)

def parse_incoming_data(payload: str | bytes) -> dict | bytes:
    """
    Parses incoming data based on whether it is binary or not.
    1. If the payload is binary, return it as is.
    2. If the payload is a string and starts with "{", assume it is JSON and try to parse it and return a dict.
    3. If the payload is a string and does not start with "{", assume it is base64-encoded binary data, return the decoded bytes.

    Args:
        payload: The incoming data, either binary or string.

    Returns:
        Parsed data (either bytes or dict).
    """
    if isinstance(payload, bytes):
        return payload

    # Determine if the string is JSON by checking the first character
    if isinstance(payload, str) and payload[0] == "{":
        try:
            result = json.loads(payload)
            if isinstance(result, dict):
                return result
        except json.JSONDecodeError:
            return {}

    # If not JSON, assume base64-encoded binary data
    return base64.b64decode(payload)

# WebSocket endpoint for live transcription
async def transcribe_handler(request, ws):
    try:
        # Create a queue for each WebSocket connection
        data_queue = asyncio.Queue(MAX_SIZE_ASYNC_QUEUE)
        websocket_connections[id(ws)] = data_queue
        state = {
            "isReceivingAudioData": True,
            "isUpstreamInitialized": False,
        }

        # Create a deepgram client
        deepgram: DeepgramClient = DeepgramClient(DEEPGRAM_API_KEY)
        dg_connection = deepgram.listen.asyncwebsocket.v("1")

        async def flush_data_queue():
            while not data_queue.empty():
                message = await data_queue.get()
                await dg_connection.send(message)
                logger.info(f"sent data to deepgram of size {len(message)}")
                data_queue.task_done()

        async def send_to_dg(data):
            await data_queue.put(data)
            if (
                state["isUpstreamInitialized"]
                and ws.ws_proto.state == websockets.protocol.OPEN
            ):
                await flush_data_queue()

        async def on_open(self, open, **kwargs):
            logger.info(f"Deepgram on open {open}")
            state["isUpstreamInitialized"] = True
            await flush_data_queue()

        async def on_close(self, close, **kwargs):
            state["isUpstreamInitialized"] = False
            if ws.ws_proto.state == websockets.protocol.CLOSED:
                logger.info("ws is already closed")
                return
            logger.info(f"Closing deepgram connection: {close}")
            await ws.close()

        async def on_error(self, error, **kwargs):
            logger.warning(f"Deepgram on error: {str(error)}")
            if ws.ws_proto.state == websockets.protocol.CLOSED:
                return
            await ws.close(1011, str(error))

        async def on_message(self, result, **kwargs):
            logger(f"deepgram on_message is called {result}")
            is_final = result.is_final
            if result.channel and result.channel.alternatives:
                for alternative in result.channel.alternatives:
                    transcript = alternative.transcript
                    if transcript:
                        text = transcript.strip()
                        logger.info(
                            f"sending transcription result to client, ws is open: {ws.ws_proto.state == websockets.protocol.OPEN}, type: RESULT, text: {text}, isFinal: {is_final}"
                        )
                        await ws.send(
                            json.dumps(
                                {"type": "RESULT", "text": text, "isFinal": is_final}
                            )
                        )

        dg_connection.on(LiveTranscriptionEvents.Open, on_open)
        dg_connection.on(LiveTranscriptionEvents.Transcript, on_message)
        dg_connection.on(LiveTranscriptionEvents.Close, on_close)
        dg_connection.on(LiveTranscriptionEvents.Error, on_error)

        if await dg_connection.start(dg_options) is False:
            logger.error("Failed to start deepgram connection")
            return

        while True:
            # Receive data from the client, could be audio or control message
            data = await ws.recv()
            payload = parse_incoming_data(data)
            logger.info(
                f"using socket {ws}, socket open: {ws.ws_proto.state == websockets.protocol.OPEN}"
            )
            if isinstance(payload, bytes):
                await send_to_dg(payload)
            elif payload.get("type", None) == "DONE" and state["isReceivingAudioData"]:
                state["isReceivingAudioData"] = False
                break

    except Exception as e:
        logger.warning(f"WebSocket connection error: {e}")
        await ws.close(code=1011, reason="Internal server error")
    finally:
        logger.info("Calling deepgram finish")
        await dg_connection.finish()
        if id(ws) in websocket_connections:
            logger.info("Removing websocket")
            del websocket_connections[id(ws)]
        if not ws.closed:
            await ws.close(code=1000, reason="Normal closure")

the server logging:

[BACKEND] transcribe_handler.py:on_open:94 -  Deepgram on open {
[BACKEND]     "type": "Open"
[BACKEND] }
[BACKEND] transcribe_handler.py:transcribe_handler:142 -  using socket <sanic.server.websockets.impl.WebsocketImplProtocol object at 0x320f5cc90>, socket open: True
[BACKEND] transcribe_handler.py:flush_data_queue:82 -  sent data to deepgram of size 10368
[BACKEND] transcribe_handler.py:transcribe_handler:142 -  using socket <sanic.server.websockets.impl.WebsocketImplProtocol object at 0x320f5cc90>, socket open: True
[BACKEND] transcribe_handler.py:transcribe_handler:155 -  Calling deepgram finish
[BACKEND] transcribe_handler.py:on_close:103 -  Closing deepgram connection: {
[BACKEND]     "type": "Close"
[BACKEND] }

As you can see that the on_message is not called.

Appreciate your help!

dvonthenen commented 1 month ago

From the looks of it, you aren't waiting long enough to actually receive the transcription. It looks like once the audio stops streaming to the backend, the connection is closed right afterward. Any transcription currently being processed is lost once the connection is closed.

yurongchengather commented 1 month ago

Right, there is no waiting. But adding 10s sleeping lead to the same behavior:

    finally:
        logger.info("Start waiting before calling deepgram finish")
        await asyncio.sleep(10)
        logger.info("Calling deepgram finish")
        await dg_connection.finish()
        if id(ws) in websocket_connections:
            logger.info("Removing websocket")
            del websocket_connections[id(ws)]
        if not ws.closed:
            await ws.close(code=1000, reason="Normal closure")

And server log:

[BACKEND] transcribe_handler.py:on_open:94 -  Deepgram on open {
[BACKEND]     "type": "Open"
[BACKEND] }
[BACKEND] transcribe_handler.py:transcribe_handler:142 -  using socket <sanic.server.websockets.impl.WebsocketImplProtocol object at 0x31cd19790>, socket open: True
[BACKEND] transcribe_handler.py:flush_data_queue:82 -  sent data to deepgram of size 10560
[BACKEND] transcribe_handler.py:transcribe_handler:142 -  using socket <sanic.server.websockets.impl.WebsocketImplProtocol object at 0x31cd19790>, socket open: True
[BACKEND] transcribe_handler.py:transcribe_handler:155 -  Start waiting before calling deepgram finish
[BACKEND] transcribe_handler.py:transcribe_handler:157 -  Calling deepgram finish
[BACKEND] transcribe_handler.py:on_close:103 -  Closing deepgram connection: {
[BACKEND]     "type": "Close"
[BACKEND] }
Screenshot 2024-07-29 at 8 54 12 PM
dvonthenen commented 1 month ago

Can you turn on debugging and copy and paste the output here. This is how you enable it: https://github.com/deepgram/deepgram-python-sdk/blob/main/examples/speech-to-text/websocket/microphone/main.py#L27-L30

My leading guess is you aren't actually sending the audio stream OR the encoding doesn't match the encoding value in the LiveOptions.

yurongchengather commented 1 month ago

EDIT: my bad, the Result callback on_message was actually called from the logs.

Here is the debug output: (The logs actually printed out the correct transcribe result hello world, but the on_message callback is still not called)

[BACKEND] Version.v ENTER
[BACKEND] listen_router.py:v:164 -  Version.v ENTER
[BACKEND] listen_router.py:v:165 -  version: 1
[BACKEND] version: 1
[BACKEND] path: deepgram.clients.listen.v1.websocket.async_client
[BACKEND] class_name: AsyncListenWebSocketClient
[BACKEND] listen_router.py:v:206 -  path: deepgram.clients.listen.v1.websocket.async_client
[BACKEND] listen_router.py:v:207 -  class_name: AsyncListenWebSocketClient
[BACKEND] Version.v succeeded
[BACKEND] Version.v LEAVE
[BACKEND] listen_router.py:v:225 -  Version.v LEAVE
[BACKEND] async_client.py:on:228 -  event subscribed: Open
[BACKEND] event subscribed: Open
[BACKEND] event subscribed: Results
[BACKEND] event subscribed: Close
[BACKEND] event subscribed: Error
[BACKEND] async_client.py:on:228 -  event subscribed: Results
[BACKEND] async_client.py:on:228 -  event subscribed: Close
[BACKEND] async_client.py:on:228 -  event subscribed: Error
[BACKEND] async_client.py:start:104 -  AsyncListenWebSocketClient.start ENTER
[BACKEND] AsyncListenWebSocketClient.start ENTER
[BACKEND] options: {
[BACKEND]     "interim_results": true,
[BACKEND]     "keywords": [XXXX],
[BACKEND]     "model": "nova-2-conversationalai",
[BACKEND]     "smart_format": true
[BACKEND] }
[BACKEND] async_client.py:start:105 -  options: {
[BACKEND]     "interim_results": true,
[BACKEND]     "keywords": [XXXX],
[BACKEND]     "model": "nova-2-conversationalai",
[BACKEND]     "smart_format": true
[BACKEND] }
[BACKEND] async_client.py:start:106 -  addons: None
[BACKEND] async_client.py:start:107 -  headers: None
[BACKEND] addons: None
[BACKEND] headers: None
[BACKEND] members: None
[BACKEND] kwargs: {}
[BACKEND] async_client.py:start:108 -  members: None
[BACKEND] async_client.py:start:109 -  kwargs: {}
[BACKEND] ListenWebSocketOptions switching class -> dict
[BACKEND] async_client.py:start:130 -  ListenWebSocketOptions switching class -> dict
[BACKEND] combined_options: {'interim_results': True, 'keywords': [XXXX], 'model': 'nova-2-conversationalai', 'smart_format': True}
[BACKEND] combined_headers: {'Accept': 'application/json', 'Authorization': 'Token 1854a700cce9efc8d3a4d29daff18aedf3835701', 'User-Agent': '@deepgram/sdk/v3.4.0 python/11.9'}
[BACKEND] async_client.py:start:142 -  combined_options: {'interim_results': True, 'keywords': [XXXX], 'model': 'nova-2-conversationalai', 'smart_format': True}
[BACKEND] async_client.py:start:149 -  combined_headers: {'Accept': 'application/json', 'Authorization': 'Token 1854a700cce9efc8d3a4d29daff18aedf3835701', 'User-Agent': '@deepgram/sdk/v3.4.0 python/11.9'}
[BACKEND] after running thread: MainThread
[BACKEND] after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] async_client.py:start:163 -  after running thread: MainThread
[BACKEND] async_client.py:start:163 -  after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] async_client.py:start:164 -  number of active threads: 2
[BACKEND] number of active threads: 2
[BACKEND] keepalive is enabled
[BACKEND] autoflush is disabled
[BACKEND] after running thread: MainThread
[BACKEND] after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] async_client.py:start:185 -  after running thread: MainThread
[BACKEND] async_client.py:start:185 -  after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] async_client.py:start:186 -  number of active threads: 2
[BACKEND] number of active threads: 2
[BACKEND] AsyncListenWebSocketClient._emit ENTER
[BACKEND] callback handlers for: Open
[BACKEND] async_client.py:_emit:238 -  AsyncListenWebSocketClient._emit ENTER
[BACKEND] async_client.py:_emit:239 -  callback handlers for: Open
[BACKEND] async_client.py:_emit:243 -  after running thread: MainThread
[BACKEND] async_client.py:_emit:243 -  after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] after running thread: MainThread
[BACKEND] after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] number of active threads: 2
[BACKEND] waiting for tasks to finish...
[BACKEND] async_client.py:_emit:244 -  number of active threads: 2
[BACKEND] async_client.py:_emit:252 -  waiting for tasks to finish...
[BACKEND] AsyncListenWebSocketClient._listening ENTER
[BACKEND] async_client.py:_listening:268 -  AsyncListenWebSocketClient._listening ENTER
[BACKEND] AsyncListenWebSocketClient._keep_alive ENTER
[BACKEND] async_client.py:_keep_alive:477 -  AsyncListenWebSocketClient._keep_alive ENTER
[BACKEND] transcribe_handler.py:on_open:97 -  Deepgram on open {
[BACKEND]     "type": "Open"
[BACKEND] }
[BACKEND] async_client.py:_emit:258 -  after running thread: MainThread
[BACKEND] async_client.py:_emit:258 -  after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] async_client.py:_emit:259 -  number of active threads: 2
[BACKEND] async_client.py:_emit:261 -  AsyncListenWebSocketClient._emit LEAVE
[BACKEND] async_client.py:start:195 -  AsyncListenWebSocketClient.start LEAVE
[BACKEND] after running thread: MainThread
[BACKEND] after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] number of active threads: 2
[BACKEND] AsyncListenWebSocketClient._emit LEAVE
[BACKEND] start succeeded
[BACKEND] AsyncListenWebSocketClient.start LEAVE
[BACKEND] transcribe_handler.py:transcribe_handler:145 -  using socket <sanic.server.websockets.impl.WebsocketImplProtocol object at 0x30e8e0c10>, socket open: True
[BACKEND] transcribe_handler.py:flush_data_queue:85 -  sent data to deepgram of size 10560
[BACKEND] transcribe_handler.py:transcribe_handler:145 -  using socket <sanic.server.websockets.impl.WebsocketImplProtocol object at 0x30e8e0c10>, socket open: True
[BACKEND] transcribe_handler.py:transcribe_handler:158 -  Start waiting before calling deepgram finish
[BACKEND] response_type: Results, data: {'type': 'Results', 'channel_index': [0, 1], 'duration': 1.0079376, 'start': 0.0, 'is_final': False, 'speech_final': False, 'channel': {'alternatives': [{'transcript': 'Hello, world.', 'confidence': 0.98413086, 'words': [{'word': 'hello', 'start': 0.08, 'end': 0.56, 'confidence': 0.8967285, 'punctuated_word': 'Hello,'}, {'word': 'world', 'start': 0.56, 'end': 1.0079376, 'confidence': 0.98413086, 'punctuated_word': 'world.'}]}]}, 'metadata': {'request_id': 'a5d9f4be-78a7-4fb0-b81d-c22aabaea6d5', 'model_info': {'name': '2-conversationalai-nova', 'version': '2024-02-05.8805', 'arch': 'nova-2'}, 'model_uuid': '39d6c502-03bf-4618-9cd1-dac1b89d663b'}, 'from_finalize': False}
[BACKEND] async_client.py:_listening:290 -  response_type: Results, data: {'type': 'Results', 'channel_index': [0, 1], 'duration': 1.0079376, 'start': 0.0, 'is_final': False, 'speech_final': False, 'channel': {'alternatives': [{'transcript': 'Hello, world.', 'confidence': 0.98413086, 'words': [{'word': 'hello', 'start': 0.08, 'end': 0.56, 'confidence': 0.8967285, 'punctuated_word': 'Hello,'}, {'word': 'world', 'start': 0.56, 'end': 1.0079376, 'confidence': 0.98413086, 'punctuated_word': 'world.'}]}]}, 'metadata': {'request_id': 'a5d9f4be-78a7-4fb0-b81d-c22aabaea6d5', 'model_info': {'name': '2-conversationalai-nova', 'version': '2024-02-05.8805', 'arch': 'nova-2'}, 'model_uuid': '39d6c502-03bf-4618-9cd1-dac1b89d663b'}, 'from_finalize': False}
[BACKEND] LiveResultResponse: {
[BACKEND]     "channel": {
[BACKEND]         "alternatives": [
[BACKEND]             {
[BACKEND]                 "transcript": "Hello, world.",
[BACKEND]                 "confidence": 0.98413086,
[BACKEND]                 "words": [
[BACKEND]                     {
[BACKEND]                         "word": "hello",
[BACKEND]                         "start": 0.08,
[BACKEND]                         "end": 0.56,
[BACKEND]                         "confidence": 0.8967285,
[BACKEND]                         "punctuated_word": "Hello,"
[BACKEND]                     },
[BACKEND]                     {
[BACKEND]                         "word": "world",
[BACKEND]                         "start": 0.56,
[BACKEND]                         "end": 1.0079376,
[BACKEND]                         "confidence": 0.98413086,
[BACKEND]                         "punctuated_word": "world."
[BACKEND]                     }
[BACKEND]                 ]
[BACKEND]             }
[BACKEND]         ]
[BACKEND]     },
[BACKEND]     "metadata": {
[BACKEND]         "model_info": {
[BACKEND]             "name": "2-conversationalai-nova",
[BACKEND]             "version": "2024-02-05.8805",
[BACKEND]             "arch": "nova-2"
[BACKEND]         },
[BACKEND]         "request_id": "a5d9f4be-78a7-4fb0-b81d-c22aabaea6d5",
[BACKEND]         "model_uuid": "39d6c502-03bf-4618-9cd1-dac1b89d663b"
[BACKEND]     },
[BACKEND]     "type": "Results",
[BACKEND]     "channel_index": [
[BACKEND]         0,
[BACKEND]         1
[BACKEND]     ],
[BACKEND]     "duration": 1.0079376,
[BACKEND]     "start": 0.0,
[BACKEND]     "is_final": false,
[BACKEND]     "from_finalize": false,
[BACKEND]     "speech_final": false
[BACKEND] }
[BACKEND] AsyncListenWebSocketClient._emit ENTER
[BACKEND] callback handlers for: Results
[BACKEND] after running thread: MainThread
[BACKEND] after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] number of active threads: 2
[BACKEND] waiting for tasks to finish...
[BACKEND] async_client.py:_emit:238 -  AsyncListenWebSocketClient._emit ENTER
[BACKEND] async_client.py:_emit:239 -  callback handlers for: Results
[BACKEND] async_client.py:_emit:243 -  after running thread: MainThread
[BACKEND] async_client.py:_emit:243 -  after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] async_client.py:_emit:244 -  number of active threads: 2
[BACKEND] async_client.py:_emit:252 -  waiting for tasks to finish...
[BACKEND] after running thread: MainThread
[BACKEND] after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] number of active threads: 2
[BACKEND] async_client.py:_emit:258 -  after running thread: MainThread
[BACKEND] async_client.py:_emit:258 -  after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] async_client.py:_emit:259 -  number of active threads: 2
[BACKEND] async_client.py:_emit:261 -  AsyncListenWebSocketClient._emit LEAVE
[BACKEND] AsyncListenWebSocketClient._emit LEAVE
[BACKEND] response_type: Results, data: {'type': 'Results', 'channel_index': [0, 1], 'duration': 1.22, 'start': 0.0, 'is_final': True, 'speech_final': True, 'channel': {'alternatives': [{'transcript': 'Hello, world.', 'confidence': 0.99072266, 'words': [{'word': 'hello', 'start': 0.0, 'end': 0.5, 'confidence': 0.88793945, 'punctuated_word': 'Hello,'}, {'word': 'world', 'start': 0.61, 'end': 1.11, 'confidence': 0.99072266, 'punctuated_word': 'world.'}]}]}, 'metadata': {'request_id': 'a5d9f4be-78a7-4fb0-b81d-c22aabaea6d5', 'model_info': {'name': '2-conversationalai-nova', 'version': '2024-02-05.8805', 'arch': 'nova-2'}, 'model_uuid': '39d6c502-03bf-4618-9cd1-dac1b89d663b'}, 'from_finalize': False}
[BACKEND] async_client.py:_listening:290 -  response_type: Results, data: {'type': 'Results', 'channel_index': [0, 1], 'duration': 1.22, 'start': 0.0, 'is_final': True, 'speech_final': True, 'channel': {'alternatives': [{'transcript': 'Hello, world.', 'confidence': 0.99072266, 'words': [{'word': 'hello', 'start': 0.0, 'end': 0.5, 'confidence': 0.88793945, 'punctuated_word': 'Hello,'}, {'word': 'world', 'start': 0.61, 'end': 1.11, 'confidence': 0.99072266, 'punctuated_word': 'world.'}]}]}, 'metadata': {'request_id': 'a5d9f4be-78a7-4fb0-b81d-c22aabaea6d5', 'model_info': {'name': '2-conversationalai-nova', 'version': '2024-02-05.8805', 'arch': 'nova-2'}, 'model_uuid': '39d6c502-03bf-4618-9cd1-dac1b89d663b'}, 'from_finalize': False}
[BACKEND] LiveResultResponse: {
[BACKEND]     "channel": {
[BACKEND]         "alternatives": [
[BACKEND]             {
[BACKEND]                 "transcript": "Hello, world.",
[BACKEND]                 "confidence": 0.99072266,
[BACKEND]                 "words": [
[BACKEND]                     {
[BACKEND]                         "word": "hello",
[BACKEND]                         "start": 0.0,
[BACKEND]                         "end": 0.5,
[BACKEND]                         "confidence": 0.88793945,
[BACKEND]                         "punctuated_word": "Hello,"
[BACKEND]                     },
[BACKEND]                     {
[BACKEND]                         "word": "world",
[BACKEND]                         "start": 0.61,
[BACKEND]                         "end": 1.11,
[BACKEND]                         "confidence": 0.99072266,
[BACKEND]                         "punctuated_word": "world."
[BACKEND]                     }
[BACKEND]                 ]
[BACKEND]             }
[BACKEND]         ]
[BACKEND]     },
[BACKEND]     "metadata": {
[BACKEND]         "model_info": {
[BACKEND]             "name": "2-conversationalai-nova",
[BACKEND]             "version": "2024-02-05.8805",
[BACKEND]             "arch": "nova-2"
[BACKEND]         },
[BACKEND]         "request_id": "a5d9f4be-78a7-4fb0-b81d-c22aabaea6d5",
[BACKEND]         "model_uuid": "39d6c502-03bf-4618-9cd1-dac1b89d663b"
[BACKEND]     },
[BACKEND]     "type": "Results",
[BACKEND]     "channel_index": [
[BACKEND]         0,
[BACKEND]         1
[BACKEND]     ],
[BACKEND]     "duration": 1.22,
[BACKEND]     "start": 0.0,
[BACKEND]     "is_final": true,
[BACKEND]     "from_finalize": false,
[BACKEND]     "speech_final": true
[BACKEND] }
[BACKEND] AsyncListenWebSocketClient._emit ENTER
[BACKEND] callback handlers for: Results
[BACKEND] after running thread: MainThread
[BACKEND] after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] number of active threads: 2
[BACKEND] waiting for tasks to finish...
[BACKEND] async_client.py:_emit:238 -  AsyncListenWebSocketClient._emit ENTER
[BACKEND] async_client.py:_emit:239 -  callback handlers for: Results
[BACKEND] async_client.py:_emit:243 -  after running thread: MainThread
[BACKEND] async_client.py:_emit:243 -  after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] async_client.py:_emit:244 -  number of active threads: 2
[BACKEND] async_client.py:_emit:252 -  waiting for tasks to finish...
[BACKEND] after running thread: MainThread
[BACKEND] async_client.py:_emit:258 -  after running thread: MainThread
[BACKEND] async_client.py:_emit:258 -  after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] async_client.py:_emit:259 -  number of active threads: 2
[BACKEND] async_client.py:_emit:261 -  AsyncListenWebSocketClient._emit LEAVE
[BACKEND] after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] number of active threads: 2
[BACKEND] AsyncListenWebSocketClient._emit LEAVE
[BACKEND] transcribe_handler.py:transcribe_handler:160 -  Calling deepgram finish
[BACKEND] async_client.py:finish:835 -  AsyncListenWebSocketClient.finish ENTER
[BACKEND] AsyncListenWebSocketClient.finish ENTER
[BACKEND] closing socket...
[BACKEND] send CloseStream...
[BACKEND] AsyncListenWebSocketClient._emit ENTER
[BACKEND] callback handlers for: Close
[BACKEND] after running thread: MainThread
[BACKEND] async_client.py:_emit:238 -  AsyncListenWebSocketClient._emit ENTER
[BACKEND] async_client.py:_emit:239 -  callback handlers for: Close
[BACKEND] async_client.py:_emit:243 -  after running thread: MainThread
[BACKEND] async_client.py:_emit:243 -  after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] async_client.py:_emit:244 -  number of active threads: 2
[BACKEND] async_client.py:_emit:252 -  waiting for tasks to finish...
[BACKEND] after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] number of active threads: 2
[BACKEND] waiting for tasks to finish...
[BACKEND] transcribe_handler.py:on_close:106 -  Closing deepgram connection: {
[BACKEND]     "type": "Close"
[BACKEND] }
[BACKEND] after running thread: MainThread
[BACKEND] after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] number of active threads: 2
[BACKEND] AsyncListenWebSocketClient._emit LEAVE
[BACKEND] async_client.py:_emit:258 -  after running thread: MainThread
[BACKEND] async_client.py:_emit:258 -  after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] async_client.py:_emit:259 -  number of active threads: 2
[BACKEND] async_client.py:_emit:261 -  AsyncListenWebSocketClient._emit LEAVE
[BACKEND] response_type: Results, data: {'type': 'Results', 'channel_index': [0, 1], 'duration': 0.09993744, 'start': 1.22, 'is_final': True, 'speech_final': True, 'channel': {'alternatives': [{'transcript': '', 'confidence': 0.0, 'words': []}]}, 'metadata': {'request_id': 'a5d9f4be-78a7-4fb0-b81d-c22aabaea6d5', 'model_info': {'name': '2-conversationalai-nova', 'version': '2024-02-05.8805', 'arch': 'nova-2'}, 'model_uuid': '39d6c502-03bf-4618-9cd1-dac1b89d663b'}, 'from_finalize': False}
[BACKEND] async_client.py:_listening:290 -  response_type: Results, data: {'type': 'Results', 'channel_index': [0, 1], 'duration': 0.09993744, 'start': 1.22, 'is_final': True, 'speech_final': True, 'channel': {'alternatives': [{'transcript': '', 'confidence': 0.0, 'words': []}]}, 'metadata': {'request_id': 'a5d9f4be-78a7-4fb0-b81d-c22aabaea6d5', 'model_info': {'name': '2-conversationalai-nova', 'version': '2024-02-05.8805', 'arch': 'nova-2'}, 'model_uuid': '39d6c502-03bf-4618-9cd1-dac1b89d663b'}, 'from_finalize': False}
[BACKEND] LiveResultResponse: {
[BACKEND]     "channel": {
[BACKEND]         "alternatives": [
[BACKEND]             {
[BACKEND]                 "transcript": "",
[BACKEND]                 "confidence": 0.0,
[BACKEND]                 "words": []
[BACKEND]             }
[BACKEND]         ]
[BACKEND]     },
[BACKEND]     "metadata": {
[BACKEND]         "model_info": {
[BACKEND]             "name": "2-conversationalai-nova",
[BACKEND]             "version": "2024-02-05.8805",
[BACKEND]             "arch": "nova-2"
[BACKEND]         },
[BACKEND]         "request_id": "a5d9f4be-78a7-4fb0-b81d-c22aabaea6d5",
[BACKEND]         "model_uuid": "39d6c502-03bf-4618-9cd1-dac1b89d663b"
[BACKEND]     },
[BACKEND]     "type": "Results",
[BACKEND]     "channel_index": [
[BACKEND]         0,
[BACKEND]         1
[BACKEND]     ],
[BACKEND]     "duration": 0.09993744,
[BACKEND]     "start": 1.22,
[BACKEND]     "is_final": true,
[BACKEND]     "from_finalize": false,
[BACKEND]     "speech_final": true
[BACKEND] }
[BACKEND] AsyncListenWebSocketClient._emit ENTER
[BACKEND] callback handlers for: Results
[BACKEND] after running thread: MainThread
[BACKEND] after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] number of active threads: 2
[BACKEND] async_client.py:_emit:238 -  AsyncListenWebSocketClient._emit ENTER
[BACKEND] async_client.py:_emit:239 -  callback handlers for: Results
[BACKEND] async_client.py:_emit:243 -  after running thread: MainThread
[BACKEND] async_client.py:_emit:243 -  after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] async_client.py:_emit:244 -  number of active threads: 2
[BACKEND] async_client.py:_emit:252 -  waiting for tasks to finish...
[BACKEND] waiting for tasks to finish...
[BACKEND] after running thread: MainThread
[BACKEND] after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] number of active threads: 2
[BACKEND] AsyncListenWebSocketClient._emit LEAVE
[BACKEND] async_client.py:_emit:258 -  after running thread: MainThread
[BACKEND] async_client.py:_emit:258 -  after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] async_client.py:_emit:259 -  number of active threads: 2
[BACKEND] async_client.py:_emit:261 -  AsyncListenWebSocketClient._emit LEAVE
[BACKEND] async_client.py:_listening:290 -  response_type: Metadata, data: {'type': 'Metadata', 'transaction_key': 'deprecated', 'request_id': 'a5d9f4be-78a7-4fb0-b81d-c22aabaea6d5', 'sha256': 'f8945f627720d2e26f8598099ccbfff150707d499113c96e4535b6c12b851524', 'created': '2024-07-30T16:25:33.167Z', 'duration': 1.3199375, 'channels': 1, 'models': ['39d6c502-03bf-4618-9cd1-dac1b89d663b', '824ec1be-4ade-4293-b728-9c7c5faf1976'], 'model_info': {'39d6c502-03bf-4618-9cd1-dac1b89d663b': {'name': '2-conversationalai-nova', 'version': '2024-02-05.8805', 'arch': 'nova-2'}, '824ec1be-4ade-4293-b728-9c7c5faf1976': {'name': 'phoneme', 'version': '2022-05-18.0', 'arch': 'base'}}}
[BACKEND] response_type: Metadata, data: {'type': 'Metadata', 'transaction_key': 'deprecated', 'request_id': 'a5d9f4be-78a7-4fb0-b81d-c22aabaea6d5', 'sha256': 'f8945f627720d2e26f8598099ccbfff150707d499113c96e4535b6c12b851524', 'created': '2024-07-30T16:25:33.167Z', 'duration': 1.3199375, 'channels': 1, 'models': ['39d6c502-03bf-4618-9cd1-dac1b89d663b', '824ec1be-4ade-4293-b728-9c7c5faf1976'], 'model_info': {'39d6c502-03bf-4618-9cd1-dac1b89d663b': {'name': '2-conversationalai-nova', 'version': '2024-02-05.8805', 'arch': 'nova-2'}, '824ec1be-4ade-4293-b728-9c7c5faf1976': {'name': 'phoneme', 'version': '2022-05-18.0', 'arch': 'base'}}}
[BACKEND] MetadataResponse: {
[BACKEND]     "type": "Metadata",
[BACKEND]     "transaction_key": "deprecated",
[BACKEND]     "request_id": "a5d9f4be-78a7-4fb0-b81d-c22aabaea6d5",
[BACKEND]     "sha256": "f8945f627720d2e26f8598099ccbfff150707d499113c96e4535b6c12b851524",
[BACKEND]     "created": "2024-07-30T16:25:33.167Z",
[BACKEND]     "duration": 1.3199375,
[BACKEND]     "channels": 1,
[BACKEND]     "models": [
[BACKEND]         "39d6c502-03bf-4618-9cd1-dac1b89d663b",
[BACKEND]         "824ec1be-4ade-4293-b728-9c7c5faf1976"
[BACKEND]     ],
[BACKEND]     "model_info": {
[BACKEND]         "39d6c502-03bf-4618-9cd1-dac1b89d663b": {
[BACKEND]             "name": "2-conversationalai-nova",
[BACKEND]             "version": "2024-02-05.8805",
[BACKEND]             "arch": "nova-2"
[BACKEND]         },
[BACKEND]         "824ec1be-4ade-4293-b728-9c7c5faf1976": {
[BACKEND]             "name": "phoneme",
[BACKEND]             "version": "2022-05-18.0",
[BACKEND]             "arch": "base"
[BACKEND]         }
[BACKEND]     }
[BACKEND] }
[BACKEND] AsyncListenWebSocketClient._emit ENTER
[BACKEND] callback handlers for: Metadata
[BACKEND] after running thread: MainThread
[BACKEND] after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] number of active threads: 2
[BACKEND] async_client.py:_emit:238 -  AsyncListenWebSocketClient._emit ENTER
[BACKEND] async_client.py:_emit:239 -  callback handlers for: Metadata
[BACKEND] async_client.py:_emit:243 -  after running thread: MainThread
[BACKEND] async_client.py:_emit:243 -  after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] async_client.py:_emit:244 -  number of active threads: 2
[BACKEND] async_client.py:_emit:258 -  after running thread: MainThread
[BACKEND] async_client.py:_emit:258 -  after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] after running thread: MainThread
[BACKEND] after running thread: ddtrace.internal.telemetry.writer:TelemetryWriter
[BACKEND] number of active threads: 2
[BACKEND] AsyncListenWebSocketClient._emit LEAVE
[BACKEND] async_client.py:_emit:259 -  number of active threads: 2
[BACKEND] async_client.py:_emit:261 -  AsyncListenWebSocketClient._emit LEAVE
[BACKEND] _listening(1000) exiting gracefully
[BACKEND] AsyncListenWebSocketClient._listening LEAVE
[BACKEND] async_client.py:_listening:388 -  AsyncListenWebSocketClient._listening LEAVE
[BACKEND] Sending KeepAlive...
[BACKEND] send() exiting gracefully: 1000
[BACKEND] AsyncListenWebSocketClient.send LEAVE
[BACKEND] keep_alive succeeded
[BACKEND] async_client.py:send:733 -  AsyncListenWebSocketClient.send LEAVE
[BACKEND] Sending KeepAlive...
[BACKEND] send() exiting gracefully: 1000
[BACKEND] AsyncListenWebSocketClient.send LEAVE
[BACKEND] keep_alive succeeded
[BACKEND] async_client.py:send:733 -  AsyncListenWebSocketClient.send LEAVE
[BACKEND] Sending KeepAlive...
[BACKEND] send() exiting gracefully: 1000
[BACKEND] AsyncListenWebSocketClient.send LEAVE
[BACKEND] keep_alive succeeded
[BACKEND] async_client.py:send:733 -  AsyncListenWebSocketClient.send LEAVE
[BACKEND] Sending KeepAlive...
[BACKEND] send() exiting gracefully: 1000
[BACKEND] AsyncListenWebSocketClient.send LEAVE
[BACKEND] keep_alive succeeded
[BACKEND] async_client.py:send:733 -  AsyncListenWebSocketClient.send LEAVE
[BACKEND] Sending KeepAlive...
[BACKEND] send() exiting gracefully: 1000
[BACKEND] AsyncListenWebSocketClient.send LEAVE
[BACKEND] keep_alive succeeded
yurongchengather commented 1 month ago

@dvonthenen Thank you so much for the help! I confirm that the on_message is called. And the problem resides in my code, not a deepgram bug.