deepgram / deepgram-go-sdk

Go SDK for Deepgram's automated speech recognition APIs.
https://developers.deepgram.com
MIT License

Live Client - WS Receive Queue Build-up #244

Closed matthew-ceravolo closed 2 months ago

matthew-ceravolo commented 2 months ago

What is the current behavior?

"Step size" in Deepgram refers to the frequency in which interim results are returned. The default value used by Deepgram cloud-hosted is 1 second. However, when using Deepgram on-prem, "step size" can be lowered for more frequent results (i.e. for closer-to-realtime processing). When step size is lowered, to 500ms for example, the websocket will receive twice as many messages from the engine, and at 200ms, 5x as many messages, etc.

The problem is shown below, in the listen() method. The code assumes that reading every 500ms is sufficient, but if messages arrive every 500ms or faster, they gradually build up on the receive end of the websocket because they are sent faster than they are read.

For example, with step size = 200ms:

t = 0: no events
t = 200: message-0 SEND on websocket
t = 400: message-1 SEND on websocket
t = 500: message-0 READ on websocket
t = 600: message-2 SEND on websocket
t = 800: message-3 SEND on websocket
t = 1000: message-1 READ on websocket
t = 1000: message-4 SEND on websocket

After 1 second, message-4 is sitting on the websocket waiting to be read, but we are still reading message-1, so we are already 600ms behind. This continues to build up until, after just 10 seconds, we are already 6 seconds behind.

This only affects the READs from the queue, so the audio is still being processed as expected, but it takes gradually longer and longer to be notified via the Message() callback that speech was recognized.
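To make the rate mismatch concrete, here is a minimal standalone sketch (not SDK code, just an illustration of the producer/consumer imbalance described above): a goroutine produces a message every 200ms while a ticker-gated reader consumes one every 500ms, so the printed backlog keeps growing.

package main

import (
    "fmt"
    "time"
)

func main() {
    queue := make(chan int, 1024) // stands in for the websocket receive buffer

    // Producer: one interim result per "step" (200ms here).
    go func() {
        for i := 0; ; i++ {
            time.Sleep(200 * time.Millisecond)
            queue <- i
        }
    }()

    // Consumer: reads a single message only when the 500ms ticker fires, like listen().
    ticker := time.NewTicker(500 * time.Millisecond)
    defer ticker.Stop()
    for i := 0; i < 20; i++ {
        <-ticker.C
        msg := <-queue
        fmt.Printf("read message-%d, backlog now %d\n", msg, len(queue))
    }
}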

pkg/client/live/client.go - line 288:

func (c *Client) listen() {
    klog.V(6).Infof("live.listen() ENTER\n")

    ticker := time.NewTicker(500 * time.Millisecond) // <--- This should be configurable
    defer ticker.Stop()
    for {
        select {
        case <-c.ctx.Done():
            c.closeWs(false)
            klog.V(6).Infof("live.listen() Signal Exit\n")
            klog.V(6).Infof("live.listen() LEAVE\n")
            return
        case <-ticker.C:
            ws := c.internalConnect()
            if ws == nil {
                klog.V(3).Infof("listen: Connection is not valid\n")
                klog.V(6).Infof("live.listen() LEAVE\n")
                return
            }

            msgType, byMsg, err := ws.ReadMessage()

Steps to reproduce

Try a step size of 200ms to quickly see the problem

Expected behavior

Messages should be read from the queue more quickly.

One solution would be to provide an environment variable such as DEEPGRAM_STEP_SIZE that can be set to control how frequently this websocket is read.
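A hedged sketch of what that could look like (the variable name DEEPGRAM_STEP_SIZE comes from the suggestion above; the readInterval helper and its parsing are hypothetical, not an existing SDK API):

package live

import (
    "os"
    "time"
)

// readInterval is a hypothetical helper: it derives the listen() polling
// interval from DEEPGRAM_STEP_SIZE (e.g. "200ms") and falls back to the
// current hard-coded 500ms when the variable is unset or unparsable.
func readInterval() time.Duration {
    if v := os.Getenv("DEEPGRAM_STEP_SIZE"); v != "" {
        if d, err := time.ParseDuration(v); err == nil && d > 0 {
            return d
        }
    }
    return 500 * time.Millisecond
}

listen() would then start its ticker with time.NewTicker(readInterval()) instead of the hard-coded 500ms value.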

Please tell us about your environment

Ubuntu 20.04, Go 1.21.5, Deepgram SDK: latest

dvonthenen commented 2 months ago

We have been trying to track down a threading problem in the latest minor release, which I believe is now fixed as of today. We are going to restore the loop that drains the queue. We should see a release of that tonight or tomorrow morning.

We will also drop that timer value in this next patch release.

For now, the 1.2 release is the most stable option.

dvonthenen commented 2 months ago

Just posted the last PR for this patch release. This should be released soon.

matthew-ceravolo commented 2 months ago

@dvonthenen

(Bearing in mind that I'm not an expert by any means in this codebase) looking at the PR, it looks like the change moves the read from every 500ms to every 250ms, which still wouldn't be fast enough for us. We're trying to get updates from our on-prem model every 200ms for realtime interruptions and the like. Could we make the value configurable instead? Similarly, for cloud-hosted flows, hitting it 4x per second is probably too often since the step size isn't that low.

Also, reads and writes can safely happen at the same time on gorilla/websocket (i.e. one concurrent read and one concurrent write, not multiple of each). Adding the muConn.Lock() in the listen() flow will block writes (which happen every 20ms) while waiting for a read, since the Client shares the same mutex for all flows. It might be worth separating the mutex used for setting up the connection from the one used for reading/writing; a rough sketch of that split is below.
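For illustration, one way that mutex split might look (type and field names here are hypothetical, not the SDK's actual ones; the idea is that connection setup/teardown and writes take locks, while the single blocking read loop never holds up the 20ms audio writes):

package live

import (
    "errors"
    "sync"

    "github.com/gorilla/websocket"
)

type liveClient struct {
    connMu  sync.Mutex      // guards swapping/closing the *websocket.Conn pointer
    writeMu sync.Mutex      // serializes writers (gorilla/websocket allows only one writer at a time)
    ws      *websocket.Conn // one concurrent reader plus one concurrent writer is safe
}

// conn returns the current connection under the connection lock.
func (c *liveClient) conn() *websocket.Conn {
    c.connMu.Lock()
    defer c.connMu.Unlock()
    return c.ws
}

// sendAudio runs every ~20ms; it contends only with other writers, never with the read loop.
func (c *liveClient) sendAudio(b []byte) error {
    ws := c.conn()
    if ws == nil {
        return errors.New("no active websocket connection")
    }
    c.writeMu.Lock()
    defer c.writeMu.Unlock()
    return ws.WriteMessage(websocket.BinaryMessage, b)
}

// readLoop is the single reader; ReadMessage blocks until data arrives,
// so it needs no ticker and no read-side lock.
func (c *liveClient) readLoop(handle func(msgType int, msg []byte)) {
    for {
        ws := c.conn()
        if ws == nil {
            return
        }
        msgType, msg, err := ws.ReadMessage()
        if err != nil {
            return
        }
        handle(msgType, msg)
    }
}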

dvonthenen commented 2 months ago

hi @matthew-ceravolo

It turns out that the timer loop doesn't mean anything. After restoring the for loop that drains the queue, the thread never leaves the inner for loop because the read call is blocking, so the ticker is dead code. The only thing the timer contributes at this point is an initial delay of 250ms, which isn't necessary. All of the timer logic (just in listen) can be removed now.
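To illustrate (simplified and with hypothetical names, not the SDK's exact code): once the inner drain loop is entered, the blocking ReadMessage call keeps the goroutine there for the life of the connection, so the outer select effectively runs only once and the ticker merely delays the first read.

package live

import (
    "context"
    "time"

    "github.com/gorilla/websocket"
)

// drainLoop sketches the restored flow: the outer select is entered once,
// then the inner for loop never exits until a read error occurs.
func drainLoop(ctx context.Context, ws *websocket.Conn, handle func([]byte)) {
    ticker := time.NewTicker(250 * time.Millisecond) // effectively just an initial delay
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            for { // inner drain loop
                _, msg, err := ws.ReadMessage() // blocking call
                if err != nil {
                    return
                }
                handle(msg)
            }
        }
    }
}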

The lock on the listen thread isn't for read and write; it's to access the wsconn object safely in conjunction with read/write. I realized the main issue with the code was that we separated the "check if the wsconn object is valid" step from the "read/write" step. The root cause of the issue I was struggling with was a small window where the wsconn object would get closed while a call was already in flight: the "is valid" check would pass (thread 1), the socket would be closed on a different thread (thread 2), and then a read/write would happen on a dead/nil socket (thread 1).

After testing, I am pretty sure this has been resolved now.

dvonthenen commented 2 months ago

I just released this here, please give it a try: https://github.com/deepgram/deepgram-go-sdk/releases/tag/v1.3.6

If there are any other concerns, let's jump on a Discord session to discuss so we aren't playing phone tag.

matthew-ceravolo commented 2 months ago

Ah, I see the new infinite loop on read now - beautiful. And that makes sense on the mutexes. Will give it a try this morning! Thanks for the quick fix again 😄

dvonthenen commented 2 months ago

I will be removing the timer/ticker in a subsequent release; I just don't want to change the code too much at once. Looking for stable first, then clean/refactor later.

This should be stable at this point.