googleapis / google-cloud-python

Google Cloud Client Library for Python
https://googleapis.github.io/google-cloud-python/
Apache License 2.0

400 Audio Timeout Error: Long duration elapsed without audio. Audio should be sent close to real time. #11099

Open 29swastik opened 1 year ago

29swastik commented 1 year ago

I'm trying out Google's Speech-to-Text with streaming audio input.

I have simple JS code that records audio on a button press and sends it to a FastAPI backend over a WebSocket. In the FastAPI backend I receive the audio bytes over the WebSocket, store them in chunks, and send them to the Google Speech-to-Text streaming recognition API, which returns the error below.

I've also verified the incoming audio bytes by saving them to a file; the saved audio file plays back correctly.
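
Playing the saved file confirms that the bytes are valid audio, but not which container they use. The backend below requests `LINEAR16` while saving the bytes as `song.webm`, so the container is worth checking. A minimal sketch, assuming a hypothetical helper named `sniff_container` (not part of any Google library), that checks a few well-known magic-byte signatures:

```python
def sniff_container(data: bytes) -> str:
    """Guess the audio container from the leading magic bytes of a payload."""
    if data[:4] == b"\x1a\x45\xdf\xa3":
        return "webm/matroska"  # EBML header used by WebM and Matroska
    if data[:4] == b"RIFF" and data[8:12] == b"WAVE":
        return "wav"  # RIFF container with a WAVE form type
    if data[:4] == b"OggS":
        return "ogg"  # Ogg page capture pattern
    # No known signature: could be headerless PCM or another format entirely.
    return "unknown"
```

Calling `sniff_container(chunk)` on the first decoded chunk would show whether the browser is really sending WAV data. Labelling a `Blob` as `audio/wav` sets only its MIME type and does not transcode the recording.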

Error

Traceback (most recent call last):
  File "/Users/swastikn/Desktop/speech_to_text/venv/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 112, in __next__
    return next(self._wrapped)
  File "/Users/swastikn/Desktop/speech_to_text/venv/lib/python3.7/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/Users/swastikn/Desktop/speech_to_text/venv/lib/python3.7/site-packages/grpc/_channel.py", line 809, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.OUT_OF_RANGE
    details = "Audio Timeout Error: Long duration elapsed without audio. Audio should be sent close to real time."
    debug_error_string = "{"created":"@1675100066.815164000","description":"Error received from peer ipv4:142.250.67.42:443","file":"src/core/lib/surface/call.cc","file_line":1075,"grpc_message":"Audio Timeout Error: Long duration elapsed without audio. Audio should be sent close to real time.","grpc_status":11}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/swastikn/Desktop/speech_to_text/sst/temp.py", line 199, in stream_transcript
    listen_print_loop(responses)
  File "/Users/swastikn/Desktop/speech_to_text/sst/temp.py", line 58, in listen_print_loop
    for response in responses:
  File "/Users/swastikn/Desktop/speech_to_text/venv/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 115, in __next__
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.OutOfRange: 400 Audio Timeout Error: Long duration elapsed without audio. Audio should be sent close to real time.
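
The error text says that audio should arrive close to real time. In the code below, the transcription thread starts at server startup and its generator blocks on the queue until a client sends audio, so a stream may sit open with no audio at all. A minimal sketch of one possible mitigation, assuming hypothetical helper names `bounded_chunks` and `audio_for_one_stream` and an arbitrary 5-second idle limit (the server-side limit is not stated in the error):

```python
import itertools
import queue
from typing import Iterator


def bounded_chunks(buff: queue.Queue, idle_timeout: float = 5.0) -> Iterator[bytes]:
    """Yield chunks from buff; stop on a None sentinel or after idle_timeout
    seconds without new audio, instead of blocking forever."""
    while True:
        try:
            chunk = buff.get(timeout=idle_timeout)
        except queue.Empty:
            return  # no audio for idle_timeout seconds: end this request stream
        if chunk is None:
            return  # explicit end-of-stream sentinel
        yield chunk


def audio_for_one_stream(buff: queue.Queue, idle_timeout: float = 5.0) -> Iterator[bytes]:
    """Block until the first chunk exists, then return an iterator over that
    chunk and everything that follows it.

    This is a plain function, not a generator, so the wait happens at call
    time, before the caller opens any streaming RPC.
    """
    first = buff.get()
    if first is None:
        return iter(())
    return itertools.chain([first], bounded_chunks(buff, idle_timeout))
```

The idea is to call `audio_for_one_stream(stream.buff)` first and only then pass the resulting chunks to `client.streaming_recognize`, so that the stream is opened once audio actually exists. Whether this removes the timeout in this setup is untested.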

Code

import base64
import json
import os
import re
import signal
import sys
import threading
import time
import traceback
from typing import List

import uvicorn
from fastapi import FastAPI, WebSocket
from google.cloud import speech_v1p1beta1 as speech
from six.moves import queue
from starlette.websockets import WebSocketDisconnect

app = FastAPI()
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "creds.json"

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

def signal_handler(sig, frame):
    sys.exit(0)

def listen_print_loop(responses):
    """Iterates through server responses and prints them.
    The responses passed is a generator that will block until a response
    is provided by the server.
    Each response may contain multiple results, and each result may contain
    multiple alternatives; for details, see https://goo.gl/tjCPAU.  Here we
    print only the transcription for the top alternative of the top result.
    In this case, responses are provided for interim results as well. If the
    response is an interim one, print a carriage return at the end of it, to
    allow the next result to overwrite it, until the response is a final one.
    For the final one, print a newline to preserve the finalized transcription.
    """
    num_chars_printed = 0
    for response in responses:
        if not response.results:
            continue

        # The `results` list is consecutive. For streaming, we only care about
        # the first result being considered, since once it's `is_final`, it
        # moves on to considering the next utterance.
        result = response.results[0]
        if not result.alternatives:
            continue

        # Display the transcription of the top alternative.
        transcript = result.alternatives[0].transcript

        # Display interim results, but with a carriage return at the end of the
        # line, so subsequent lines will overwrite them.
        #
        # If the previous result was longer than this one, we need to print
        # some extra spaces to overwrite the previous result
        overwrite_chars = " " * (num_chars_printed - len(transcript))

        if not result.is_final:
            sys.stdout.write(transcript + overwrite_chars + "\r")
            sys.stdout.flush()

            num_chars_printed = len(transcript)

        else:
            print(transcript + overwrite_chars)

            # Exit recognition if any of the transcribed phrases could be
            # one of our keywords.
            if re.search(r"\b(exit|quit)\b", transcript, re.I):
                print("Exiting..")
                break

            num_chars_printed = 0

class Stream(object):
    """Opens a recording stream as a generator yielding the audio chunks."""

    def __init__(self, rate, chunk):
        self._rate = rate
        self._chunk = chunk

        # Create a thread-safe buffer of audio data
        self.buff = queue.Queue()
        self.closed = True

    def __enter__(self):
        self.closed = False

        return self

    def __exit__(self, type, value, traceback):
        self.closed = True
        # Signal the generator to terminate so that the client's
        # streaming_recognize method will not block the process termination.
        self.buff.put(None)

    def fill_buffer(self, in_data):
        """Continuously collect data from the audio stream, into the buffer."""
        self.buff.put(in_data)
        return self

    def generator(self):
        while True:
            # Use a blocking get() to ensure there's at least one chunk of
            # data, and stop iteration if the chunk is None, indicating the
            # end of the audio stream.
            chunk = self.buff.get()
            if chunk is None:
                return
            data = [chunk]

            # Now consume whatever other data's still buffered.
            while True:
                try:
                    chunk = self.buff.get(block=False)
                    if chunk is None:
                        return
                    data.append(chunk)
                except queue.Empty:
                    break

            yield b"".join(data)

@app.websocket('/stt')
async def stt(ws: WebSocket):
    await manager.connect(ws)
    print("Connection accepted")
    # A lot of messages will be sent rapidly. We'll stop showing after the first one.
    message_count = 0
    run = True
    while run:
        try:
            message = await ws.receive_text()
            if message is None:
                print("No message received...")
                continue

            # Messages are a JSON encoded string
            data = json.loads(message)

            # Using the event type you can determine what type of message you are receiving
            if data['event'] == "connected":
                print("Connected Message received: {}".format(message))
            if data['event'] == "start":
                print("Start Message received: {}".format(message))
            if data['event'] == "media":
                payload = data['media']['payload']
                chunk = base64.b64decode(payload)

                # Just to verify that we are getting proper audio, save the audio bytes to a file.
                # Note: mode "bx" raises FileExistsError if the file already exists.
                with open("/Users/swastikn/Desktop/song.webm", mode="bx") as f:
                    f.write(chunk)
                # I was able to play the audio using the saved file.

                stream.fill_buffer(chunk)
            if data['event'] == "mark":
                print("Mark Message received: {}".format(message))
            if data['event'] == "stop":
                print("Stop Message received: {}".format(message))
                break
            message_count += 1
        except WebSocketDisconnect as e:
            run = False
            print(e)

    print("Connection closed. Received a total of {} messages".format(message_count))

def stream_transcript():
    run = True
    while run:
        audio_generator = stream.generator()
        try:
            requests = (
                speech.StreamingRecognizeRequest(audio_content=content)
                for content in audio_generator
            )
            responses = client.streaming_recognize(streaming_config, requests)
            # Now, put the transcription responses to use.
            listen_print_loop(responses)
        except WebSocketDisconnect as e:
            run = False
            print(e)
        except Exception:
            traceback.print_exception(*sys.exc_info())

        time.sleep(5)

if __name__ == '__main__':
    # Audio recording parameters
    RATE = 8000
    CHUNK = int(RATE / 100)  # 80 samples, i.e. 10 ms at 8 kHz (100 ms would be RATE / 10)
    language_code = "en-IN"  # a BCP-47 language tag

    client = speech.SpeechClient()
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=RATE,
        language_code=language_code,
        enable_speaker_diarization=True,
    )

    streaming_config = speech.StreamingRecognitionConfig(
        config=config, interim_results=True
    )
    manager = ConnectionManager()
    stream = Stream(RATE, CHUNK)
    t1 = threading.Thread(target=stream_transcript)
    t1.daemon = True
    t1.start()

    signal.signal(signal.SIGINT, signal_handler)

    print("Server listening on: http://localhost:" + str(8000))
    print("Route for stt: http://localhost:" + str(8000) + '/stt')

    uvicorn.run(app=app)

JS code to record audio

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>IVR</title>
</head>
<body>
    <button id="start-recording">Start Recording</button>
    <button id="stop-recording" disabled>Stop Recording</button>
</body>
  <script src="app.js"></script>
</html>

---------

const recordButton = document.getElementById("start-recording");
const stopButton = document.getElementById("stop-recording");

let mediaRecorder;
let chunks = [];

const handleSuccess = (stream) => {
  mediaRecorder = new MediaRecorder(stream);

  mediaRecorder.start();
  console.log(mediaRecorder.state);
  console.log("recorder started");
  recordButton.disabled = true;
  stopButton.disabled = false;

  mediaRecorder.ondataavailable = (e) => {
    chunks.push(e.data);
  };

  mediaRecorder.onstop = (e) => {
    const audioBlob = new Blob(chunks, { type: "audio/wav" });
    chunks = [];
    sendDataToWebsocket(audioBlob);
  };
};

const handleError = (error) => {
  console.log("navigator.getUserMedia error: ", error);
};

recordButton.addEventListener("click", () => {
  navigator.mediaDevices
    .getUserMedia({ audio: true })
    .then(handleSuccess)
    .catch(handleError);
});

stopButton.addEventListener("click", () => {
  mediaRecorder.stop();
  recordButton.disabled = false;
  stopButton.disabled = true;
});

const sendDataToWebsocket = (audioBlob) => {
  const socket = new WebSocket("ws://localhost:8000/stt");
  socket.binaryType = "arraybuffer";

  const reader = new FileReader();
  var payload;
  reader.readAsArrayBuffer(audioBlob);
  reader.onloadend = function() {
    var binaryString = '';
    var bytes = new Uint8Array(reader.result);
    for (var i = 0; i < bytes.byteLength; i++) {
      binaryString += String.fromCharCode(bytes[i]);
    }
    payload = window.btoa(binaryString);
  }

  socket.onopen = () => {

    data = {'event': 'media', 'stream_sid': 7, 'media': {'payload': payload}}
    console.log(data)
    console.log(JSON.stringify(data));
    socket.send(JSON.stringify(data));
    console.log("Data sent to websocket");
  };

  socket.onclose = (event) => {
    console.log("Websocket closed", event);
  };

  socket.onerror = (error) => {
    console.log("Websocket error", error);
  };
};
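
The client wraps the audio in a JSON envelope of the form `{'event': 'media', 'stream_sid': ..., 'media': {'payload': <base64>}}`. A minimal Python sketch of that envelope, assuming hypothetical helper names `encode_media_message` and `decode_media_message`, that can be used to exercise the backend without a browser:

```python
import base64
import json
from typing import Optional


def encode_media_message(audio: bytes, stream_sid: int = 7) -> str:
    """Build the same JSON envelope that the browser client sends."""
    payload = base64.b64encode(audio).decode("ascii")
    return json.dumps(
        {"event": "media", "stream_sid": stream_sid, "media": {"payload": payload}}
    )


def decode_media_message(message: str) -> Optional[bytes]:
    """Return the decoded audio bytes, or None if this is not a media
    message or the payload key is missing."""
    data = json.loads(message)
    if data.get("event") != "media":
        return None
    payload = data.get("media", {}).get("payload")
    if payload is None:
        return None
    return base64.b64decode(payload)
```

The `None` branch matters because `JSON.stringify` omits properties whose value is `undefined`. If the socket opens before the `FileReader` has finished, the message arrives without a `payload` key, and `data['media']['payload']` on the server would raise `KeyError`.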

Note

  1. python version - 3.7
  2. google-cloud-core==2.2.1
  3. google-api-core==2.11.0
  4. google-api-python-client==2.34.0

parthea commented 1 year ago

I'm going to transfer this issue to the google-cloud-python repository as we are preparing to move the code for google-cloud-speech to that repository in the next 1-2 weeks.

goyal15rajat commented 10 months ago

Is there any update on this issue?