Azure-Samples / cognitive-services-speech-sdk

Sample code for the Microsoft Cognitive Services Speech SDK
MIT License

[TTS/STT] Dangling threads #2506

Open MarkoMackic opened 2 months ago

MarkoMackic commented 2 months ago

Here is a minimal reproduction example. It is self-contained, so I don't have to follow all the instructions. You don't need anyio if you use the pure-thread version (PURE_THREAD = 1); it is included only because we use it in our system. The problem is that the worker threads for the MS speech synthesizer are left dangling (the recognizer has the same problem). The machines we run on, including my own, are Ubuntu 22.04. We use MS speech recognition and synthesis in a system that handles many simultaneous real-time audio connections: we spawn a thread for each connection and run synthesis/recognition inside it. Judging by the output below, 5 worker threads are spawned for each synthesizer instance and never exit, so the system quickly chokes.

If there is something I'm missing in how we use the API, please do let me know.

SDK version confirmed: 1.37.0, 1.38.0

import gc
import os
import subprocess
import threading
import time
from threading import Thread

import anyio
import anyio.from_thread
import azure.cognitiveservices.speech as speechsdk

LOG_LEVEL = 0
PURE_THREAD = 1

def log(text, min_level=0):
    if LOG_LEVEL >= min_level:
        print(text)

class NullOutputStream(speechsdk.audio.PushAudioOutputStream):
    def __init__(self):
        super().__init__(self)

    def write(self, data: memoryview) -> int:
        log(f"Write : {len(data)}", 2)
        return len(data)

    def close(self) -> None:
        pass

def get_speech_synth():
    speech_cfg = speechsdk.SpeechConfig(
        subscription=os.environ["SPEECH_KEY"], region=os.environ["SPEECH_REGION"]
    )
    speech_cfg.speech_synthesis_language = "en"
    speech_cfg.speech_synthesis_voice_name = "en-US-AriaNeural"
    speech_cfg.set_speech_synthesis_output_format(
        speechsdk.SpeechSynthesisOutputFormat.Raw16Khz16BitMonoPcm
    )
    return speechsdk.SpeechSynthesizer(
        speech_config=speech_cfg,
        audio_config=speechsdk.AudioConfig(stream=NullOutputStream()),
    )

def disconnect_signals(synth: speechsdk.SpeechSynthesizer):
    synth.synthesis_completed.disconnect_all()
    synth.synthesis_started.disconnect_all()

def thread_process():
    log(f"Thread {threading.current_thread()} started")

    synth = get_speech_synth()

    evt = threading.Event()

    def started(eargs: speechsdk.SpeechSynthesisEventArgs):
        log("Synth started", 1)

    def completed(eargs: speechsdk.SpeechSynthesisEventArgs):
        log("Synth completed", 1)
        evt.set()

    synth.synthesis_started.connect(started)
    synth.synthesis_completed.connect(completed)

    log("Before speaking", 1)
    synth.speak_text_async("Hello how are you")
    log("before sleep", 2)
    time.sleep(10)
    log("after sleep", 1)

    # We wait for the stopped/completed event; shouldn't the worker threads shut down after that?
    synth.stop_speaking_async()

    evt.wait()

    disconnect_signals(synth)

    synth = None
    log("Garbage collecting? maybe that's it?")
    gc.collect()
    log(f"Thread {threading.current_thread()} ended")

async def task_process(portal: anyio.from_thread.BlockingPortal):
    log(f"Thread {threading.current_thread()} started")

    synth = get_speech_synth()
    evt = anyio.Event()

    def started(eargs: speechsdk.SpeechSynthesisEventArgs):
        log("Synth started", 1)

    def completed(eargs: speechsdk.SpeechSynthesisEventArgs):
        log("Synth completed", 1)
        portal.call(evt.set)

    synth.synthesis_started.connect(started)
    synth.synthesis_completed.connect(completed)

    log("Before speaking", 1)
    synth.speak_text_async("Hello how are you")
    log("before sleep", 2)
    await anyio.sleep(10)
    log("after sleep", 2)

    # We wait for the stopped/completed event; shouldn't the worker threads shut down after that?
    synth.stop_speaking_async()
    await evt.wait()

    # Are the signals still connected? They are not.
    disconnect_signals(synth)

    synth = None
    log("Garbage collecting? maybe that's it?")
    gc.collect()
    log(f"Thread {threading.current_thread()} ended")

successful_threads = 0

async def with_blocking_portal():
    async with anyio.from_thread.BlockingPortal() as bp:
        await task_process(bp)

def run():
    global successful_threads
    try:
        if PURE_THREAD:
            thread_process()
        else:
            anyio.run(with_blocking_portal)
        successful_threads += 1
    except Exception as e:
        log(f"error {e}", 2)

def print_thread_state():
    log("THREAD STATE")
    threads = subprocess.check_output(
        ["ps", "-T", "-p", str(os.getpid())]
    ).splitlines()[1::]
    log(f"Successfully executed {successful_threads}")
    log(f"Running count {len(threads)}")
    log("Use gdb to look at dangling microsoft threads.")
    log("END THREAD STATE")

threads = [Thread(target=run) for i in range(10)]

print_thread_state()

for t in threads:
    t.start()

for t in threads:
    t.join()

log("garbage collecting")
gc.collect()
log("finished all")
print_thread_state()
time.sleep(50000)
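
The script above counts threads with `ps -T` and suggests gdb for inspecting them. On Linux the same information, plus per-thread names, can be read from /proc. This is only a sketch: the helper name `list_os_threads` is hypothetical, it assumes a Linux /proc filesystem, and it does not use the Speech SDK at all. OS threads that outnumber `threading.active_count()` were most likely created by native code rather than by Python.

```python
import os
import threading


def list_os_threads(pid=None):
    """Return [(tid, name), ...] for every OS-level thread of a process.

    Linux only: reads /proc/<pid>/task. Returns [] where /proc is unavailable.
    """
    task_dir = f"/proc/{pid or os.getpid()}/task"
    try:
        tids = sorted(int(t) for t in os.listdir(task_dir))
    except OSError:
        # No /proc on this platform (or the process is gone).
        return []
    threads = []
    for tid in tids:
        try:
            with open(f"{task_dir}/{tid}/comm") as fh:
                threads.append((tid, fh.read().strip()))
        except OSError:
            # The thread exited between listdir() and open(); skip it.
            continue
    return threads


if __name__ == "__main__":
    os_threads = list_os_threads()
    print(f"OS threads: {len(os_threads)}, Python threads: {threading.active_count()}")
    for tid, name in os_threads:
        print(tid, name)
```

Calling this in place of the `ps` subprocess inside `print_thread_state()` would show which thread names remain after all Python threads have been joined.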

Here is an example of the output:

THREAD STATE
Successfully executed 0
Running count 1
Use gdb to look at dangling microsoft threads.
END THREAD STATE
Thread <Thread(Thread-1 (run), started 123425957570112)> started
Thread <Thread(Thread-2 (run), started 123425949177408)> started
Thread <Thread(Thread-3 (run), started 123425940784704)> started
Thread <Thread(Thread-4 (run), started 123425932392000)> started
Thread <Thread(Thread-5 (run), started 123425923999296)> started
Thread <Thread(Thread-6 (run), started 123425915606592)> started
Thread <Thread(Thread-7 (run), started 123425907213888)> started
Thread <Thread(Thread-8 (run), started 123425898821184)> started
Thread <Thread(Thread-9 (run), started 123425414706752)> started
Thread <Thread(Thread-10 (run), started 123424928159296)> started
Garbage collecting? maybe that's it?
Thread <Thread(Thread-4 (run), started 123425932392000)> ended
Garbage collecting? maybe that's it?
Garbage collecting? maybe that's it?
Thread <Thread(Thread-9 (run), started 123425414706752)> ended
Garbage collecting? maybe that's it?
Thread <Thread(Thread-1 (run), started 123425957570112)> ended
Garbage collecting? maybe that's it?
Garbage collecting? maybe that's it?
Garbage collecting? maybe that's it?
Thread <Thread(Thread-3 (run), started 123425940784704)> ended
Garbage collecting? maybe that's it?
Thread <Thread(Thread-6 (run), started 123425915606592)> ended
Thread <Thread(Thread-2 (run), started 123425949177408)> ended
Garbage collecting? maybe that's it?
Thread <Thread(Thread-7 (run), started 123425907213888)> ended
Thread <Thread(Thread-10 (run), started 123424928159296)> ended
Garbage collecting? maybe that's it?
Thread <Thread(Thread-8 (run), started 123425898821184)> ended
Thread <Thread(Thread-5 (run), started 123425923999296)> ended
garbage collecting
finished all
THREAD STATE
Successfully executed 10
Running count 51
Use gdb to look at dangling microsoft threads.
END THREAD STATE
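
As a sanity check on the five-threads-per-instance estimate, the two "Running count" values in this output can be compared directly. The helper below is a hypothetical illustration, not part of the reproduction script; it assumes that every Python thread has been joined by the time of the second measurement, so any extra OS threads belong to the synthesizer instances.

```python
def leaked_threads_per_instance(count_before, count_after, instances):
    """Average number of OS threads left behind per synthesizer instance."""
    if instances <= 0:
        raise ValueError("instances must be positive")
    return (count_after - count_before) / instances


# Values taken from the output above: 1 thread before, 51 after, 10 synthesizers.
print(leaked_threads_per_instance(1, 51, 10))  # prints 5.0
```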

MarkoMackic commented 2 months ago

Using the synchronous speak_text (with the stop_speaking_async call removed) doesn't leave dangling threads.

MarkoMackic commented 2 months ago

Using stop_speaking instead of stop_speaking_async causes a segfault (start_speaking_async was used to initiate the speaking). The first thread exited successfully.

kormang commented 1 month ago

The problem is that the _async functions require the caller to eventually call result_future.get(). If this is by design, it should at least be documented.
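
A minimal sketch of that workaround, assuming the observation above is correct (it has not been confirmed by the SDK maintainers in this thread): keep the futures returned by speak_text_async and stop_speaking_async and call get() on both. The helper name `speak_then_stop` is hypothetical, and `synth` is assumed to be a speechsdk.SpeechSynthesizer; the function is duck-typed, so it does not import the SDK itself.

```python
def speak_then_stop(synth, text):
    """Start synthesis, stop it, and wait on every future the SDK hands back.

    Assumption: `synth` exposes speak_text_async() and stop_speaking_async(),
    each returning an object with a blocking get() method.
    """
    speak_future = synth.speak_text_async(text)

    # ... application work happens here while audio is being produced ...

    stop_future = synth.stop_speaking_async()
    stop_future.get()          # block until the stop request has completed
    return speak_future.get()  # collect the synthesis result as well
```

In the reproduction script this would replace the bare `synth.speak_text_async(...)` and `synth.stop_speaking_async()` calls, whose return values are currently discarded.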

github-actions[bot] commented 4 weeks ago

This item has been open without activity for 19 days. Provide a comment on status and remove "update needed" label.

kormang commented 3 weeks ago

This item has been open without activity for 19 days. Provide a comment on status and remove "update needed" label.

The issue is still present and won't go away by itself. It is very easy to reproduce.

MarkoMackic commented 3 weeks ago

Can we get any info about this from your side?
