Azure-Samples / cognitive-services-speech-sdk

Sample code for the Microsoft Cognitive Services Speech SDK
MIT License
2.79k stars 1.82k forks source link

How to get audio chunk when doing text-to-speech streaming? #2499

Closed Vackwin closed 3 weeks ago

Vackwin commented 1 month ago

Hello, I am using python sdk to synthesize speech from text. The doc here said that we can forward the audio chunks immediately to clients instead of waiting for the whole audio. I want to use this as a server side, and send generated audio to client side streamingly.

speech_synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config, audio_config=None)
result = speech_synthesizer.start_speaking_text_async(text).get()
audio_data_stream = speechsdk.AudioDataStream(result)
audio_buffer = bytes(16000)
filled_size = audio_data_stream.read_data(audio_buffer)
while filled_size > 0:
    print("{} bytes received.".format(filled_size))
    filled_size = audio_data_stream.read_data(audio_buffer)

However, this line result = speech_synthesizer.start_speaking_text_async(text).get() will wait for the future completed, so I need to wait until the whole audio generated, how to read_data to audio_buffer when speech is generating? I also look at this, the text prompting works well when audio output to default speaker, but how to send the audio binary chunk to client immediately(first chunk send at first byte latency)? With the example code I can only send audio data until whole speech generated(finish latency).

Thanks for help!

yulin-li commented 1 month ago

start_speaking_text_async(text).get() returns after the test starts synthesizing, not completed. So you can get the audio chuck by chuck with your codes

jingjiangtao commented 1 month ago

start_speaking_text_async(text).get() returns after the test starts synthesizing, not completed. So you can get the audio chuck by chuck with your codes

Does the get() method block the current thread?

jingjiangtao commented 1 month ago

After my testing, the speech_synthesizer.start_speaking_text_async(text).get() method cannot be called concurrently. If the method is called for the first time, it will immediately return a SpeechSynthesisResult object without blocking. However, if the method is called a second time while the first call hasn't returned the first byte, the second call will wait and block the current thread. Example:

import time
import azure.cognitiveservices.speech as speechsdk
from datetime import datetime
from typing import List
from concurrent.futures import ThreadPoolExecutor
from src.config.main import config

'''First, define a few functions
'''
# Get a new SpeechSynthesizer object
def get_speech_synthesizer() -> speechsdk.SpeechSynthesizer:
    speech_config = speechsdk.SpeechConfig(
        subscription=config.TTS['Azure']['SUBSCRIPTION'],
        region=config.TTS['Azure']['SPEECH_REGION']
    )

    speech_config.set_speech_synthesis_output_format(speechsdk.SpeechSynthesisOutputFormat.Riff16Khz16BitMonoPcm)
    audio_config = None
    speech_synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config, audio_config=audio_config)
    return speech_synthesizer

# Wrap the call to start_speaking_text_async(text).get() function and return as AudioDataStream
def get_stream(speech_synthesizer: speechsdk.SpeechSynthesizer, text: str) -> speechsdk.AudioDataStream:
    speech_synthesis_result = speech_synthesizer.start_speaking_text_async(text).get()
    stream = speechsdk.AudioDataStream(speech_synthesis_result)
    return stream

# Read data from AudioDataStream
def read_data(stream: speechsdk.AudioDataStream) -> None:
    audio_buffer = bytes(1024)
    length = 0
    filled_size = stream.read_data(audio_buffer)
    length += filled_size
    while filled_size > 0:
        filled_size = stream.read_data(audio_buffer)
        length += filled_size

    canceled = stream.cancellation_details.error_details if stream.cancellation_details else None
    # Print the stream status and the length of audio bytes read
    log(f"read stream finished, length: {length}, status: {stream.status}, canceled: {canceled}")

# Print log with timestamp
def log(msg: str) -> None:
    print(f"{datetime.now().isoformat()} - {msg}")

Now let's start calling the test. Instantiate a SpeechSynthesizer object, make 10 requests with this single object, print the time taken for each request, and save the returned streams to an array. Finally, read the audio data from the streams in parallel in child threads.

# Text to synthesize audio
text = ("This repository hosts samples that help you to get started with several features of the SDK.")
# Save the returned streams
streams: List[speechsdk.AudioDataStream] = []
# Instantiate a SpeechSynthesizer object
tts = get_speech_synthesizer()
# Make 10 calls
for i in range(10):
    tic = time.perf_counter()
    # Call the synthesis method and return the stream
    stream = get_stream(tts, text)
    toc = time.perf_counter()
    # Print the time taken for this request
    log(f"get stream elapsed: {(toc - tic) * 1000:.0f} ms")
    # Save the returned stream
    streams.append(stream)

