pipecat-ai / pipecat

Open Source framework for voice and multimodal conversational AI

Bot stops responding randomly - No frames emitted by Deepgram STT #455

Open nisalr opened 1 week ago

nisalr commented 1 week ago

I'm using the Twilio bot with Deepgram and PlayHT. Occasionally the bot suddenly stops responding because the Deepgram STT service doesn't emit frames to be sent to the LLM. I'm not able to reliably reproduce this and it seems somewhat random. Is anyone else facing this issue?

richtermb commented 1 week ago

Yes, I am. It might be because asynclive is now deprecated in favor of asyncwebsocket in Deepgram's SDK (see the constructor for DeepgramSTTService).

Edit: Even after modifying deepgram.py I am still getting the same error, so this is most likely on Deepgram's end:

ConnectionClosed in AsyncListenWebSocketClient._listening with code CloseCode.ABNORMAL_CLOSURE:
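
For anyone following along, the swap in question looks roughly like this inside DeepgramSTTService's constructor (a sketch against Deepgram's Python SDK v3; the exact attribute names depend on the installed SDK version):

    from deepgram import DeepgramClient

    client = DeepgramClient("YOUR_DEEPGRAM_API_KEY")
    # Deprecated accessor used previously:
    # connection = client.listen.asynclive.v("1")
    # Non-deprecated equivalent in newer SDK releases:
    connection = client.listen.asyncwebsocket.v("1")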

richtermb commented 1 week ago

FWIW I'm also having trouble reproducing this reliably, but I was able to see these errors in the logs when the bug resurfaced -- you can see that there is no dialogue after. I would guess it happens <10% of the time.


ConnectionClosed in AsyncListenWebSocketClient._listening with code 1006: 
send() failed - ConnectionClosed: no close frame received or sent
[... the line above repeated ~27 times ...]
2024-09-14 17:07:31.287 | DEBUG    | pipecat.transports.base_input:_handle_interruptions:172 - User started speaking
2024-09-14 17:07:36.248 | DEBUG    | pipecat.transports.base_input:_handle_interruptions:175 - User stopped speaking
2024-09-14 17:07:40.808 | DEBUG    | pipecat.transports.base_input:_handle_interruptions:172 - User started speaking
2024-09-14 17:07:44.388 | DEBUG    | pipecat.transports.base_input:_handle_interruptions:175 - User stopped speaking
2024-09-14 17:07:45.045 | DEBUG    | pipecat.transports.base_input:_handle_interruptions:172 - User started speaking
2024-09-14 17:07:50.145 | DEBUG    | pipecat.transports.base_input:_handle_interruptions:175 - User stopped speaking
2024-09-14 17:07:53.824 | DEBUG    | pipecat.transports.base_input:_handle_interruptions:172 - User started speaking
2024-09-14 17:07:57.304 | DEBUG    | pipecat.transports.base_input:_handle_interruptions:175 - User stopped speaking
nisalr commented 6 days ago

This is very similar to what I'm facing. I also think it's a Deepgram-related issue, but I haven't found a way to fix it yet.

aconchillo commented 5 days ago

We'll have to reconnect if we detect a network issue.
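
Something along these lines, perhaps (purely a sketch; `reconnect` stands in for whatever re-creates the Deepgram websocket client, which isn't pipecat's actual API):

    import asyncio

    from loguru import logger
    from websockets.exceptions import ConnectionClosed


    async def send_with_reconnect(connection, reconnect, data: bytes, retries: int = 3):
        # Hypothetical helper: retry a send after re-opening the Deepgram
        # websocket when the connection drops mid-stream.
        for attempt in range(retries):
            try:
                await connection.send(data)
                return
            except ConnectionClosed:
                logger.warning(f"Deepgram connection closed, reconnect attempt {attempt + 1}")
                await asyncio.sleep(0.5 * (attempt + 1))  # simple backoff
                connection = await reconnect()  # assumed to return a fresh connection
        logger.error("Giving up after repeated Deepgram reconnect attempts")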

richtermb commented 2 days ago

@nisalr @aconchillo I think I found the issue; it has to do with this snippet in deepgram.py:

        if len(transcript) > 0:
            if is_final:
                await self.queue_frame(
                    TranscriptionFrame(transcript, "", time_now_iso8601())
                )
            else:
                await self.queue_frame(
                    InterimTranscriptionFrame(transcript, "", time_now_iso8601())
                )

After looking at Deepgram's verbose logs, I have a suspicion that the is_final flag is the issue: sometimes Deepgram's API will return a LiveResultResponse instance with populated text but is_final set to False. The content is therefore not registered by the LLMUserResponseAggregator (the next processor in my pipeline), since it only registers text attached to TranscriptionFrame instances, not InterimTranscriptionFrames. If the user is silent after this occurs, _aggregation will still be an empty string and nothing will be pushed.

nisalr commented 2 days ago

@richtermb thanks for the update. Any ideas on how to fix the issue? I feel it happens when the network connection with Deepgram is not great. According to the Deepgram docs, the is_final flag should be sent when the transcription is complete. I've also been looking into this but haven't found a way to solve it yet.

richtermb commented 2 days ago

@nisalr unfortunately I can't say for sure that mine is the issue you're facing; in fact, if you think it's related to the network connection, then it's pretty much out of your control beyond adding logic that attempts to reconnect on failure. I think in my case the "fix" would be to track InterimTranscriptionFrames and, once a consecutive series of empty transcriptions comes in, carefully stitch the interim frames into the existing transcription aggregate (you don't want any duplication).
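
As a very rough sketch of that idea (hypothetical and untested, simplified to plain strings rather than pipecat frames, and only falling back to the last interim when no final ever arrived):

    # Hypothetical sketch: keep the latest interim text around so it can be
    # promoted to a final transcription if Deepgram never flags is_final.
    class InterimTrackingAggregator:
        def __init__(self):
            self._aggregation = ""
            self._pending_interim = ""

        def on_interim(self, text: str):
            # Remember the most recent non-empty interim transcript.
            if text.strip():
                self._pending_interim = text

        def on_final(self, text: str):
            self._aggregation += " " + text
            self._pending_interim = ""

        def on_user_stopped_speaking(self) -> str:
            # If no final ever arrived, fall back to the last interim so the
            # aggregation isn't empty (this avoids duplicating text that did
            # arrive as a final).
            if not self._aggregation.strip() and self._pending_interim:
                self._aggregation = self._pending_interim
            return self._aggregation.strip()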

In your situation I would modify deepgram.py and increase the verbosity of the logger until you can see LiveResultResponse instances as they flow in, and hope that you can reproduce the issue.
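
For raising the SDK's verbosity, something like this should work where the client is constructed in deepgram.py (a sketch against Deepgram's Python SDK v3; check the version you have installed):

    import logging

    from deepgram import DeepgramClient, DeepgramClientOptions

    # Bump the SDK's internal log level so incoming LiveResultResponse
    # messages show up in the logs.
    config = DeepgramClientOptions(verbose=logging.DEBUG)
    client = DeepgramClient("YOUR_DEEPGRAM_API_KEY", config)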

In my case, a LiveResultResponse populated with a word or two would come in with is_final=False, so it would be propagated as an InterimTranscriptionFrame, which is essentially ignored by LLMUserResponseAggregator. The subsequent LiveResultResponse, which arrived a second later after I had stopped speaking, contained an empty string with is_final=True. When the UserStoppedSpeakingFrame appeared shortly after, no completion was run, which makes sense because the aggregation was still an empty string.

nisalr commented 1 day ago

@richtermb Did some more digging, and this is exactly what's happening to me as well. Occasionally, the Deepgram interim result is not followed by a final transcript with the is_final flag set to True. And sometimes Deepgram doesn't send any result at all (not even an interim result), even when the user speaks.

nisalr commented 23 hours ago

hey @richtermb I think I found a solution to this. If an is_final result is not received within a certain amount of time (e.g. 0.5 seconds), you can send a Finalize message (https://developers.deepgram.com/docs/finalize) to finalize the transcript. Here's my code, which has worked well in a few tests:

import asyncio
import json

from loguru import logger

from pipecat.frames.frames import InterimTranscriptionFrame, TranscriptionFrame
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.utils.time import time_now_iso8601


class DeepgramSTTServiceMod(DeepgramSTTService):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.finalize_after = 0.5  # seconds to wait for an is_final result
        self.timer_task = None
        # Assumes the service is constructed inside a running event loop
        # (as it is in the Twilio bot).
        self.reset_timer()

    def reset_timer(self):
        # Restart the countdown every time a transcript (interim or final) arrives.
        if self.timer_task:
            self.timer_task.cancel()
        self.timer_task = asyncio.create_task(self.timer_coroutine())

    async def timer_coroutine(self):
        await asyncio.sleep(self.finalize_after)
        await self.send_finalize()

    async def send_finalize(self):
        # Ask Deepgram to flush whatever it has buffered as a final result.
        await self._connection.send(json.dumps({"type": "Finalize"}))
        logger.info(f"Finalize sent after {self.finalize_after}s without a final transcript")

    async def _on_message(self, *args, **kwargs):
        result = kwargs["result"]
        is_final = result.is_final
        transcript = result.channel.alternatives[0].transcript
        if len(transcript) > 0:
            self.reset_timer()
            if is_final:
                await self.queue_frame(TranscriptionFrame(transcript, "", time_now_iso8601()))
            else:
                await self.queue_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601()))
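
To try it, construct the subclass wherever the stock DeepgramSTTService is created in your bot (a sketch of a hypothetical bot setup; keep whatever constructor arguments you already pass today):

    import os

    # Drop-in replacement for the stock service:
    # stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
    stt = DeepgramSTTServiceMod(api_key=os.getenv("DEEPGRAM_API_KEY"))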