gavv opened this issue 5 years ago
Hello @gavv, thanks for describing the issue so well. From the "easy hacks" label and the implementation you mentioned, this seems like a good first bug for me to dip my toes in this project. If no one is working on this can I pick this up? Also, would there be a deadline before which this needs to be done?
Also, are max_latency and max_sessions command-line arguments?
@biljith You're welcome! Nobody is working on this.
Also, would there be a deadline before which this needs to be done?
It would be nice to have this in 0.2, which we're currently working on (because we're adding RTCP and RTSP). But there is no specific deadline, either for 0.2 or for this task.
Also, are max_latency and max_sessions command line arguments?
We have a --max-latency command-line parameter. We don't have --max-sessions yet (see #107).
After some thought, it seems that we can't use max_latency here. The latency is measured in samples. But the ring buffer will contain both source and repair packets, and measuring repair packets in samples is meaningless.
Instead, we can add two parameters, --max-sessions and --max-session-packets, and use them to limit the ring buffer. Later we can also use them to implement other limits described in #107.
@gavv I have coded up everything except for the unit tests. I am reading up on what repair packets are and what source packets are. Do you have any examples of code and documentation that you can point me to which can help me with writing the unit tests?
FEC and source/repair packets are documented here: https://roc-project.github.io/roc/docs/internals/fec.html
In particular, see "FECFRAME" and "FEC packet fields" sections.
These pages may also be helpful:
You can take a look at the tests for roc_fec module: https://github.com/roc-project/roc/blob/master/src/tests/roc_fec/target_openfec/test_writer_reader.cpp
Most of those tests have the same flow:
Here, in roc_pipeline tests, we need something like this. You can create pipeline::Sender and write some audio frames to it, so that it generates source and repair packets and writes them to a queue. Then you can fetch packets from the queue and simulate "delivering" all or some of them, possibly in a different order, by writing them to pipeline::Receiver. Then you can read audio frames from pipeline::Receiver and see how it processed the packets you wrote to it.
Generally, FEC works the following way: if a block of 10 source packets and 5 repair packets is fully delivered, you will get audio from all 10 source packets. If one source packet is lost, you will still get the audio from all 10 source packets, because the lost packet will be restored from the delivered repair packets. But, for example, if one source packet is lost AND ALL repair packets are also lost, you will get zero samples in place of the lost source packet, because there are no repair packets left to restore it from.
So you can use this fact to check whether repair packets were dropped or were stored into the ring buffer and later successfully reached the session. E.g. you can first "deliver" repair packets (write them to receiver), then "deliver" all source packets except one (as it was lost) and then read audio frames from receiver and check them. They should contain correct samples, including the samples at the place of the lost packet, which means that repair packets were successfully delivered and were used to repair the loss.
Then, for example, you can repeat this test on a larger number of packets exceeding the size of the ring buffer. E.g. you can "lose" one source packet in every FEC block. Then you can check that the samples for the last N blocks are fully correct, but the samples for earlier blocks contain zeros at the place of the lost packets, because the corresponding repair packets didn't fit into the ring buffer and were dropped.
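The repair mechanism these tests rely on can be illustrated with the simplest possible FEC scheme: a single XOR parity packet per block. This is a toy model for intuition only, not the FECFRAME codes roc actually uses (which can repair multiple losses per block):

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

using Payload = std::vector<uint8_t>;

// XOR all delivered payloads together, byte by byte.
Payload xor_all(const std::vector<Payload>& pkts) {
    Payload acc(pkts.at(0).size(), 0);
    for (const auto& p : pkts) {
        for (size_t i = 0; i < p.size(); i++) {
            acc[i] ^= p[i];
        }
    }
    return acc;
}

// Build one repair packet for a block of source packets.
Payload make_repair(const std::vector<Payload>& sources) {
    return xor_all(sources);
}

// If exactly one source packet is lost and the repair packet was delivered,
// XOR-ing the surviving source packets with the repair packet restores the
// lost payload. If the repair packet was lost too, nothing can be restored,
// and the receiver produces zero samples in its place.
std::optional<Payload> restore_lost(const std::vector<Payload>& delivered_sources,
                                    const std::optional<Payload>& repair) {
    if (!repair) {
        return std::nullopt; // repair packet lost too: nothing to restore from
    }
    std::vector<Payload> all = delivered_sources;
    all.push_back(*repair);
    return xor_all(all);
}
```

This is exactly the property the receiver tests exploit: whether the samples at the position of a lost source packet come out correct or zeroed tells you whether the repair packets made it through the ring buffer to the session.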
Tests for pipeline::Sender are here: https://github.com/roc-project/roc/blob/master/src/tests/roc_pipeline/test_sender.cpp. They provide an example of using FrameWriter + Sender + Queue. You write samples to FrameWriter and get the resulting packets from the Queue.
Tests for pipeline::Receiver are here: https://github.com/roc-project/roc/blob/master/src/tests/roc_pipeline/test_receiver.cpp. They provide an example of using Receiver + FrameReader. You write packets to Receiver and read and check samples from it using FrameReader.
Current receiver tests usually generate synthetic packets using PacketWriter; but it's not capable of generating valid FEC packets. For ring buffer tests, it would be easier to use FrameWriter + Sender + Queue to generate valid source + repair FEC packets from an audio stream.
@biljith Do you need any help? :)
I'm unassigning the issue so that someone else can pick it up.
@biljith feel free to ping me if you still have plans for it.
I can take a look at this, but I will certainly give it back to @biljith if he returns and the issue is still open.
Great!
does pipeline::Receiver change to pipeline::ReceiverSource?
Find the place where Receiver::fetch_packets_() drops packets (when it gets false from route_packet_()), and make it store the packet inside the ring buffer instead (add the packet to the end of the buffer; if the buffer capacity exceeds the limit, remove the very first packet from it).
does it refer to https://github.com/roc-project/roc/blob/develop/src/modules/roc_pipeline/receiver_source.cpp#L162-L164 ?
does pipeline::Receiver change to pipeline::ReceiverSource?
Yes.
does it refer to [...]
route_packet() was moved to ReceiverSessionGroup:
If no session can handle the packet, and a new session can't be created, it drops the packet.
Here is where the drop is logged:
route_packet() is called from ReceiverEndpoint::flush_packets():
https://github.com/roc-project/roc/blob/develop/src/modules/roc_pipeline/receiver_endpoint.cpp#L112
which in turn is called from here:
which is called here:
So now the layout is the following:
Network thread writes packets to each ReceiverEndpoint (depending on which port the packets arrived on).
ReceiverSource asks every ReceiverEndpointSet to handle these packets. ReceiverEndpointSet asks ReceiverEndpoint to do it. ReceiverEndpoint fetches the packets and passes them to ReceiverSessionGroup, which either routes the packet to an existing session, creates a new session, or drops the packet.
Now instead of dropping, we should add packets to a ring buffer.
I think every ReceiverEndpointSet or ReceiverSessionGroup should have its own ring buffer.
Hi @gavv, I created a draft PR #357 and want to make sure I am on the right path. Are your previous comments on the unit tests still applicable? Thanks!
want to make sure I am on the right path.
In general, yes. I'll leave some minor comments.
Are your previous comments on the unit test still applicable?
I think yes.
@fusuiyi123 Feel free to ping me if you decide to get back to it.
This issue will also be important when we use clock synchronization based on e2e latency, because it requires delaying session start until we receive the first RTCP report.
@gavv assign this to me and I'll work on it
Great, thanks!
Last revised: Oct 2023
Problem
Currently, when we receive a packet, we first try to find an existing session and route the packet to it, then try to create a new session from the packet, and if both of these fail, we drop the packet.
We create sessions only when a source packet arrives and thus drop repair packets preceding the first delivered source packet. This may decrease the service quality at the session start a bit.
When we add RTCP and RTSP support, we might want to be able to disable session auto-creation entirely and allow only explicitly announced sessions. In this case, we will drop data packets until negotiation completes.
When we also add support for multiple sender ports, allowing different streams of the same session to have different source addresses, we will not be able to match the streams of the same session until we receive instructions from RTCP or RTSP. So in this case we will also drop data packets until negotiation completes.
When we add support for dynamic payload types, we will also be unable to handle packets, and will drop them, until session negotiation completes.
In other words, there are numerous reasons why we will drop packets until session negotiation completes or, if it is disabled, until the first source packet arrives.
On the other hand, session negotiation may take time (RTSP connection establishment requires a few round-trips; RTCP packets may be lost and are not retried immediately).
In view of the above, this means that we will always drop some packets at the beginning of the session and so increase the "cold" (startup) latency.
Solution
The following simple solution is suggested, which solves all these problems: instead of dropping such packets, store them in a ring buffer, and when a new session is created, pass the buffered packets to it.
The ring buffer capacity should be enough to hold packets for max_latency * max_sessions samples.
Implementation
Add a ring buffer to pipeline::ReceiverSessionGroup; we can use core::List<packet::Packet> as a container because we don't need random access.
Find the place where ReceiverSessionGroup::route_packet() drops packets, and make it store the packet inside the ring buffer instead (add the packet to the end of the buffer; if the buffer capacity exceeds the limit, remove the very first packet from it).
Find the place where ReceiverSessionGroup creates a new session and, after creating the session, iterate over the ring buffer and try to pass every packet to the newly created session (sess->handle(packet)); if the session accepts the packet, remove it from the ring buffer.
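A minimal sketch of the store and flush steps above. This is a toy model, not the real roc-toolkit API: it uses std::list where the real code would use the intrusive core::List<packet::Packet> inside pipeline::ReceiverSessionGroup, and Packet/FakeSession are hypothetical stand-ins for the real packet and session types:

```cpp
#include <cstddef>
#include <list>
#include <memory>
#include <vector>

// Hypothetical simplified packet: just enough fields to route on.
struct Packet {
    int session_id; // which session the packet belongs to
    int seq;        // sequence number
};
using PacketPtr = std::shared_ptr<Packet>;

class PacketRingBuffer {
public:
    explicit PacketRingBuffer(size_t max_packets)
        : max_packets_(max_packets) {
    }

    // Instead of dropping an unroutable packet, append it to the buffer,
    // evicting the oldest packet when the capacity limit is exceeded.
    void store(const PacketPtr& p) {
        buf_.push_back(p);
        if (buf_.size() > max_packets_) {
            buf_.pop_front();
        }
    }

    // Right after a new session is created, offer every buffered packet to
    // it and remove the packets it accepts; packets for other (future)
    // sessions stay buffered.
    template <class Session> void flush_to(Session& sess) {
        for (auto it = buf_.begin(); it != buf_.end();) {
            if (sess.handle(*it)) {
                it = buf_.erase(it);
            } else {
                ++it;
            }
        }
    }

    size_t size() const {
        return buf_.size();
    }

private:
    std::list<PacketPtr> buf_;
    size_t max_packets_;
};

// Stand-in for a newly created session that accepts only its own packets.
struct FakeSession {
    int id;
    std::vector<int> got;

    bool handle(const PacketPtr& p) {
        if (p->session_id != id) {
            return false;
        }
        got.push_back(p->seq);
        return true;
    }
};
```

A list (rather than an array-backed ring) keeps eviction and selective removal during flush both O(1), which is why random access isn't needed here.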
Tests
We should cover this feature in receiver unit tests: https://github.com/roc-streaming/roc-toolkit/blob/master/src/tests/roc_pipeline/test_receiver_source.cpp
The following cases come into my mind:
N repair packets (N = ring_buffer_size) for the session A arrive before source packets and then a source packet arrives and the session is created; the newly created session should receive all the N repair packets
N repair packets (N > ring_buffer_size) for the session A arrive before source packets and then a source packet arrives and the session is created; the new session should receive only ring_buffer_size repair packets
N repair packets (N = ring_buffer_size) for the session A arrive before source packets, then K repair packets (K < ring_buffer_size) for the session B arrive, then source packets arrive for sessions A and B; the session B should receive all the K repair packets, and the session A should not receive any repair packets
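The second case above can be simulated numerically. This is a hypothetical toy model, not the real pipeline: it tracks only sequence numbers of pre-session repair packets passing through the ring buffer:

```cpp
#include <cstddef>
#include <deque>

// Toy simulation: N repair packets for a session arrive before any source
// packet. With N > ring_buffer_size, only the newest ring_buffer_size
// packets are still buffered when the first source packet finally arrives
// and the session is created.
std::deque<int> buffer_pre_session_repair(int n_packets, size_t ring_buffer_size) {
    std::deque<int> ring;
    for (int seq = 0; seq < n_packets; seq++) {
        ring.push_back(seq); // store instead of dropping
        if (ring.size() > ring_buffer_size) {
            ring.pop_front(); // evict the oldest buffered packet
        }
    }
    return ring; // what the newly created session would receive
}
```

So with N = 10 and ring_buffer_size = 6, the session receives repair packets 4 through 9, and the test should observe unrepaired (zeroed) losses only in the blocks covered by the evicted packets 0 through 3.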