# Read streams simultaneously in child threads
with ThreadPoolExecutor(max_workers=32) as executor:
    executor.map(read_data, streams)

Output is as follows:

2024-07-20T21:41:02.279893 - get stream elapsed: 1 ms # # The first request took 1 millisecond, didn't block the current thread
2024-07-20T21:41:06.429160 - get stream elapsed: 4149 ms # Subsequent requests took much longer and blocked the current thread
2024-07-20T21:41:08.944821 - get stream elapsed: 2515 ms
2024-07-20T21:41:10.892377 - get stream elapsed: 1948 ms
2024-07-20T21:41:14.316145 - get stream elapsed: 3424 ms
2024-07-20T21:41:17.939049 - get stream elapsed: 3623 ms
2024-07-20T21:41:21.916065 - get stream elapsed: 3977 ms
2024-07-20T21:41:23.662992 - get stream elapsed: 1746 ms
2024-07-20T21:41:25.417438 - get stream elapsed: 1755 ms
2024-07-20T21:41:27.547944 - get stream elapsed: 2131 ms
2024-07-20T21:41:27.564470 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None # All data reading was successful
2024-07-20T21:41:27.564470 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:41:27.564470 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None

2024-07-20T21:41:27.565470 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:41:27.565470 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:41:27.565470 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:41:27.565470 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:41:27.565470 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:41:27.565470 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:41:29.484493 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None

As we can see, only the first request took 1 millisecond, meaning it didn't block. The subsequent 9 requests took much longer and blocked the current thread. All data in the streams was successfully read. This is because the subsequent requests were waiting for the previous ones to complete.

If we wait 5 seconds before each request to allow the previous request to complete, then each request will take the same time as the first request and won't block the current thread:

# Instantiate a SpeechSynthesizer object
tts = get_speech_synthesizer()
# Make 10 calls
for i in range(10):
    tic = time.perf_counter()
     # Call the synthesis method and return the stream
    stream = get_stream(tts, text)
    toc = time.perf_counter()
     # Print the time taken for this request
    log(f"get stream elapsed: {(toc - tic) * 1000:.0f} ms")
    # Save the returned stream
    streams.append(stream)
    # Wait for 5 seconds
    time.sleep(5)

The output is as follows, showing that each request takes a very short time:

2024-07-20T21:45:09.901704 - get stream elapsed: 1 ms
2024-07-20T21:45:14.903804 - get stream elapsed: 1 ms
2024-07-20T21:45:19.906523 - get stream elapsed: 1 ms
2024-07-20T21:45:24.912867 - get stream elapsed: 1 ms
2024-07-20T21:45:29.927142 - get stream elapsed: 0 ms
2024-07-20T21:45:34.942170 - get stream elapsed: 1 ms
2024-07-20T21:45:40.016039 - get stream elapsed: 65 ms
2024-07-20T21:45:45.017656 - get stream elapsed: 1 ms
2024-07-20T21:45:50.021842 - get stream elapsed: 1 ms
2024-07-20T21:45:55.023465 - get stream elapsed: 1 ms
2024-07-20T21:46:00.035453 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:46:00.043458 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:46:00.044450 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:46:00.044450 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:46:00.044450 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:46:00.044450 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:46:00.045450 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:46:00.045450 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:46:00.045450 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:46:00.045450 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None

If we use different SpeechSynthesizer objects for each request, each request will take a very short time, but when reading the returned streams, some streams may time out, resulting in inability to read data:

# Make 10 calls
for i in range(10):
    # Get a new instance each time
    tts = get_speech_synthesizer()
    tic = time.perf_counter()
     # Call the synthesis method and return the stream
    stream = get_stream(tts, text)
    toc = time.perf_counter()
    # Print the time taken for this request
    log(f"get stream elapsed: {(toc - tic) * 1000:.0f} ms")
    # Save the returned stream
    streams.append(stream)

The output is as follows:

2024-07-20T21:54:37.479914 - get stream elapsed: 1 ms # All take very short time
2024-07-20T21:54:37.479914 - get stream elapsed: 0 ms
2024-07-20T21:54:37.488914 - get stream elapsed: 0 ms
2024-07-20T21:54:37.489916 - get stream elapsed: 0 ms
2024-07-20T21:54:37.489916 - get stream elapsed: 0 ms
2024-07-20T21:54:37.490923 - get stream elapsed: 0 ms
2024-07-20T21:54:37.490923 - get stream elapsed: 0 ms
2024-07-20T21:54:37.490923 - get stream elapsed: 0 ms
2024-07-20T21:54:37.491914 - get stream elapsed: 1 ms
2024-07-20T21:54:37.492914 - get stream elapsed: 0 ms
2024-07-20T21:54:40.143649 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:54:40.250949 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:54:40.421969 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:54:40.593421 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:54:40.970530 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:54:41.603605 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None
2024-07-20T21:55:57.508608 - read stream finished, length: 0, status: StreamStatus.Canceled, canceled: USP error: timeout waiting for the first audio chunk # Error occurred, unable to read data
2024-07-20T21:55:57.509616 - read stream finished, length: 0, status: StreamStatus.Canceled, canceled: USP error: timeout waiting for the first audio chunk # Error occurred, unable to read data
2024-07-20T21:55:57.509616 - read stream finished, length: 0, status: StreamStatus.Canceled, canceled: USP error: timeout waiting for the first audio chunk # Error occurred, unable to read data
2024-07-20T21:55:57.509616 - read stream finished, length: 0, status: StreamStatus.Canceled, canceled: USP error: timeout waiting for the first audio chunk # Error occurred, unable to read data

