livekit / python-sdks

LiveKit real-time and server SDKs for Python
https://docs.livekit.io
Apache License 2.0
123 stars 37 forks source link

Client cannot receive any of the events #205

Closed xiaokang00010 closed 3 months ago

xiaokang00010 commented 3 months ago

track_subscribed and participant_connected are set when initializing chatroom. However, client can not receive any of the events even the peer publish a new track. When the client receives events, it supposed to print log. However it didn't. And source.capture_frame in broadcastAudioLoop seemed not working too.

Log:

(cyberWaifuV2) (base) ➜ CyberWaifu-v2 git:(master) ✗ LIVEKIT_RTC_DEBUG=true make start_web_backend
python app.py -s
(1374805) wsgi starting up on http://192.168.1.13:6211
(1374805) accepted ('192.168.1.13', 49194)
113.102.22.44,192.168.1.13 - - [09/Jun/2024 23:06:04] "OPTIONS /api/v1/rtvc/establish HTTP/1.1" 200 382 0.002057
(1374805) accepted ('192.168.1.13', 49210)
initialized voice chat session
Unclosed client session
client_session:
Unclosed connector
connections: ['[(, 5641375.975447935)]']
connector:
preparing to start chat...
connecting to room...
broadcast audio track published: stream_track
113.102.22.44,192.168.1.13 - - [09/Jun/2024 23:06:04] "POST /api/v1/rtvc/establish HTTP/1.1" 200 1075 0.198894

Code of my implementation:

