livekit / python-sdks

LiveKit real-time and server SDKs for Python
https://docs.livekit.io
Apache License 2.0
72 stars 23 forks source link

How to receive videoframes and display in opencv #213

Open scottDavidRad opened 1 week ago

scottDavidRad commented 1 week ago

ERROR:livekit:livekit_ffi::cabi:53:livekit_ffi::cabi - failed to handle request FfiRequest { message: Some(NewVideoStream(NewVideoStreamRequest { track_handle: 9, r#type: VideoStreamNative, format: None, normalize_stride: true })) }: InvalidRequest("handle is not a livekit_ffi::server::room::FfiTrack")

How do I solve this error.

import asyncio import colorsys import logging import os from signal import SIGINT, SIGTERM

import numpy as np from livekit import api, rtc import livekit import cv2 import numpy

tasks = set()

WIDTH, HEIGHT = 1280, 720

ensure LIVEKIT_URL, LIVEKIT_API_KEY, and LIVEKIT_API_SECRET are set

async def frame_loop(video_stream: rtc.VideoStream) -> None: argb_frame = None cv2.namedWindow("livekit_video", cv2.WINDOW_AUTOSIZE) cv2.startWindowThread()

async for frame in video_stream:
   buffer = frame_event.frame
   arr = np.frombuffer(buffer.data, dtype=np.uint8)
   arr = arr.reshape((buffer.height, buffer.width, 3))

cv2.namedWindow('livekit_video', cv2.WINDOW_AUTOSIZE)
cv2.startWindowThread()

async def main(room: rtc.Room): video_stream = None token = ( api.AccessToken() .with_identity("stuff") .with_name("scott") .with_grants( api.VideoGrants( room_join=True, room="my-room", ) ) .to_jwt() ) url = os.getenv("LIVEKIT_URL") logging.info("connecting to %s", url) try: await room.connect(url, token) logging.info("connected to room %s", room.name) except rtc.ConnectError as e: logging.error("failed to connect to the room: %s", e) return

print("connected to room: " + room.name)

@room.on("track_published")
def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication):
    if track.kind == rtc.TrackKind.KIND_VIDEO:
        video_stream = rtc.VideoStream(track)

        asyncio.ensure_future(frame_loop(video_stream))

@room.on("track_unpublished")
def on_track_unpublished(
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant):
    logging.info("track unpublished: %s", publication.sid)

if name == "main": loop = asyncio.get_event_loop() room = rtc.Room(loop=loop)

async def cleanup():
    await room.disconnect()
    loop.stop()

asyncio.ensure_future(main(room))
for signal in [SIGINT, SIGTERM]:
    loop.add_signal_handler(
        signal, lambda: asyncio.ensure_future(
            cleanup()))

try:
    loop.run_forever()
finally:
    loop.close()