If we wait for 0.1 seconds after each request before making the next request based on the above code, only the 10th request will succeed, and the first 9 will fail:

# Make 10 calls
for i in range(10):
    # Get a new instance each time
    tts = get_speech_synthesizer()
    tic = time.perf_counter()
    # Call the synthesis method and return the stream
    stream = get_stream(tts, text)
    toc = time.perf_counter()
    # Print the time taken for this request
    log(f"get stream elapsed: {(toc - tic) * 1000:.0f} ms")
    # Save the returned stream
    streams.append(stream)
    # Wait for 0.1 seconds after each request before making the next request
    time.sleep(0.1)

The output is as follows:

2024-07-20T21:59:18.563934 - get stream elapsed: 1 ms
2024-07-20T21:59:18.664269 - get stream elapsed: 0 ms
2024-07-20T21:59:18.775101 - get stream elapsed: 0 ms
2024-07-20T21:59:18.884229 - get stream elapsed: 0 ms
2024-07-20T21:59:18.993076 - get stream elapsed: 1 ms
2024-07-20T21:59:19.101300 - get stream elapsed: 1 ms
2024-07-20T21:59:19.208337 - get stream elapsed: 0 ms
2024-07-20T21:59:19.318798 - get stream elapsed: 1 ms
2024-07-20T21:59:19.427299 - get stream elapsed: 1 ms
2024-07-20T21:59:19.538172 - get stream elapsed: 1 ms
2024-07-20T21:59:22.596834 - read stream finished, length: 190400, status: StreamStatus.AllData, canceled: None # Only the last request was successful, all other requests failed.
2024-07-20T22:00:38.593552 - read stream finished, length: 0, status: StreamStatus.Canceled, canceled: USP error: timeout waiting for the first audio chunk
2024-07-20T22:00:38.685909 - read stream finished, length: 0, status: StreamStatus.Canceled, canceled: USP error: timeout waiting for the first audio chunk
2024-07-20T22:00:38.794399 - read stream finished, length: 0, status: StreamStatus.Canceled, canceled: USP error: timeout waiting for the first audio chunk
2024-07-20T22:00:38.905398 - read stream finished, length: 0, status: StreamStatus.Canceled, canceled: USP error: timeout waiting for the first audio chunk
2024-07-20T22:00:39.014357 - read stream finished, length: 0, status: StreamStatus.Canceled, canceled: USP error: timeout waiting for the first audio chunk
2024-07-20T22:00:39.122586 - read stream finished, length: 0, status: StreamStatus.Canceled, canceled: USP error: timeout waiting for the first audio chunk
2024-07-20T22:00:39.231391 - read stream finished, length: 0, status: StreamStatus.Canceled, canceled: USP error: timeout waiting for the first audio chunk
2024-07-20T22:00:39.340313 - read stream finished, length: 0, status: StreamStatus.Canceled, canceled: USP error: timeout waiting for the first audio chunk
2024-07-20T22:00:39.448226 - read stream finished, length: 0, status: StreamStatus.Canceled, canceled: USP error: timeout waiting for the first audio chunk
yulin-li commented 1 month ago

oh that's true. The synthesizer object would queue the requests and send to the service one by one. You need to create multiple synthesizer instances for concurrency

jingjiangtao commented 1 month ago

oh that's true. The synthesizer object would queue the requests and send to the service one by one. You need to create multiple synthesizer instances for concurrency

The code above has already tested the case of multiple instances making parallel requests. Some of these requests will fail.

github-actions[bot] commented 3 weeks ago

This item has been open without activity for 19 days. Provide a comment on status and remove "update needed" label.

Vackwin commented 3 weeks ago

start_speaking_text_async(text).get() returns after the test starts synthesizing, not completed. So you can get the audio chuck by chuck with your codes

OK, I think this solved my problem, finally i use speak_async() which gets output from llm and do tts streaming.