lipku / metahuman-stream

Real time interactive streaming digital human
https://livetalking-doc.readthedocs.io/
Apache License 2.0
3.54k stars 499 forks source link

我参考作者的实现方式自己实现了GaussianTalker接入进来,为什么asyncio.Queue会一直阻塞住呢?各位懂异步编程的可以帮帮我吗 #264

Open shehuiwojiege opened 6 days ago

shehuiwojiege commented 6 days ago

我参考作者的实现思路,实现了Player和Tracker,下面是我的简单demo,但我就是不明白为什么在Tracker的recv那里,异步Queue被阻塞了,希望各位帮我看下,不胜感谢: '''python import time, av import aiohttp import logging import uvicorn import asyncio import threading import multiprocessing from aiortc import RTCPeerConnection, RTCSessionDescription import fractions, asyncio from typing import ( Set, Tuple, Union, Optional, ) from loguru import logger from av.frame import Frame from av.packet import Packet from aiortc import MediaStreamTrack AUDIO_PTIME = 0.020 # 20ms audio packetization VIDEO_CLOCK_RATE = 90000 VIDEO_PTIME = 1 / 25 # 30fps VIDEO_TIME_BASE = fractions.Fraction(1, VIDEO_CLOCK_RATE) SAMPLE_RATE = 48000 AUDIO_TIME_BASE = fractions.Fraction(1, SAMPLE_RATE)

def player_worker_thread( quit_event, loop, linker, audio_track, video_track ): linker.render_forever(quit_event, loop, audio_track, video_track)

class PlayerStreamTrack(MediaStreamTrack):

def __init__(self, player, kind):
    super().__init__()
    self.kind = kind
    self._player = player
    self._queue = asyncio.Queue()

_start: float
_timestamp: int

async def next_timestamp(self) -> Tuple[int, fractions.Fraction]:
    if self.readyState != 'live':
        raise Exception

    if self.kind == 'video':
        if hasattr(self, '_timestamp'):
            self._timestamp += int(VIDEO_PTIME * VIDEO_CLOCK_RATE)
            wait = self._start + (self._timestamp / VIDEO_CLOCK_RATE)
            if wait > 0:
                await asyncio.sleep(wait)
        else:
            self._start = time.time()
            self._timestamp = 0
            # print('video start:', self._start)
        return self._timestamp, VIDEO_TIME_BASE
    else:
        if hasattr(self, '_timestamp'):
            self._timestamp += int(AUDIO_PTIME * SAMPLE_RATE)
            wait = self._start + (self._timestamp / SAMPLE_RATE) - time.time()
            if wait > 0:
                await asyncio.sleep(wait)
        else:
            self._start = time.time()
            self._timestamp = 0
            # print('audio start:', self._start)
        return self._timestamp, VIDEO_TIME_BASE

async def recv(self) -> Union[Frame, Packet]:
    self._player._start(self)
    frame = await self._queue.get()
    print(11111, self.kind, frame)
    if frame is None:
        self.stop()
        raise Exception
    pts, time_base = await self.next_timestamp()
    frame.pts = pts
    frame.time_base = time_base
    print(self.kind, self._queue.qsize())
    return frame

def stop(self):
    super().stop()
    if self._player is not None:
        self._player._stop(self)
        self._player = None

class HumanPlayer:

def __init__(self, linker):
    self.__thread: Optional[threading.Thread] = None
    self.__thread_quit: Optional[threading.Event] = None
    self.__started: Set[PlayerStreamTrack] = set()
    self.__audio: Optional[PlayerStreamTrack] = None
    self.__video: Optional[PlayerStreamTrack] = None
    self.__audio = PlayerStreamTrack(self, 'audio')
    self.__video = PlayerStreamTrack(self, 'video')
    self.__linker = linker

@property
def audio(self) -> MediaStreamTrack:
    return self.__audio

@property
def video(self) -> MediaStreamTrack:
    return self.__video

def _start(self, track: PlayerStreamTrack) -> None:
    self.__started.add(track)
    if self.__thread is None:
        self.__log_debug('Starting worker thread')
        self.__thread_quit = threading.Event()
        self.__thread = threading.Thread(
            name='media-player',
            target=player_worker_thread,
            args=(
                self.__thread_quit,
                asyncio.new_event_loop(),
                self.__linker,
                self.__audio,
                self.__video
            )
        )
        self.__thread.start()
        time.sleep(2)

def _stop(self, track: PlayerStreamTrack) -> None:
    self.__started.discard(track)

    if not self.__started and self.__thread is not None:
        self.__log_debug("Stopping worker thread")
        self.__thread_quit.set()
        self.__thread.join()
        self.__thread = None

    if not self.__started and self.__linker is not None:
        self.__linker = None

def __log_debug(self, msg: str, *args) -> None:
    logger.debug(f'HumanPlayer {msg}', *args)

class GaussianTalkerLinker:

def __init__(self):
    con = av.open('1.mov')
    self.audio_generator = [f for f in con.decode(audio=0)]
    self.audio_idx = 0
    con = av.open('1.mov')
    self.video_generator = [f for f in con.decode(video=0)]
    self.video_idx = 0

def get_audio(self):
    if self.audio_idx >= len(self.audio_generator):
        self.audio_idx = 0
    frame = self.audio_generator[self.audio_idx]
    self.audio_idx += 1
    return frame

def get_video(self):
    if self.video_idx >= len(self.video_generator):
        self.video_idx = 0
    frame = self.video_generator[self.video_idx]
    self.video_idx += 1
    return frame

def render_forever(
        self,
        quit_event,
        loop=None,
        audio_track: PlayerStreamTrack = None,
        video_track: PlayerStreamTrack = None):
    while not quit_event.is_set():
        self.run_step(loop, audio_track, video_track)
        if video_track._queue.qsize() >= 5:
            time.sleep(0.04*video_track._queue.qsize()*0.8)
    print('linker thread stop')

def run_step(self, loop=None, audio_track=None, video_track=None):
    for i in range(2):
        new_frame = self.get_audio()
        asyncio.run_coroutine_threadsafe(audio_track._queue.put(new_frame), loop)
    # 模型推理
    new_frame = self.get_video()
    asyncio.run_coroutine_threadsafe(video_track._queue.put(new_frame), loop)

async def post(url,data): try: async with aiohttp.ClientSession() as session: async with session.post(url,data=data) as response: return await response.text() except aiohttp.ClientError as e: print(f'Error: {e}')

pcs = set()

async def run(push_url): pc = RTCPeerConnection() pcs.add(pc) @pc.on("connectionstatechange") async def on_connectionstatechange(): print("Connection state is %s" % pc.connectionState) if pc.connectionState == "failed": await pc.close() pcs.discard(pc) player = HumanPlayer(linker) audio_sender = pc.addTrack(player.audio) video_sender = pc.addTrack(player.video) await pc.setLocalDescription(await pc.createOffer()) answer = await post(push_url, pc.localDescription.sdp) await pc.setRemoteDescription(RTCSessionDescription(sdp=answer, type='answer'))

if name == 'main': linker = GaussianTalkerLinker() multiprocessing.set_start_method('spawn') loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(run('http://192.168.31.19:1985/rtc/v1/whip/?app=live&stream=livestream')) loop.run_forever() '''

shehuiwojiege commented 6 days ago

异步编程实在搞不定呀,它们不都是一个loop的吗,为什么在recv那里的queue.get会阻塞住呀?

xiongxiaoxiang commented 3 days ago

实例化的哪个地方改改