aiortc / aiortc

WebRTC and ORTC implementation for Python using asyncio
BSD 3-Clause "New" or "Revised" License

How to use the same player for two or more local peers? #367

Closed riccardobrue closed 3 years ago

riccardobrue commented 4 years ago

Hi, I am building a multi-peer system using aiortc (Python 3.7). The core logic of my software is to create one RTCPeerConnection per "client" attached to my server, so I have n peer-to-peer connections. I am now having trouble using the same global player (player = MediaPlayer("hw:0", format="alsa")) from several local (server-side) peers. In particular:

I am pretty new to asyncio, so maybe I am missing something about how to create the loops and threads in my app.

Firstly, can someone tell me whether what I want to achieve is possible using aiortc? If yes, can someone provide a piece of code showing two local peers (RTCPeerConnection) sharing the same player? How should the loops and the threads be set up in such an app?

So far I have managed to get it working with only one peer, which creates the player, similar to the webcam.py example provided.

jlaine commented 4 years ago

In its current architecture, each track can have only one reader: when the recv method is called it will retrieve the next frame and move on. This is true both for MediaPlayer tracks and RemoteStreamTrack (received media).

You could write a "proxy" class if you need multiple readers for a given track.

riccardobrue commented 4 years ago

In my architecture I need to read from the microphone and broadcast the audio, so I should create a single microphone reader class which then shares the same track with all the peers, right? How can I achieve this? Do you have any ideas? Where can I start? Thank you.

jlaine commented 4 years ago

Your proxy class would take a track as input (in your case the microphone) and needs to implement some logic to duplicate all the frames to feed them to multiple readers.
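
A minimal sketch of such a proxy, using only asyncio so that it runs without any audio hardware. The names `TrackFanOut` and `CountingSource` are hypothetical and not part of aiortc; the only assumption about the source is that it exposes `async def recv()`, as aiortc tracks do. Each reader gets its own queue, and a slow reader loses its oldest frame rather than stalling the others. Later aiortc releases also ship a helper for this purpose, `aiortc.contrib.media.MediaRelay`, which this sketch does not use.

```python
import asyncio


class TrackFanOut:
    """Hypothetical proxy: reads one source and copies each frame to every subscriber."""

    def __init__(self, source, maxsize=10):
        self._source = source      # any object exposing `async def recv()`
        self._maxsize = maxsize
        self._queues = []

    def subscribe(self):
        """Return a private queue for one reader (for example, one peer connection)."""
        q = asyncio.Queue(maxsize=self._maxsize)
        self._queues.append(q)
        return q

    async def run(self, n_frames):
        """Pull `n_frames` frames from the source and duplicate them to all queues."""
        for _ in range(n_frames):
            frame = await self._source.recv()
            for q in self._queues:
                if q.full():
                    q.get_nowait()   # drop the oldest frame for a slow reader
                q.put_nowait(frame)


class CountingSource:
    """Stand-in for a microphone track; yields 0, 1, 2, ..."""

    def __init__(self):
        self._next = 0

    async def recv(self):
        await asyncio.sleep(0)       # give other coroutines a chance to run
        value = self._next
        self._next += 1
        return value


async def demo():
    fan_out = TrackFanOut(CountingSource())
    a, b = fan_out.subscribe(), fan_out.subscribe()
    await fan_out.run(3)
    return [a.get_nowait() for _ in range(3)], [b.get_nowait() for _ in range(3)]


result = asyncio.run(demo())
print(result)
```

In a real server, `run` would loop until the source ends, and each peer's track would read from its own subscribed queue inside `recv`.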

riccardobrue commented 4 years ago

Ok, I've managed to read from the mic on a single process, then to broadcast the audio frames to all the other peer processes by using queues. So far it is working pretty well, but I am wondering if it is really the best practice.

Listening thread

[main]
thread_listening = Thread(target=thread_callback_listening, daemon=True)

[...]
def thread_callback_listening():
    event_loop_listening = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop_listening)
    event_loop_listening.run_until_complete(listening_service())

[...]
async def listening_service():
    player = MediaPlayer("hw:0", format="alsa")
    f_audio = await player.audio.recv()
    while True:
        f_audio = await player.audio.recv()
        my_array = f_audio.to_ndarray()
        for q in queues:
            # putting the received audio on the queues for each connection
            q.put(my_array)

In order to handle each peer connection I have this signalling service based on a TCP socket:

[main]
thread_tcp_service = Thread(target=thread_callback_tcp_service, daemon=True)

[...]
def thread_callback_tcp_service():
    event_loop_tcp_service = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop_tcp_service)
    asyncio.get_event_loop().call_soon(lambda: tcp_service())
    event_loop_tcp_service.run_forever()

[...]
def tcp_service():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.bind(("0.0.0.0", 5000))
    serversocket.listen(5)

    try:
        while True:
            connection, client_address = serversocket.accept()

            #creating new queue for a specific connection thread
            q = queue.Queue(maxsize=10)
            queues.append(q)

            #creating a connection thread (wraps a webrtc peer)
            t = MyConnThread(connection, client_address, q)
            t.start()

    except KeyboardInterrupt:
        pass
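
One thing to watch with bounded queues: as far as I can tell, `q.put(...)` in the listening service blocks once a queue holds 10 items, so a single slow peer would stall the microphone loop for every other peer. A minimal sketch of a non-blocking alternative that discards the oldest frame instead; the helper name `offer` is hypothetical.

```python
import queue


def offer(q, item):
    """Put `item` on `q` without blocking; if `q` is full, drop the oldest entry first."""
    try:
        q.put_nowait(item)
        return
    except queue.Full:
        pass
    try:
        q.get_nowait()       # make room by discarding the oldest frame
    except queue.Empty:
        pass                 # a consumer emptied the queue in the meantime
    try:
        q.put_nowait(item)
    except queue.Full:
        pass                 # still full (another producer won the race): skip this frame


q = queue.Queue(maxsize=3)
for i in range(5):
    offer(q, i)

contents = [q.get_nowait() for _ in range(q.qsize())]
print(contents)
```

For live audio, dropping stale frames is usually preferable to letting latency build up, but that is a design choice rather than a requirement.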

Inside the MyConnThread class, I have a wrapper for a webrtc peer instance, which runs on a different thread too. In order to read from the audio queue, I have implemented this logic with a custom MediaStreamTrack class (based on AudioStreamTrack from aiortc.mediastreams).

def __init__(self, key, audio_queue:queue.Queue):
      self.localPeer = RTCPeerConnection()
      [...]
      audio_track=MyAudioStreamTrack(self.audio_queue)
      self.localPeer.addTrack(audio_track)

[....]
class MyAudioStreamTrack(MediaStreamTrack):
    kind = "audio"
    def __init__(self, audio_queue) -> None:
        super().__init__()
        self.audio_queue = audio_queue

        self._start = None      # wall-clock time of the first frame
        self._timestamp = None  # running sample count, used as pts

    async def recv(self) -> Frame:
        if self.readyState != "live":
            raise MediaStreamError

        audio_data = self.audio_queue.get()

        # from_ndarray is a static constructor, so no placeholder frame is needed
        frame = AudioFrame.from_ndarray(audio_data, format="s16", layout="stereo")
        frame.sample_rate = 48000 # fixed?

        if self._timestamp is not None and self._start is not None:
            self._timestamp += frame.samples
            #wait = self._start + (self._timestamp / frame.sample_rate) - time.time()
            #await asyncio.sleep(wait)
        else:
            self._start = time.time()
            self._timestamp = 0

        frame.pts = self._timestamp
        return frame
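
A note on `recv` above: `queue.Queue.get()` is a blocking call, so while it waits, nothing else scheduled on that thread's event loop can run. A minimal sketch of one way around this, assuming the thread-safe queue is kept: hand the blocking `get` to the default executor and await the result. The helper name `get_from_thread_queue` is hypothetical.

```python
import asyncio
import queue


async def get_from_thread_queue(q):
    """Await an item from a thread-safe queue without blocking the event loop."""
    loop = asyncio.get_running_loop()
    # q.get runs in the default thread pool; other coroutines keep running meanwhile.
    return await loop.run_in_executor(None, q.get)


async def demo():
    q = queue.Queue()
    ticks = []

    async def ticker():
        # Runs on the same loop while the consumer is waiting for a frame.
        for i in range(3):
            ticks.append(i)
            await asyncio.sleep(0.01)
        q.put("frame")       # producer side; normally this happens in another thread

    task = asyncio.ensure_future(ticker())
    item = await get_from_thread_queue(q)
    await task
    return item, ticks


item, ticks = asyncio.run(demo())
print(item, ticks)
```

With a plain `q.get()` in place of the helper, `ticker` would never get to run and the demo would hang. In production code a timeout on `get` may be worth adding so that the worker thread is released when a peer disconnects.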

Overall, this logic seems a bit heavy: on a dual-core iMX6 system it is too heavy and works really badly, while on an iMX8 system it works very well.

I am having problems managing the threads and the loops. I'll post the updated code once I solve these problems. So far, what do you think of this queue-based solution?

YLTsai0609 commented 3 years ago

@riccardobrue Hi

I'm new to aiortc and facing the same problem as you. Judging by the code above, I think you achieved your goal.

Could you share your script for a newcomer to this package?

Many thanks!

Laityned commented 3 years ago

See #495

jlaine commented 3 years ago

I'm closing this issue since PR #495 has landed