class VoiceChatSession:
    """
    Real time chat session for a character.

    Attributes:
        sessionName (str): session name
        charName (str): character name
        bot (instance.Chatbot): chatbot object for the real time session
        chatRoom (Optional[livekit.rtc.Room]): livekit room object for the real time session
        dataProvider (dataProvider.DataProvider): data provider object
        currentImageFrame (Optional[livekit.rtc.VideoFrame]): current image frame from the user's camera or sharing screen
        broadcastAudioTrack (Optional[livekit.rtc.AudioTrack]): audio track for broadcasting audio to other users in the session

    Methods:
        VAD(stream: livekit.rtc.AudioStream) -> None: Voice activity detection.
        receiveVideoStream(stream: livekit.rtc.VideoStream) -> None: Receive video stream from other user.
        start(botToken: str) -> None: Start the hoster of chat session.
        chatPluginGetUserMedia() -> glm.File: Get image of user's camera or sharing screen. Use it when user want you to know about the content of his camera or screen or your response is related to the content of the camera or screen.
    """

    def __init__(self, sessionName: str, charName: str, dataProvider: dataProvider.DataProvider) -> None:
        self.sessionName = sessionName
        self.charName = charName
        self.bot = instance.Chatbot(memory.Memory(
            dataProvider, charName), dataProvider.getUserName(), [self.chatPluginGetUserMedia])
        self.chatRoom: Optional[livekit.rtc.Room] = None
        self.dataProvider = dataProvider
        self.currentImageFrame: Optional[livekit.rtc.VideoFrame] = None
        self.broadcastAudioTrack: Optional[livekit.rtc.AudioTrack] = None
        self.broadcastMissions: queue.Queue[dict[str, str | int | bool]] = []
        self.currentBroadcastMission: Optional[av.InputContainer |
                                               av.OutputContainer] = None
        self.ttsServiceId = self.bot.memory.getCharTTSServiceId()
        self.ttsService = self.dataProvider.getGPTSoVitsService(
            self.ttsServiceId)
        self.GPTSoVITsAPI = GPTSoVitsAPI(self.ttsService['url'])
        self.vadModel = webrtcvad.Vad(3)
        print('initialized voice chat session')

    async def chat(self, audios: list[glm.File]) -> None:
        """
        Send audio files to chatbot and retrive response as broadcast missions.

        Args:
            audios (list[glm.File]): list of audio files

        Returns:
            None
        """

        resp = []
        # no to use self.bot.chat here cuz we've already uploaded the files.
        if self.bot.inChatting:
            resp = self.bot.llm.chat(audios)
        else:
            resp = self.bot.llm.initiate(audios)
            self.bot.inChatting = True
        for i in self.dataProvider.parseModelResponse(resp):
            self.broadcastMissions.put(i)

    async def VAD(self, stream: livekit.rtc.AudioStream) -> None:
        """
        Voice activity detection.
        Fetch and identify each audio frame, when activity detected, save to local temporary file and upload as glm.File then send to Gemini model as Input.

        Args:
            stream (livekit.rtc.AudioStream): audio stream

        Returns:
            None
        """
        # a simple implemenation of WebRTC VAD algorithm
        ring_buffer: list[tuple[bytes, bool]] = []
        voiced_frames: list[bytes] = []
        bs = []
        # i don't even know whether it's a good idea to use 648 as the maxlen.
        maxlen = 30
        triggered = False
        async for frame in stream:
            byteFrame = frame.frame.data.tobytes()
            isSpeech = self.vadModel.is_speech(
                frame.frame.data.tobytes(), webFrontend.config.LIVEKIT_SAMPLE_RATE)
            if not triggered:
                ring_buffer.append((byteFrame, isSpeech))
                num_voiced = len([f for f, speech in ring_buffer if speech])
                if num_voiced > 0.9 * maxlen:
                    triggered = True
                    voiced_frames.extend(
                        [f for f, speech in ring_buffer if speech])
                    ring_buffer = []
            else:
                voiced_frames.append(byteFrame)
                ring_buffer.append((byteFrame, isSpeech))
                num_unvoiced = len(
                    [f for f, speech in ring_buffer if not speech])
                if num_unvoiced > 0.9 * maxlen:
                    triggered = False
                    bs.append(b''.join(f for f in voiced_frames))

                    def proc(b: bytes):
                        temp = self.dataProvider.tempFilePathProvider('wav')
                        with open(temp, 'wb') as f:
                            f.write(b)

                        glmFile = google.generativeai.upload_file(temp)
                        os.remove(temp)
                        return glmFile
                    files = [proc(b) for b in bs]
                    self.chat(files)
                    ring_buffer = []
                    voiced_frames = []

    async def receiveVideoStream(self, stream: livekit.rtc.VideoStream) -> None:
        """
        Receive video stream from other user.

        Args:
            stream (livekit.rtc.VideoStream): video stream

        Returns:
            None
        """
        async for frame in stream:
            self.currentImageFrame = frame.frame

    async def start(self, botToken: str) -> None:
        """
        Start the hoster of chat session.

        Returns:
            None
        """

        print('preparing to start chat...')

        self.chatRoom = livekit.rtc.Room()
        await self.chatRoom.connect(f"ws://{webFrontend.config.LIVEKIT_API_URL}", botToken)

        print('connecting to room...')

        @self.chatRoom.on("track_subscribed")
        def on_track_subscribed(track: livekit.rtc.Track):
            print(f"track subscribed: {track.kind} {track.name}")

        @self.chatRoom.on("participant_connected")
        def on_participant_connected(participant: livekit.rtc.RemoteParticipant):
            print(f"participant connected: {
                      participant.identity} {participant.sid}")

            # publish track
        audioSource = livekit.rtc.AudioSource(
            webFrontend.config.LIVEKIT_SAMPLE_RATE, 1)
        self.broadcastAudioTrack = livekit.rtc.LocalAudioTrack.create_audio_track(
                "stream_track", audioSource)
        publication = await self.chatRoom.local_participant.publish_track(
            self.broadcastAudioTrack, livekit.rtc.TrackPublishOptions(source=livekit.rtc.TrackSource.SOURCE_MICROPHONE))
        print(f"broadcast audio track published: {
                publication.track.name}")

        # asyncio.get_event_loop().create_task(self.broadcastAudioLoop(
        #     source=audioSource, frequency=1000))

    def generateEmptyAudioFrame(self) -> livekit.rtc.AudioFrame:
        """
        Generate an empty audio frame.

        Returns:
            livekit.rtc.AudioFrame: empty audio frame
        """
        amplitude = 32767  # for 16-bit audio
        samples_per_channel = 480  # 10ms at 48kHz
        time = numpy.arange(samples_per_channel) / webFrontend.config.LIVEKIT_SAMPLE_RATE
        total_samples = 0
        audio_frame = livekit.rtc.AudioFrame.create(webFrontend.config.LIVEKIT_SAMPLE_RATE, 1, samples_per_channel)
        audio_data = numpy.frombuffer(audio_frame.data, dtype=numpy.int16)
        time = (total_samples + numpy.arange(samples_per_channel)) / webFrontend.config.LIVEKIT_SAMPLE_RATE
        wave = numpy.int16(0)
        numpy.copyto(audio_data, wave)
        print('done1')
        return audio_frame

    def fetchBroadcastMission(self) -> None:
        if len(self.broadcastMissions) == 0:
            self.currentBroadcastMission = None
        else:
            r = self.dataProvider.convertModelResponseToTTSInput(
                [self.broadcastMissions.get()], self.ttsService['reference_audios'])[0]
            refAudio = self.dataProvider.getReferenceAudioByName(
                self.ttsServiceId, r['emotion'])
            if refAudio is None:
                raise exceptions.ReferenceAudioNotFound(
                    f"Reference audio for emotion {r['emotion']} not found")
            self.currentBroadcastMission = av.open(self.GPTSoVITsAPI.tts(
                refAudio['path'], refAudio['text'], r['text'], refAudio['language']).raw)
        return self.currentBroadcastMission

    async def broadcastAudioLoop(self, source: livekit.rtc.AudioSource, frequency: int):
        while True:
            print('broadcasting audio...')
            if self.fetchBroadcastMission() is None:
                await source.capture_frame(self.generateEmptyAudioFrame())
                print('done2')
            else:
                frame: Optional[av.AudioFrame] = None
                async for frame in self.currentBroadcastMission.decode(audio=0):
                    livekitFrame = livekit.rtc.AudioFrame(
                        frame.to_ndarray().tobytes(),
                        frame.sample_rate,
                        num_channels=1, samples_per_channel=480).remix_and_resample(webFrontend.config.LIVEKIT_SAMPLE_RATE, 1)

                    await source.capture_frame(livekitFrame)

    def chatPluginGetUserMedia(self) -> glm.File:
        """
        Get image of user's camera or sharing screen. Use it when user want you to know about the content of his camera or screen or 
        your response is related to the content of the camera or screen.

        Returns:
            glm.File: Image file of user's camera or sharing screen.
        """

        if self.currentImageFrame is None:
            raise exceptions.NoUserMediaFound(
                f"No image frame found for {self.charName}")

        img = self.currentImageFrame.convert(
            livekit.rtc.VideoBufferType.RGBA).data.tobytes()
        img_np = numpy.frombuffer(img, dtype=numpy.uint8).reshape(
            self.currentImageFrame.height,
            self.currentImageFrame.width,
            4
        )
        encoded, buffer = cv2.imencode('.jpg', img_np)
        temp = self.dataProvider.tempFilePathProvider('jpg')
        with open(temp, 'wb') as f:
            f.write(buffer)

        glmFile = google.generativeai.upload_file(temp)
        os.remove(temp)
        return glmFile

    def terminateSession(self) -> None:
        """
        Terminate the chat session.
        """
        async def f():
            await self.chatRoom.disconnect()
            self.bot.terminateChat()

        asyncio.get_event_loop().run_until_complete(f())
xiaokang00010 commented 3 months ago

Well, I fixed it this morning

aylarov commented 1 month ago

Hi, can you elaborate please? Seems like having similar problem and can't figure it out

xiaokang00010 commented 1 month ago

Hi, can you elaborate please? Seems like having similar problem and can't figure it out

Sry for not providing detailed response, cuz I didn't remember how to fix it. It's been a long time ago. But you can check out my changes here