spatialaudio / python-sounddevice

:sound: Play and Record Sound with Python :snake:
https://python-sounddevice.readthedocs.io/
MIT License
1.01k stars 149 forks source link

Having Difficulty trying to create an asynchronous playback #547

Open DragonwolfAside opened 1 month ago

DragonwolfAside commented 1 month ago

I'm currently up to a project that needs an asynchronous audio support, and I need both the player process and the generator process work at the same time. I've been following your code on readthedocs, but I still cannot figure things out. Here's my code written:

import asyncio

import sounddevice as sd
import soundfile as sf
import numpy as np

fil = sf.SoundFile('files/snd.wav', 'r')

class Engine:
    def __init__(self, sr=48000, blocksize=512, buf_size=2048, channels=2, dtype='float32'):
        self.sr = sr
        self.blocksize = blocksize
        self.channels = channels
        self.dtype = dtype

        self.buf_size = buf_size
        self.buffer = np.zeros((self.buf_size, self.channels), dtype=self.dtype)
        self.stop_event = asyncio.Event()

        self.pos = 0  # read ptr
        self.end = 0  # data end ptr
        self.delta = self.buf_size  # spare memory space
        self.remain = 0  # remaining data

    async def reader(self):
        # Pseudo generator, replacing with other code later
        while True:
            if self.end > self.pos:  # case loopback
                self.buffer[self.end:] = (fil.read(self.buf_size - self.end)
                                          .reshape(self.buf_size - self.end, self.channels))
                self.buffer[:self.pos] = fil.read(self.pos).reshape(self.pos, self.channels)
            else:
                self.buffer[self.end:self.pos] = (fil.read(self.pos - self.end)
                                                  .reshape(self.pos - self.end, self.channels))
            await asyncio.sleep(0)  # yield

    async def callback(self, outdata, frame_count, time_info, status):
        print(f"\rCursor: {self.pos} End: {self.end}\nRemain: {self.remain} Delta: {self.delta}", end="")

        if self.end > self.pos:  # case loopback
            self.remain = self.end - self.pos
            self.delta = self.buf_size - self.remain - 1
        elif self.end < self.pos:
            self.delta = self.pos - self.end - 1
            self.remain = self.buf_size - self.delta + 1
        else:
            print(" !! No Data", end="")

        if self.pos + frame_count <= self.remain:  # sufficient data
            if self.end > self.pos:  # case loopback
                rear = self.buf_size - self.pos - 1  # read
                outdata[:rear] = self.buffer[self.pos:]
                outdata[rear:] = self.buffer[:self.end]
                self.pos = frame_count - rear
            else:
                outdata[:self.remain] = self.buffer[self.pos:self.end]  # read
                self.pos += frame_count

        else:  # buffer underflow
            under = frame_count - self.remain
            print(f"\n!!!Buffer Underflow, under={under}")

            if self.end > self.pos:  # case loopback
                rear = self.buf_size - self.pos - 1  # read rest of data
                outdata[:rear] = self.buffer[self.pos:]
                outdata[rear:] = self.buffer[:self.end]
                outdata[-under:] = np.zeros((under, self.channels), dtype=self.dtype)  # return zeros
            else:
                outdata[:self.remain] = self.buffer[self.pos:self.end]  # read rest of data
                outdata[-under:] = np.zeros((under, self.channels), dtype=self.dtype)  # return zeros

            self.pos = self.end

    def start_stream(self, **kwargs):
        print(self.dtype, self.blocksize, self.channels, self.sr)

        stream = sd.OutputStream(
            callback=self.callback, dtype=self.dtype, blocksize=self.blocksize, channels=self.channels,
            samplerate=self.sr, **kwargs
        )

        with stream:
            await self.stop_event.wait()

async def main():
    engine = Engine(channels=1)
    await asyncio.gather(
        engine.reader(),
        engine.start_stream()
    )

if __name__ == '__main__':
    asyncio.run(main())
    input("Program Exited...")

I knew some tricks like sync2async blocking2async, and I tried without callback, but the thread is still blocked by stream.write().

Here's my tricks:

import asyncio

loop = asyncio.new_event_loop()

async def sync2async(func, *args):
    return await loop.run_in_executor(None, func, *args)

async def blocking2async(func, *args):
    coro = loop.run_in_executor(None, func, *args)
    done, pending = await asyncio.wait([coro], return_when=asyncio.FIRST_COMPLETED)
    if coro in done:
        return coro.result()

Here's my modified methods

class Engine:

    ...

    async def callback(self):
        print(f"\rCursor: {self.pos} End: {self.end}\nRemain: {self.remain} Delta: {self.delta}", end="")
        outdata = np.zeros((self.blocksize, self.channels), dtype=self.dtype)
        ...
        return outdata

    async def writer(self):
        while True:
            await blocking2async(self.stream.write, await self.callback())
            await asyncio.sleep(0)  # yield

    def start_stream(self):
        print(self.dtype, self.blocksize, self.channels, self.sr)
        self.stream.start()

async def main():
    engine = Engine(channels=1)
    engine.start_stream()
    await asyncio.gather(
        engine.reader(),
        engine.writer()
    )

