gtk2k / Unity_aiortc_sample

Sample connecting to aiortc using WebRTC for Unity
MIT License
4 stars 1 forks source link

how to recv video frame from python aiortc per? #1

Closed csjiyw closed 1 week ago

csjiyw commented 1 week ago

problem

i want to use python script to stream video to unity using webrtc, i wrote a python tcp webrtc side to stream video and unity code to recv. The signaling part is success connect, however the video stream is not suppose to transfer.

here is python sender

import asyncio import cv2 from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack from aiortc.contrib.signaling import TcpSocketSignaling from av import VideoFrame import fractions from datetime import datetime

class CustomVideoStreamTrack(VideoStreamTrack): def init(self, camera_id): super().init() self.cap = cv2.VideoCapture(camera_id) self.frame_count = 0

async def recv(self):
    self.frame_count += 1
    print(f"Sending frame {self.frame_count}")
    ret, frame = self.cap.read()
    if not ret:
        print("Failed to read frame from camera")
        return None
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    video_frame = VideoFrame.from_ndarray(frame, format="rgb24")
    video_frame.pts = self.frame_count
    video_frame.time_base = fractions.Fraction(1, 30)  # Use fractions for time_base
    # Add timestamp to the frame
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]  # Current time with milliseconds
    cv2.putText(frame, timestamp, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    video_frame = VideoFrame.from_ndarray(frame, format="rgb24")
    video_frame.pts = self.frame_count
    video_frame.time_base = fractions.Fraction(1, 30)  # Use fractions for time_base
    return video_frame

async def setup_webrtc_and_run(ip_address, port, camera_id): signaling = TcpSocketSignaling(ip_address, port) pc = RTCPeerConnection() video_sender = CustomVideoStreamTrack(camera_id) pc.addTrack(video_sender)

try:
    await signaling.connect()

    @pc.on("datachannel")
    def on_datachannel(channel):
        print(f"Data channel established: {channel.label}")

    @pc.on("connectionstatechange")
    async def on_connectionstatechange():
        print(f"Connection state is {pc.connectionState}")
        if pc.connectionState == "connected":
            print("WebRTC connection established successfully")

    offer = await pc.createOffer()
    await pc.setLocalDescription(offer)
    await signaling.send(pc.localDescription)

    while True:
        obj = await signaling.receive()
        if isinstance(obj, RTCSessionDescription):
            await pc.setRemoteDescription(obj)
            print("Remote description set")
        elif obj is None:
            print("Signaling ended")
            break
    print("Closing connection")
finally:
    await pc.close()

async def main(): ip_address = "127.0.0.1" # Ip Address of Remote Server/Machine port = 9999 camera_id = 1 # Change this to the appropriate camera ID await setup_webrtc_and_run(ip_address, port, camera_id)

if name == "main": asyncio.run(main())

here is my unity recv

using System; using System.Threading.Tasks; using UnityEngine; using Unity.WebRTC; using System.Collections; using System.Collections.Generic; using System.Linq;

public class WebRTCSignaling : MonoBehaviour {

RenderTexture rt1;
public Material mat1;

private RTCPeerConnection  pc;

[System.Serializable]
public class SdpData
{
    public string sdp;
    public string type;
}

string GetName(RTCPeerConnection pc)
{
    return "pc1";
}

private DelegateOnIceConnectionChange pc1OnIceConnectionChange;

void OnIceConnectionChange(RTCPeerConnection pc, RTCIceConnectionState state)
{
    switch (state)
    {
        case RTCIceConnectionState.New:
            Debug.Log($"{GetName(pc)} IceConnectionState: New");
            break;
        case RTCIceConnectionState.Checking:
            Debug.Log($"{GetName(pc)} IceConnectionState: Checking");
            break;
        case RTCIceConnectionState.Closed:
            Debug.Log($"{GetName(pc)} IceConnectionState: Closed");
            break;
        case RTCIceConnectionState.Completed:
            Debug.Log($"{GetName(pc)} IceConnectionState: Completed");
            break;
        case RTCIceConnectionState.Connected:
            Debug.Log($"{GetName(pc)} IceConnectionState: Connected");
            break;
        case RTCIceConnectionState.Disconnected:
            Debug.Log($"{GetName(pc)} IceConnectionState: Disconnected");
            break;
        case RTCIceConnectionState.Failed:
            Debug.Log($"{GetName(pc)} IceConnectionState: Failed");
            break;
        case RTCIceConnectionState.Max:
            Debug.Log($"{GetName(pc)} IceConnectionState: Max");
            break;
        default:
            break;
    }
}
void Pc1OnIceConnectinChange(RTCIceConnectionState state)
{
    OnIceConnectionChange(pc, state);
}

private List<RTCRtpReceiver> pc1Receivers;
MediaStream receiveStream;
private DelegateOnNegotiationNeeded pc1OnNegotiationNeeded;

VideoStreamTrack videoTrack1;

IEnumerator S1()
{

    // WebRTC.Initialize(); // Initialize WebRTC
    // receiveStream = new MediaStream();

    pc1OnIceConnectionChange = state => { OnIceConnectionChange(pc, state); };

    pc = new RTCPeerConnection();
    pc.OnIceCandidate = OnIceCandidate; // Handle ICE Candidates
    pc.OnIceConnectionChange = pc1OnIceConnectionChange;
    // pc.OnNegotiationNeeded = pc1OnNegotiationNeeded;

    pc1Receivers = new List<RTCRtpReceiver>();

    pc.OnTrack = (RTCTrackEvent e) =>
    {
         if (e.Track is VideoStreamTrack videoTrack)
        {
            Debug.Log("Video track received");
            // Debug.Log($"Video Track Source: {videoTrack.m_source}");
            // receiveStream.AddTrack(videoTrack);

            videoTrack.OnVideoReceived += tex =>
            {
                Debug.Log("OnVideoReceived");
                mat1.mainTexture = tex;
                // receiveImage.color = Color.white;
            };; // Subscribe to frame reception

            // var receiveVideoStream =e.Streams.;
            // yield return receiveVideoStream.First;
            receiveStream=  e.Streams.First();
            // receiveStream = receiveVideoStream.FirstOrDefault;
            receiveStream.OnRemoveTrack = ev =>
            {
                // receiveVideo.texture = null;
                ev.Track.Dispose();
            };
        }
    };

    StartCoroutine(PeerNegotiationNeeded(pc));

    yield return new WaitForSeconds(1);

}

TcpSocketSignaling signaling;
IEnumerator PeerNegotiationNeeded(RTCPeerConnection pc)
{
    // Assume signaling is a custom TCP signaling connection you established
    signaling = new TcpSocketSignaling("127.0.0.1", 9999);
    signaling.Connect();

    // Receive an offer from the sender
    Debug.Log("Waiting for offer from sender...");
    string offerSdp = signaling.Receive(); // Get the offer as a string
    SdpData sdpData = JsonUtility.FromJson<SdpData>(offerSdp);

    RTCSessionDescription offer = new RTCSessionDescription { sdp = sdpData.sdp, type = RTCSdpType.Offer };

    // Set the remote description to the received offer
    Debug.Log("Offer received" + sdpData.sdp + " " + sdpData.type);
    var op3 = pc.SetRemoteDescription(ref offer);
    yield return op3;

    if (!op3.IsError)
    {
        // yield return op3;
        Debug.Log("Remote description set");
    }
    else
    {
        var error = op3.Error;
        Debug.Log(error);
    }

    // Create an answer
    var op4 = pc.CreateAnswer();
    yield return op4;
    if (!op4.IsError)
    {
        // yield return op4;
        Debug.Log("Answer created");

        // // Set the local description to the answer
        // await signaling.Send(pc.LocalDescription.sdp);

        // OnCreateOfferSuccess(pc,op4.Desc);
        RTCSessionDescription desc = op4.Desc;

        var op5 = pc.SetLocalDescription(ref desc);
        yield return op5;

        // Debug.Log("Local description set");

        // Send the answer back to the sender via the signaling channel

        SdpData d2 = new SdpData();
        d2.sdp = pc.LocalDescription.sdp;
        d2.type = "Answer";
        String txt = JsonUtility.ToJson(d2);
        signaling.Send(txt);
        Debug.Log("Answer sent to sender");

        // Wait for connection
        WaitForConnection();
    }
    else
    {
        var error = op4.Error;
        Debug.Log(error);
    }
}

void Start()
{
    WebRTC.Initialize();
    // mat1.mainTexture=tex1;
    StartCoroutine(WebRTC.Update());
    StartCoroutine(S1());
}

private void WaitForConnection()
{
    pc.OnConnectionStateChange = state =>
    {
        Debug.Log($"Connection state: {state}");
        if (state == RTCPeerConnectionState.Connected)
        {
            Debug.Log("Connection established!");
        }
    };
}

private void OnIceCandidate(RTCIceCandidate candidate)
{
    // Send ICE candidates over signaling channel if needed
    Debug.Log($"New ICE candidate: {candidate.Candidate}");
}

private void OnDestroy()
{
    pc.Close();
    signaling.Stop();
    // WebRTC.Dispose();
}

}

here is the unity tcp signaling code

using System; using System.Net; using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks; using UnityEngine;

public class TcpSocketSignaling { // private TcpListener _listener; private TcpClient _client;

private NetworkStream _stream;

public TcpSocketSignaling(string ipAddress, int port)
{
    _client = new TcpClient(ipAddress, port);
}

public  void Connect()
{

    Debug.Log("Waiting for a connection...");
    // _client = await _client.AcceptTcpClientAsync();
    Debug.Log("Client connected.");
    _stream = _client.GetStream();
}

public string Receive()
{
    byte[] buffer = new byte[3024];
    int bytesRead =  _stream.Read(buffer, 0, buffer.Length);
    string message = Encoding.UTF8.GetString(buffer, 0, bytesRead);
    Debug.Log($"Received message: {message}");
    return message;
}

public void Send(string message)
{
    if (_client != null && _client.Connected)
    {
        byte[] data = Encoding.UTF8.GetBytes(message);
         _stream.Write(data, 0, data.Length);
        Debug.Log($"Sent message: {message}");
    }
}

public void Stop()
{
    _client?.Close();
    Debug.Log("Server stopped.");
}

}

csjiyw commented 1 week ago

the python code is bug free while i tested it with the python recv, code as follow: import cv2 from aiortc.rtcrtpreceiver import RemoteStreamTrack

cv2.namedWindow("win", cv2.WINDOW_NORMAL)

import asyncio

import numpy as np from aiortc import RTCPeerConnection, RTCSessionDescription, MediaStreamTrack from aiortc.contrib.signaling import TcpSocketSignaling from av import VideoFrame from datetime import datetime, timedelta

class VideoReceiver: def init(self): self.track = None self.frame = None

async def handle_track(self, track : RemoteStreamTrack):
    print("Inside handle track")
    self.track = track
    frame_count = 0
    while True:
        try:
            print("Waiting for frame...")
            frame = await asyncio.wait_for(track.recv(), timeout=5.0)
            frame_count += 1
            print(f"Received frame {frame_count}")

            if isinstance(frame, VideoFrame):
                print(f"Frame type: VideoFrame, pts: {frame.pts}, time_base: {frame.time_base}")
                frame = frame.to_ndarray(format="bgr24")
            elif isinstance(frame, np.ndarray):
                print(f"Frame type: numpy array")
            else:
                print(f"Unexpected frame type: {type(frame)}")
                continue

            # Add timestamp to the frame
            current_time = datetime.now()
            new_time = current_time - timedelta(seconds=55)
            timestamp = new_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            cv2.putText(frame, timestamp, (10, frame.shape[0] - 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2,
                        cv2.LINE_AA)

            self.frame = frame.copy()
            cv2.imshow('win', self.frame)
            cv2.waitKey(1)
            # cv2.imwrite(f"imgs/received_frame_{frame_count}.jpg", frame)
            # print(f"Saved frame {frame_count} to file")
            # cv2.imshow("Frame", frame)
            #
            # # Exit on 'q' key press
            # if cv2.waitKey(1) & 0xFF == ord('q'):
            #     break
        except asyncio.TimeoutError:
            print("Timeout waiting for frame, continuing...")
        except Exception as e:
            print(f"Error in handle_track: {str(e)}")
            if "Connection" in str(e):
                break
    print("Exiting handle_track")

def show_frame(self):
    if self.frame is not None:
        cv2.imshow('win', self.frame)
        cv2.waitKey(1)
    else:
        print("frame is None")

async def run(pc, signaling): await signaling.connect()

@pc.on("track")
def on_track(track):
    if isinstance(track, MediaStreamTrack):
        print(f"Receiving {track.kind} track")
        asyncio.ensure_future(video_receiver.handle_track(track))

@pc.on("datachannel")
def on_datachannel(channel):
    print(f"Data channel established: {channel.label}")

@pc.on("connectionstatechange")
async def on_connectionstatechange():
    print(f"Connection state is {pc.connectionState}")
    if pc.connectionState == "connected":
        print("WebRTC connection established successfully")

print("Waiting for offer from sender...")
offer = await signaling.receive()
print("Offer received")
await pc.setRemoteDescription(offer)
print("Remote description set")

answer = await pc.createAnswer()
print("Answer created")
await pc.setLocalDescription(answer)
print("Local description set")

await signaling.send(pc.localDescription)
print("Answer sent to sender")

print("Waiting for connection to be established...")
while pc.connectionState != "connected":
    await asyncio.sleep(0.1)

print("Connection established, waiting for frames...")
await asyncio.sleep(1000000)  # Wait for 35 seconds to receive frames

# while True:
#     video_receiver.show_frame()

print("Closing connection")

import threading

async def main(): signaling = TcpSocketSignaling("127.0.0.1", 9999) pc = RTCPeerConnection()

global video_receiver
video_receiver = VideoReceiver()

# t1 = threading.Thread(target=video_receiver.show_frame)
# t1.start()

try:
    await run(pc, signaling)
except Exception as e:
    print(f"Error in main: {str(e)}")
finally:
    print("Closing peer connection")
    await pc.close()

if name == "main": asyncio.run(main())

csjiyw commented 1 week ago

finally i am success to recv video from aiortc example webcam.py by modify aiortcConnector.cs as follows:

using System.Collections; using System.Linq; using System.Text; using Unity.WebRTC; using UnityEngine; using UnityEngine.Networking; using UnityEngine.UI;

public class aiortcConnector : MonoBehaviour { [SerializeField] private string aiortcServerURL; [SerializeField] private RawImage receiveVideo;

private enum Side
{
    Local,
    Remote
}

private class SignalingMsg
{
    public string type;
    public string sdp;
    public string video_transform;
    public RTCSessionDescription ToDesc()
    {
        return new RTCSessionDescription
        {
            type = type == "offer" ? RTCSdpType.Offer : RTCSdpType.Answer,
            sdp = sdp
        };
    }
}

private RTCPeerConnection pc;
private MediaStream receiveVideoStream;

void Start()
{
    WebRTC.Initialize();
    StartCoroutine(WebRTC.Update());
    Connect();
}

public void Connect()
{
    pc = new RTCPeerConnection();

    var transceiver = pc.AddTransceiver(TrackKind.Video, new RTCRtpTransceiverInit
    {
        direction = RTCRtpTransceiverDirection.RecvOnly
    });

    pc.OnIceGatheringStateChange = state =>
    {
        Debug.Log($"OnIceGatheringStateChange > state: {state}");
    };
    pc.OnConnectionStateChange = state =>
    {
        Debug.Log($"OnConnectionStateChange > state: {state}");
    };

    receiveVideoStream = new MediaStream();
    receiveVideoStream.OnAddTrack = e => {
        if (e.Track is VideoStreamTrack track)
        {
            // You can access received texture using `track.Texture` property.
            track.OnVideoReceived+= tex =>
            {
                receiveVideo.texture = tex;
            };
        }

    };

    pc.OnTrack = e =>
    {
        Debug.Log($"OnTrack");

        if (e.Track is VideoStreamTrack video)
        {
            Debug.Log($"OnTrackVideo");
            receiveVideoStream.AddTrack(e.Track);
        }
    };
    StartCoroutine(CreateDesc(RTCSdpType.Offer));
}

private IEnumerator CreateDesc(RTCSdpType type)
{
    Debug.Log("CreateDesc"+type);
    var op = type == RTCSdpType.Offer ? pc.CreateOffer() : pc.CreateAnswer();
    yield return op;

    if (op.IsError)
    {
        Debug.LogError($"Create {type} Error: {op.Error.message}");
        yield break;
    }

    StartCoroutine(SetDesc(Side.Local, op.Desc));
}

private IEnumerator SetDesc(Side side, RTCSessionDescription desc)
{
    Debug.Log("SetDesc"+side+desc);
    var op = side == Side.Local ? pc.SetLocalDescription(ref desc) : pc.SetRemoteDescription(ref desc);
    yield return op;

    if (op.IsError)
    {
        Debug.Log($"Set {desc.type} Error: {op.Error.message}");
        yield break;
    }

    if (side == Side.Local)
    {
        // aiortc not support Tricle ICE. 
        var msg = new SignalingMsg
        {
            type = pc.LocalDescription.type.ToString().ToLower(),
            sdp = pc.LocalDescription.sdp
        };

         Debug.Log("sdp:"+pc.LocalDescription.sdp);

        yield return aiortcSignaling(msg);

    }
    else if (desc.type == RTCSdpType.Offer)
    {
        yield return StartCoroutine(CreateDesc(RTCSdpType.Answer));
    }
}

private IEnumerator aiortcSignaling(SignalingMsg msg)
{
    Debug.Log("aiortcSignaling");
    var jsonStr = JsonUtility.ToJson(msg);
    using var req = new UnityWebRequest($"{aiortcServerURL}/{msg.type}", "POST");
    var bodyRaw = Encoding.UTF8.GetBytes(jsonStr);
    req.uploadHandler = new UploadHandlerRaw(bodyRaw);
    req.downloadHandler = new DownloadHandlerBuffer();
    req.SetRequestHeader("Content-Type", "application/json");

    yield return req.SendWebRequest();
    Debug.Log(req.downloadHandler.text);

    var resMsg = JsonUtility.FromJson<SignalingMsg>(req.downloadHandler.text);

    yield return StartCoroutine(SetDesc(Side.Remote, resMsg.ToDesc()));
}

void Update()
{
}

}