awslabs / amazon-transcribe-streaming-sdk

The Amazon Transcribe Streaming SDK is an async Python SDK for converting audio into text via Amazon Transcribe.
Apache License 2.0

Example for an asynchronous websocket server #27

Open moaazsidat opened 3 years ago

moaazsidat commented 3 years ago

Hey folks, thanks for putting this together as a useful interface to AWS Transcribe.

I'm looking for an example that works with an async WebSocket server calling AWS Transcribe. The problem I'm running into is that handler.handle_events() is a blocking call that prevents further messages from being processed. Here's my code so far, using a FastAPI WebSocket app:

```python
import json
import typing

from fastapi import FastAPI, WebSocket
from starlette.endpoints import WebSocketEndpoint

from amazon_transcribe.client import TranscribeStreamingClient
from amazon_transcribe.handlers import TranscriptResultStreamHandler
from amazon_transcribe.model import TranscriptEvent

import numpy as np
from scipy.io.wavfile import read, write

app = FastAPI()

"""
Here's an example of a custom event handler you can extend to
process the returned transcription results as needed. This
handler will simply print the text out to your interpreter.
"""
class MyEventHandler(TranscriptResultStreamHandler):
    async def handle_transcript_event(self, transcript_event: TranscriptEvent):
        # This handler can be implemented to handle transcriptions as needed.
        # Here's an example to get started.
        print("LOG: handler called")
        results = transcript_event.transcript.results
        for result in results:
            for alt in result.alternatives:
                print(f"TRANSCRIPT: {alt.transcript}")

@app.websocket_route("/media")
class Transcription(WebSocketEndpoint):

    async def on_connect(self, websocket: WebSocket) -> None:
        await websocket.accept()
        self.transcription_client = TranscribeStreamingClient(region="ca-central-1")
        self.stream = None
        self.handler = None
        self.chunks_np = np.array([])

    async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None:
        print("LOG: disconnect: ")
        # Guard against a client that disconnects before sending any audio.
        if self.stream is not None:
            await self.stream.input_stream.end_stream()

    async def on_receive(self, websocket: WebSocket, data: typing.Any) -> None:
        message = json.loads(data)

        if message is not None:
            # print("LOG: message received: ", message)
            if self.stream is None and self.handler is None:
                self.stream = await self.transcription_client.start_stream_transcription(
                    language_code="en-US",
                    media_sample_rate_hz=44100,
                    media_encoding="pcm",
                )
                self.handler = MyEventHandler(self.stream.output_stream)

            chunk = message["chunk"]
            chunk_np = np.fromiter(chunk.values(), dtype=np.int16)

            self.chunks_np = np.append(self.chunks_np, chunk_np)

            chunk_bytes = chunk_np.tobytes()

            await self.stream.input_stream.send_audio_event(audio_chunk=chunk_bytes)
            # This await only returns once the transcript stream ends, so this
            # on_receive call stays pending and no further messages are read.
            await self.handler.handle_events()
```

Any guidance as to what's a good way to call handler.handle_events() in a non-blocking manner?

I have tried wrapping handler.handle_events() in a while loop and invoking it via a background task, but that runs into similar blocking behaviour.
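For readers hitting the same wall, here is a stdlib-only sketch of why this stalls. It assumes, as a simplification, that handle_events() behaves like a loop that only returns once the transcript stream is closed. Starlette's WebSocketEndpoint awaits on_receive() for each message before reading the next one, so awaiting that loop inside on_receive() means the first message never finishes. The handle_events and on_receive functions and the None sentinel below are hypothetical stand-ins, not the SDK's API.

```python
import asyncio


async def handle_events(results: asyncio.Queue) -> None:
    # Hypothetical stand-in for handler.handle_events(): keeps consuming
    # results until the stream is closed (signalled here by None).
    while await results.get() is not None:
        pass


async def on_receive(results: asyncio.Queue, chunk: bytes) -> None:
    await results.put(len(chunk))  # stand-in for send_audio_event()
    await handle_events(results)   # does not return while the stream is open


async def first_message_finishes() -> bool:
    results: asyncio.Queue = asyncio.Queue()
    try:
        # In this sketch nothing closes the stream while on_receive() is stuck.
        await asyncio.wait_for(on_receive(results, b"\x00" * 320), timeout=0.5)
    except asyncio.TimeoutError:
        return False
    return True


if __name__ == "__main__":
    print(asyncio.run(first_message_finishes()))  # False
```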

ajay960singh commented 2 years ago

Hey @moaazsidat, were you able to solve this?

parikls commented 2 years ago

@ajay960singh handle_events must be scheduled as a task, e.g. asyncio.create_task(self.handler.handle_events())

Nithin745 commented 1 week ago

Hi, I'm trying something similar but haven't been able to crack it. Did you get it working?