When I run this code, I noticed that the program stuck at done, pending = await asyncio.wait([coro], return_when=asyncio.FIRST_COMPLETED) and it keeps awaiting.

Really we can't use asyncio in sounddevice?

mgeier commented 1 month ago

a project that needs an asynchronous audio support

"asynchronous" is quite a broad term.

All callback streams are already asynchronous (the callback function runs in a separate thread that's created by the PortAudio library), and if you use sd.play(...) in an interactive Python interpreter, it will start playing asynchronously while you can continue executing Python statements.

So as long as you don't use the "blocking" interface, it's always asynchronous!

Really we can't use asyncio in sounddevice?

That's a much more specific question.

It is possible to use asyncio together with the sounddevice module, but it is a bit awkward, both of which can be seen in the "asyncio" examples.

Don't those examples work for you?

I didn't have the time to look closer into your example, but the callback function cannot be an async function, that's simply not supported.

Apart from that, your example code doesn't run:

    await self.stop_event.wait()
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
SyntaxError: 'await' outside async function

If your callback function would work, it would then access self.buffer in the callback thread while it is also accessed in the main thread. This will likely lead to problems. You should try to make your code thread-safe.

DragonwolfAside commented 1 month ago

Apart from that, your example code doesn't run:

Well, forgot to make Engine.start_stream a synchronous. Here's a tested version

import asyncio

import sounddevice as sd
import soundfile as sf
import numpy as np

from utils import blocking2async

# This program needs a 'files/snd.wav' to run
fil = sf.SoundFile('files/snd.wav', 'r')

class Engine:
    def __init__(self, sr=48000, blocksize=512, buf_size=2048, channels=2, dtype='float32', **kwargs):
        self.sr = sr
        self.blocksize = blocksize
        self.channels = channels
        self.dtype = dtype

        self.buf_size = buf_size
        self.buffer = np.zeros((self.buf_size, self.channels), dtype=self.dtype)
        self.stop_event = asyncio.Event()

        self.pos = 0  # read ptr
        self.end = 0  # data end ptr
        self.delta = self.buf_size  # spare memory space
        self.remain = 0  # remaining data

        self.stream = sd.OutputStream(
            dtype=self.dtype, blocksize=self.blocksize, channels=self.channels,
            samplerate=self.sr, **kwargs
        )

    async def reader(self):
        # Pseudo generator, replacing with other code later
        while True:
            if self.end > self.pos:  # case loopback
                self.buffer[self.end:] = (fil.read(self.buf_size - self.end)
                                          .reshape(self.buf_size - self.end, self.channels))
                self.buffer[:self.pos] = fil.read(self.pos).reshape(self.pos, self.channels)
            else:
                self.buffer[self.end:self.pos] = (fil.read(self.pos - self.end)
                                                  .reshape(self.pos - self.end, self.channels))
            await asyncio.sleep(0)  # yield

    async def callback(self):
        print(f"\rCursor: {self.pos} End: {self.end}\nRemain: {self.remain} Delta: {self.delta}", end="")
        outdata = np.zeros((self.blocksize, self.channels), dtype=self.dtype)

        if self.end > self.pos:  # case loopback
            self.remain = self.end - self.pos
            self.delta = self.buf_size - self.remain - 1
        elif self.end < self.pos:
            self.delta = self.pos - self.end - 1
            self.remain = self.buf_size - self.delta + 1
        else:
            print(" !! No Data", end="")
            # self.stop_event.set()

        if self.pos + self.blocksize <= self.remain:  # sufficient data
            if self.end > self.pos:  # case loopback
                rear = self.buf_size - self.pos - 1  # read
                outdata[:rear] = self.buffer[self.pos:]
                outdata[rear:] = self.buffer[:self.end]
                self.pos = self.blocksize - rear
            else:
                outdata[:self.remain] = self.buffer[self.pos:self.end]  # read
                self.pos += self.blocksize

        else:  # Buffer underflow
            under = self.blocksize - self.remain
            print(f"\n!!!Buffer Underflow, under={under}")

            if self.end > self.pos:  # case loopback
                rear = self.buf_size - self.pos - 1  # read rest of data
                outdata[:rear] = self.buffer[self.pos:]
                outdata[rear:] = self.buffer[:self.end]
                outdata[-under:] = np.zeros((under, self.channels), dtype=self.dtype)  # return zeros
            else:
                outdata[:self.remain] = self.buffer[self.pos:self.end]  # read rest of data
                outdata[-under:] = np.zeros((under, self.channels), dtype=self.dtype)  # return zeros

            self.pos = self.end

        return outdata

    async def writer(self):
        while True:
            await blocking2async(self.stream.write, await self.callback())
            await asyncio.sleep(0)  # yield

    def start_stream(self):
        print(self.dtype, self.blocksize, self.channels, self.sr)

        self.stream.start()

async def main():
    engine = Engine(channels=1)
    engine.start_stream()
    await asyncio.gather(
        engine.reader(),
        engine.writer()
    )

if __name__ == '__main__':
    asyncio.run(main())
    input("Program Exited...")

Finally, thank you so much for your reply! It helps a lot!