matrix-org / waterfall

A cascading stream forwarding unit for scalable, distributed voice and video conferencing over Matrix

Performant RTP Publisher #134

Closed daniel-abramov closed 1 year ago

daniel-abramov commented 1 year ago

(Per-commit review is recommended to follow the changes more easily; only one of the commits [the latest one] is relatively large due to moving many things around.)

The rough idea of the current design is that the conference owns its state and is the only entity that modifies it. That is convenient as there is no need for any locks and access to the data is sequentialised (see the sketch: https://github.com/matrix-org/waterfall/pull/52). This worked under the assumption that processing a single message takes a very short time and does not really block the conference loop (i.e. handling all incoming events from the peers, Matrix signaling, etc.).
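(For illustration, a minimal Go sketch of that model, with made-up names rather than the actual waterfall types: a single go-routine owns the state and drains one message channel, so no locks are needed, but everything shares that one queue.)

package sketch

// Hypothetical types for illustration; not the actual waterfall code.
type message struct {
	sender  string
	payload any // a signaling event, an RTP packet, etc.
}

type conference struct {
	inbox chan message        // the single queue that every message goes through
	peers map[string]struct{} // conference state, touched only by run()
}

// run is the single go-routine that owns and mutates the state, so no locks
// are needed -- but a slow handler delays everything queued behind it.
func (c *conference) run() {
	for msg := range c.inbox {
		c.handle(msg)
	}
}

func (c *conference) handle(msg message) {
	// ... mutate c.peers and the rest of the conference state here ...
	_ = msg
}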

At some point, it became clear that certain functions were blocking, while others required more time to get processed than expected, so all blocking operations were gradually moved to separate workers/go-routines: the conference loop still received the messages, but offloaded the expensive processing of some of them to these workers while maintaining the state of the conference. That was implemented e.g. for processing the incoming RTP packets from the peers, where each subscription had its own worker to process messages. The same was then repeated for outgoing Matrix signaling messages, which were also moved into their own queue.
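(A rough sketch of that intermediate step, again with made-up names: the conference loop only dispatches, and a per-subscription worker go-routine with its own queue does the expensive part.)

package sketch

// Hypothetical sketch; the actual worker types in waterfall differ.
type rtpPacket struct{ payload []byte }

type subscriptionWorker struct {
	packets chan rtpPacket // the worker's own queue
}

func newSubscriptionWorker() *subscriptionWorker {
	w := &subscriptionWorker{packets: make(chan rtpPacket, 128)}
	go w.run() // the expensive processing happens off the conference loop
	return w
}

func (w *subscriptionWorker) run() {
	for p := range w.packets {
		// e.g. write the packet to the subscriber's peer connection
		_ = p
	}
}

// dispatch is what the conference loop calls: just a cheap channel send.
func (w *subscriptionWorker) dispatch(p rtpPacket) {
	w.packets <- p
}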

While this helped, it did not really solve all the problems: the incoming RTP packets from peers were still sent to the conference (i.e. shared the same message queue). Even though the conference did not process the RTP packets itself (it just read a packet from the queue, identified which subscription it belonged to and dispatched it to the right worker), this still was not optimal, as any slight lag or hiccup in the processing of any other event would potentially delay the dispatching of all incoming RTP packets to their workers, which means that an occasional lag could cost us several seconds (or even minutes) of freezes.

Sending RTP packets to the conference was not the best idea, and it looked like they did not really belong there, given that these packets don’t even mutate the conference state and are only relevant for their particular subscribers. So it was clear that RTP packet handling had to be moved out of the main conference queue processing.

This PR changes this by introducing a new logic where each peer only sends a message to the conference informing it about a newly available track (or a simulcast quality). The conference processes this message by creating a published track, which in turn creates a publisher that runs in its own go-routine. The publisher then reads packets from the incoming track and sends them to the subscriber workers (as the conference did before). This means that the RTP processing is now completely independent of the conference and runs in parallel with all other packet handling. The published track (and the publisher) only share the list of available subscriptions with the conference (a common shared data structure) to which access is synchronized, which means that the only moment we currently lock/synchronize access to the subscriptions is when the state of a subscription changes (layer changed, subscription starts/stops, etc.).
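(A condensed sketch of that shape, with hypothetical and simplified types rather than the ones in this PR: the publisher go-routine forwards packets on its own, and only the subscription list is shared with the conference behind a mutex.)

package sketch

import "sync"

// Hypothetical, simplified types for illustration.
type rtpPacket struct{ payload []byte }

type subscriber interface {
	dispatch(p rtpPacket)
}

type publishedTrack struct {
	mu            sync.Mutex
	subscriptions []subscriber // the only data shared with the conference
}

// addSubscription is called by the conference when a subscription starts;
// mutating the shared list is the only place where we synchronize.
func (t *publishedTrack) addSubscription(s subscriber) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.subscriptions = append(t.subscriptions, s)
}

// snapshot copies the slice so the forwarding path below holds the lock only briefly.
func (t *publishedTrack) snapshot() []subscriber {
	t.mu.Lock()
	defer t.mu.Unlock()
	return append([]subscriber(nil), t.subscriptions...)
}

// startPublisher spawns the publisher go-routine; it runs independently of
// the conference loop and only touches the shared subscription list.
func (t *publishedTrack) startPublisher(read func() (rtpPacket, error)) {
	go func() {
		for {
			packet, err := read() // read from the incoming remote track
			if err != nil {
				return // the track is gone; the conference is informed separately
			}
			for _, sub := range t.snapshot() {
				sub.dispatch(packet)
			}
		}
	}()
}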

This means that the existing publishers and subscribers are barely affected by the main conference loop and, once started, will continue to run efficiently.

Another thing this allowed us to do is to regulate the size of the queue for the incoming RTP packets that each subscription gets. Since each published track’s incoming RTP is no longer sent on a ‘global’ conference queue, we have more control when it comes to implementing backpressure. Currently, each publisher’s worker loop essentially looks like this:

for {
	packet, err := track.read()
	if err != nil {
		informConferenceThatTheTrackIsGone()
		return
	}

	for _, subscription := range subscriptions {
		if queueFull := subscription.enqueue(packet); queueFull {
			// The packet was not delivered, so we drop this packet and log it.
			// TODO: we probably want to evict old packets from the queue here instead.
		}
	}
}
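
(A minimal sketch of what enqueue could look like with a bounded channel per subscription, using made-up names: the send never blocks the publisher, and a full queue means the packet gets dropped, which is exactly the backpressure knob mentioned above.)

package sketch

type rtpPacket struct{ payload []byte }

type subscription struct {
	queue chan rtpPacket // bounded; its capacity is the backpressure knob
}

// enqueue never blocks the publisher: it returns true when the queue was
// full and the packet had to be dropped, matching the loop above.
func (s *subscription) enqueue(p rtpPacket) (queueFull bool) {
	select {
	case s.queue <- p:
		return false
	default:
		return true
	}
}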

So now it looks almost exactly the same as the processing of publishers in LiveKit: https://blog.livekit.io/going-beyond-a-single-core-4a464d20d17a/ (except that we went for the approach of spawning a go-routine per publisher instead of creating a worker pool with a limited number of workers, as the latter seems a bit more complicated but does not bring that much of a performance advantage).


Fixes https://github.com/matrix-org/waterfall/issues/117
Relates to https://github.com/matrix-org/waterfall/issues/120 (but only for RTP packets)