roc-streaming / roc-toolkit

Real-time audio streaming over the network.
https://roc-streaming.org
Mozilla Public License 2.0

Add ring buffer to store packets without session. #357

Closed fusuiyi123 closed 3 years ago

fusuiyi123 commented 4 years ago

Issue: #217

gavv commented 4 years ago

Thanks for the PR! I'll take a look soon.

gavv commented 4 years ago

Please rebase on fresh develop, it was force-pushed. I guess you can just --skip all conflicting commits when rebasing.

gavv commented 4 years ago

Ping me when this is ready for review.

fusuiyi123 commented 4 years ago

sure, I am looking at the unit test.

fusuiyi123 commented 4 years ago

are we trying to add a unit test in https://github.com/roc-project/roc/blob/develop/src/tests/roc_pipeline/test_sender_sink_receiver_source.cpp?

fusuiyi123 commented 4 years ago

I imitate https://github.com/roc-project/roc/blob/develop/src/tests/roc_pipeline/test_sender_sink_receiver_source.cpp#L234 to send repair packets first then source packet. could you suggest if it is correct? Though I got

.....................
src/modules/roc_packet/queue.cpp:25: error: roc_panic()

ERROR: roc_packet: queue: null packet

Backtrace:
#1: 0x10f5f580d roc::core::print_backtrace()+0xbd
#2: 0x10f5f6621 roc::core::crash(char const*)+0x71
#3: 0x10f5fc4c9 roc::core::panic(char const*, char const*, int, char const*, ...)+0x419
#4: 0x10f5e3e58 roc::packet::Queue::write(roc::core::SharedPtr<roc::packet::Packet, roc::core::RefCntOwnership> const&)+0xb8
#5: 0x10f457fa3 _ZN3roc8pipeline4test12PacketSender5writeERKNS_4core9SharedPtrINS_6packet6PacketENS3_15RefCntOwners+0x1f3
#6: 0x10f45b7c5 _ZN3roc8pipeline51TEST_GROUP_CppUTestGroupsender_sink_receiver_source28filter_repair_source_packets+0x835
#7: 0x10f4537ec roc::pipeline::TEST_GROUP_CppUTestGroupsender_sink_receiver_source::fetch_saved_repair_packets(int)+0xe4c
#8: 0x10f45298c roc::pipeline::TEST_sender_sink_receiver_source_fec_ldpc_ring_buffer_Test::testBody()+0x1cc
#9: 0x10f61f61a PlatformSpecificSetJmpImplementation(void (*)(void*), void*)+0x3d
#10: 0x10f61ece8 Utest::run()+0x32
#11: 0x10f61cfc7 UtestShell::runOneTestInCurrentProcess(TestPlugin*, TestResult&)+0x97
#12: 0x10f61f61a PlatformSpecificSetJmpImplementation(void (*)(void*), void*)+0x3d
#13: 0x10f61ce9f UtestShell::runOneTest(TestPlugin*, TestResult&)+0x45
#14: 0x10f611e5f TestRegistry::runAllTests(TestResult&)+0xa1
#15: 0x10f612cd3 CommandLineTestRunner::runAllTests()+0xd1
#16: 0x10f61296e CommandLineTestRunner::runAllTestsMain()+0x68
#17: 0x10f6127c8 CommandLineTestRunner::RunAllTests(int, char const**)+0x9e
#18: 0x10f4db7ee main+0x3de
scons: *** [test/roc_pipeline] Error -6

Could you suggest how to debug the unit test? Thanks 😄

fusuiyi123 commented 4 years ago

And here

        packet_sender.deliver(Latency / SamplesPerPacket);

        for (size_t np = 0; np < ManyFrames / FramesPerPacket; np++) {
            for (size_t nf = 0; nf < FramesPerPacket; nf++) {
                frame_reader.read_samples(SamplesPerFrame * NumCh, num_sessions);

                UNSIGNED_LONGS_EQUAL(num_sessions, receiver.num_sessions());
            }

            packet_sender.deliver(1);
        }

I am a little bit confused why there is still a packet_sender.deliver(1) though there is already packet_sender.deliver(Latency / SamplesPerPacket);

gavv commented 4 years ago

are we trying to add a unit test in https://github.com/roc-project/roc/blob/develop/src/tests/roc_pipeline/test_sender_sink_receiver_source.cpp?

test_sender_sink_receiver_source.cpp tests the combination of sender + receiver. It converts an audio stream to a packet stream using the sender pipeline, then converts the packet stream back to an audio stream using the receiver pipeline, and compares the original and resulting audio streams.

It's not well suited for testing various lower-level aspects of the pipeline. For that purpose, we have test_receiver_source (detailed tests for the receiver pipeline) and test_sender_sink (detailed tests for the sender pipeline).

So I guess this specific feature is better covered in test_receiver_source.

gavv commented 4 years ago

I imitate https://github.com/roc-project/roc/blob/develop/src/tests/roc_pipeline/test_sender_sink_receiver_source.cpp#L234 to send repair packets first then source packet. could you suggest if it is correct?

I'd suggest starting with one of the simple receiver_source tests, e.g. TEST(receiver_source, one_session). You can copy it and modify it to use SenderSink + FrameWriter instead of PacketWriter to generate the packet stream. (With PacketWriter we generate the packet stream manually; with SenderSink + FrameWriter we instead generate an audio stream (a stream of samples) and pass it to FrameWriter, which passes frames to SenderSink, which generates the packet stream for us.)

SenderSink will generate both source and repair packets and write them to the output writer. You can use a queue or create a specialized mock writer for that. Then you can gather the generated packets, filter them somehow (e.g. reorder), pass them to ReceiverSource, and check how it handles them.

gavv commented 4 years ago

ERROR: roc_packet: queue: null packet

That means that you're passing null (shared) pointer to write().
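One way to end up with a null pointer is to forward the result of a read from an already empty queue. Here is a rough sketch of a guard against that, using toy stand-ins (GuardedQueue, SimplePacket and forward_all are made-up names, not the real roc::packet classes, and the exception replaces roc_panic() only for illustration):

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <stdexcept>

// Toy stand-ins for illustration; not the real roc::packet classes.
struct SimplePacket {
    int seqnum;
};
typedef std::shared_ptr<SimplePacket> SimplePacketPtr;

class GuardedQueue {
public:
    // A null pointer is treated as a caller bug, like the panic above.
    void write(const SimplePacketPtr& pp) {
        if (!pp) {
            throw std::invalid_argument("queue: null packet");
        }
        packets_.push_back(pp);
    }

    // Returns a null pointer when the queue is empty.
    SimplePacketPtr read() {
        if (packets_.empty()) {
            return SimplePacketPtr();
        }
        SimplePacketPtr pp = packets_.front();
        packets_.pop_front();
        return pp;
    }

private:
    std::deque<SimplePacketPtr> packets_;
};

// Moves everything from src to dst and stops at the first null read
// instead of forwarding it. Returns the number of packets moved.
size_t forward_all(GuardedQueue& src, GuardedQueue& dst) {
    size_t n = 0;
    while (SimplePacketPtr pp = src.read()) {
        dst.write(pp);
        n++;
    }
    return n;
}
```

So in the test it is probably worth checking every pointer returned by a read before handing it to write().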

gavv commented 4 years ago

Could you suggest how to debug the unit test?

Take a look at this section in developer cookbook: https://github.com/roc-project/roc/blob/develop/docs/sphinx/building/developer_cookbook.rst#running-tests-manually

You can run the test binary with verbose logs or under debugger. You can also select specific test case to run.

gavv commented 4 years ago

I am a little bit confused why there is still a packet_sender.deliver(1) though there is already packet_sender.deliver(Latency / SamplesPerPacket);

Good question. This is because the receiver pipeline includes DelayedReader, which you can find on the diagram here: https://roc-project.github.io/roc/docs/internals/data_flow.html#receiver-pipeline

The purpose of DelayedReader is to delay the pipeline start until we accumulate at least target_latency samples.

This way, we create additional latency (called session latency) between the sender and receiver. This latency is needed for two purposes: to compensate for packet reordering by the network and to allow the FEC reader to repair missing packets. The better the network, the lower the session latency we can use.

A similar feature is often called "jitter buffer", though in our case it's not exactly a buffer and it's used not only for jitter, but also for FEC.

So we first feed the pipeline with the number of packets corresponding to Latency samples, and the pipeline doesn't process these packets yet, but only accumulates them. Then we start a loop that iteratively adds one more packet and reads the number of frames corresponding to one packet. Note that this way the retrieved audio stream (via frame reader) lags behind the packet stream (pushed via packet writer) by the number of packets corresponding to Latency samples.
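To make the accumulate-then-read behaviour concrete, here is a toy model of it. This is only a sketch of the idea: ToyDelayedReader and its members are invented for illustration and are not the actual DelayedReader code, and packets are reduced to plain sequence numbers.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Toy model of a delayed reader; names and fields are illustrative only.
class ToyDelayedReader {
public:
    ToyDelayedReader(size_t target_latency, size_t samples_per_packet)
        : target_latency_(target_latency)
        , samples_per_packet_(samples_per_packet)
        , started_(false) {
        assert(samples_per_packet > 0);
    }

    // Called when a packet arrives.
    void write(int seqnum) {
        queue_.push_back(seqnum);
    }

    // Returns the next packet seqnum, or -1 if nothing can be returned yet.
    int read() {
        if (!started_) {
            if (queue_.size() * samples_per_packet_ < target_latency_) {
                return -1; // still accumulating, pipeline not started
            }
            started_ = true;
        }
        if (queue_.empty()) {
            return -1;
        }
        int seqnum = queue_.front();
        queue_.pop_front();
        return seqnum;
    }

    // Number of packets currently waiting in the reader.
    size_t queued() const {
        return queue_.size();
    }

private:
    std::deque<int> queue_;
    size_t target_latency_;
    size_t samples_per_packet_;
    bool started_;
};
```

With target_latency = 300 and samples_per_packet = 100, the first three writes play the role of packet_sender.deliver(Latency / SamplesPerPacket), and each later write followed by a read corresponds to one iteration of the loop with packet_sender.deliver(1).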

gavv commented 4 years ago

Also, here is a small hint. Both Frame and Packet classes have print() methods. You can add debug prints to inspect how the packet stream corresponds to the audio (i.e. frame) stream, and vice versa.

fusuiyi123 commented 4 years ago

thanks for your guidance! will look at it in weekends.

fusuiyi123 commented 4 years ago

so we need a multiple methods defined in test_sender_sink.cpp and should I directly copy paste to test_receiver_source.cpp?

fusuiyi123 commented 4 years ago

and if I understand correctly, https://github.com/roc-project/roc/blob/develop/src/tests/roc_pipeline/test_sender_sink.cpp#L244-L251 packet_reader will read packet and stores the packet in the queue so that we can change the order of the packets, but why CHECK(!queue.read()) which check there is no data in the queue..?

gavv commented 4 years ago

so we need a multiple methods defined in test_sender_sink.cpp and should I directly copy paste to test_receiver_source.cpp?

Hm, I don't quite understand the question.

packet_reader will read packet and stores the packet in the queue so that we can change the order of the packets, but why CHECK(!queue.read()) which check there is no data in the queue..?

test::PacketReader::read_packet reads one packet from the queue (by calling queue.read) and checks its contents. We call it ManyFrames / FramesPerPacket times to read and check exactly that number of packets. Then we call queue.read() manually and check that it returns NULL, to ensure that the queue is now empty and thus contained exactly the expected number of packets and no more.

gavv commented 4 years ago

so we need a multiple methods defined in test_sender_sink.cpp and should I directly copy paste to test_receiver_source.cpp?

If you're talking about helpers like add_endpoint_set() for endpoint manipulation, I think yes, we could copy-paste them.

fusuiyi123 commented 4 years ago

I see, so after frame_writer.write_samples, the frames are already stored and read_packet is reading and check packet in the queue. Then should I add another method in test::PacketReader to store and reorder the packets order? And after that how do I correctly pass the packets queue to ReceiverSource? Should I overload PacketWriter::write_packets to pass PacketPtr& to the method? Thank you!

gavv commented 4 years ago

I see, so after frame_writer.write_samples, the frames are already stored and read_packet is reading and check packet in the queue.

Yep.

Then should I add another method in test::PacketReader to store and reorder the packets order?

Maybe. Or you can avoid using PacketReader altogether, and instead just manually read packets from the queue, e.g. place them into a local array, and then write them to ReceiverSource in a different order. Or you can extract a separate helper class for reordering.

I think the local array approach is the simplest one, but if it makes the test too large, we can extract reading + reordering + writing from the test.

And after that how do I correctly pass the packets queue to ReceiverSource? Should I overload PacketWriter::write_packets to pass PacketPtr& to the method?

I think you don't need PacketWriter. PacketWriter is designed to build packets from samples, and you already have all packets and can just write them to ReceiverSource in the order you need.
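The local array approach could look roughly like the sketch below. It uses toy types (ReorderPacket, SenderQueue, drain and repair_first are all hypothetical names for this example); in the real test the queue would be packet::Queue and the elements would be packet pointers.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <vector>

// Toy packet: only the fields needed to show reordering.
struct ReorderPacket {
    bool is_repair;
    int seqnum;
};
typedef std::shared_ptr<ReorderPacket> ReorderPacketPtr;

// Stand-in for the queue that the sender writes into.
struct SenderQueue {
    std::deque<ReorderPacketPtr> packets;

    // Returns a null pointer when the queue is empty.
    ReorderPacketPtr read() {
        if (packets.empty()) {
            return ReorderPacketPtr();
        }
        ReorderPacketPtr pp = packets.front();
        packets.pop_front();
        return pp;
    }
};

// Reads every packet from the queue into a local array.
std::vector<ReorderPacketPtr> drain(SenderQueue& queue) {
    std::vector<ReorderPacketPtr> out;
    while (ReorderPacketPtr pp = queue.read()) {
        out.push_back(pp);
    }
    return out;
}

// Moves repair packets to the front, keeping the relative order
// inside the repair group and inside the source group.
void repair_first(std::vector<ReorderPacketPtr>& packets) {
    std::stable_partition(
        packets.begin(), packets.end(),
        [](const ReorderPacketPtr& pp) { return pp->is_repair; });
}
```

After repair_first(), the array can be written to the receiver element by element, which simulates repair packets arriving before the source packets.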

BTW, maybe it's better to rename PacketWriter to something like PacketGenerator and PacketReader to something like PacketChecker. If you think it will reduce confusion, you could create a PR with such renaming.

fusuiyi123 commented 4 years ago

Thanks for the comments. I am a little bit busy recently, will squeeze some time to update the unit test.

fusuiyi123 commented 4 years ago

Sorry this is a little bit new and difficult to me. Here is the draft and my questions

TEST(receiver_source, fetch_saved_repair_packets) {
    packet::Queue queue;

    SenderSink sender(scheduler, sender_config, format_map, packet_pool, byte_buffer_pool,
                      sample_buffer_pool, allocator);
    CHECK(sender.valid());

    SenderSink::EndpointSetHandle endpoint_set = add_endpoint_set(sender);
    CHECK(endpoint_set);

    SenderSink::EndpointHandle source_endpoint =
        create_endpoint(sender, endpoint_set, address::Iface_AudioSource, source_proto);
    CHECK(source_endpoint);

    set_endpoint_output_writer(sender, source_endpoint, queue);
    set_endpoint_destination_udp_address(sender, source_endpoint, dst_addr);

    test::FrameWriter frame_writer(sender, sample_buffer_pool);

    for (size_t nf = 0; nf < ManyFrames; nf++) {
        frame_writer.write_samples(SamplesPerFrame * NumCh);
    }
    // so here the queue store all the packets?
    // here I create a local array and reorder it.
    // but how to channel the local array to the receiver..?
    ReceiverSource receiver(scheduler, config, format_map, packet_pool, byte_buffer_pool,
                            sample_buffer_pool, allocator);
    CHECK(receiver.valid());
    test::FrameReader frame_reader(receiver, sample_buffer_pool);

    for (size_t np = 0; np < ManyPackets; np++) {
        for (size_t nf = 0; nf < FramesPerPacket; nf++) {
            frame_reader.read_samples(SamplesPerFrame * NumCh, 1);

            UNSIGNED_LONGS_EQUAL(1, receiver.num_sessions());
        }
    }
}

gavv commented 4 years ago

so here the queue store all the packets?

Exactly.

here I create a local array and reorder it.

Yes.

but how to channel the local array to the receiver..?

See, for example, TEST(receiver_source, one_session).

We need to create receiver, register a new endpoint set, and then add one (source) or two (source + repair) endpoints to it:

    ReceiverSource receiver(scheduler, config, format_map, packet_pool, byte_buffer_pool,
                            sample_buffer_pool, allocator);

    CHECK(receiver.valid());

    ReceiverSource::EndpointSetHandle endpoint_set = add_endpoint_set(receiver);
    CHECK(endpoint_set);

    packet::IWriter* endpoint1_writer =
        add_endpoint(receiver, endpoint_set, address::Iface_AudioSource, proto1);
    CHECK(endpoint1_writer);

When you add an endpoint, you get an endpoint writer, a pointer to packet::IWriter. Then you can just pass packets to that writer to simulate delivery to that endpoint.

You'll have to pass source packets to the source endpoint's writer, and repair packets to the repair endpoint's writer. (You can determine the packet type by its flags.)

When you pass packets to receiver endpoints, they're internally routed to a session (or stored in the ring buffer which you have implemented). Then you call FrameReader, which calls ReceiverSource::read(), which reads those packets from queues and builds audio frames from them.

IIRC tests in test_receiver_source.cpp always use a single (source) endpoint. test_sender_sink_receiver_source.cpp can give you an example of using both source and repair endpoints.
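Here is a small sketch of dispatching packets to the two endpoint writers by flag. Everything in it is a stand-in: the flag constants are made-up values, IToyWriter only loosely mirrors packet::IWriter, and RecordingWriter is a hypothetical mock.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical flag values for illustration only.
const unsigned ToyFlagSource = 1u << 0;
const unsigned ToyFlagRepair = 1u << 1;

struct FlaggedPacket {
    unsigned flags;
    int seqnum;
};

// Minimal writer interface, loosely modelled on packet::IWriter.
class IToyWriter {
public:
    virtual ~IToyWriter() {}
    virtual void write(const FlaggedPacket& packet) = 0;
};

// Mock endpoint writer that records what was delivered to it.
class RecordingWriter : public IToyWriter {
public:
    virtual void write(const FlaggedPacket& packet) {
        received.push_back(packet);
    }

    std::vector<FlaggedPacket> received;
};

// Delivers each packet to the endpoint writer that matches its type,
// preserving the order in which the packets appear in the array.
void deliver(const std::vector<FlaggedPacket>& packets,
             IToyWriter& source_writer,
             IToyWriter& repair_writer) {
    for (size_t i = 0; i < packets.size(); i++) {
        if (packets[i].flags & ToyFlagRepair) {
            repair_writer.write(packets[i]);
        } else {
            source_writer.write(packets[i]);
        }
    }
}
```

In the real test, source_writer and repair_writer would be the two pointers returned when adding the source and repair endpoints to the receiver.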

gavv commented 4 years ago

Endpoint set is needed to group related endpoints. Currently each endpoint set can have one source or one source + one repair endpoints.

Endpoint sets are needed for packet routing. When a packet is delivered to an endpoint, we should find a session for it. We should ensure that a session will never receive packets from unrelated endpoints. For example, if we have 4 endpoints: sourceA+repairA (e.g. for reed-solomon FEC scheme) and sourceB+repairB (e.g. for LDPC FEC scheme), a session can receive packets from sourceA+repairA, or from sourceB+repairB, but never from sourceA+repairB or sourceB+repairA.

So we group related endpoints into endpoint sets, in this example we'll have two sets: setA (sourceA+repairA) and setB (sourceB+repairB).

Each endpoint set has its own session group (ReceiverSessionGroup). One endpoint set corresponds to one session group and vice versa.

A session group contains a list of sessions to which packets may be routed when they are delivered to one of the endpoints of the endpoint set.

So in this example we'll have two session groups, one for setA and one for setB. For example, packets delivered to endpoints sourceA and repairA will be routed to one of the sessions of setA.

You have added ring buffer to ReceiverSessionGroup, so each endpoint set will now additionally have its own ring buffer.

I suppose that we can make most ring buffer tests operate with a single endpoint set. However, we also need one or a few more advanced tests operating with different endpoint sets, to check that their ring buffers are independent.
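To illustrate what "independent" means here, below is a toy model with one ring buffer per endpoint set. It is not the code from this PR: ToyRingBuffer and ToyEndpointSet are invented names, packets are reduced to integer ids, and the overwrite-oldest policy is just an assumption for the sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Fixed-capacity ring buffer that overwrites the oldest entry when full.
class ToyRingBuffer {
public:
    explicit ToyRingBuffer(size_t capacity)
        : slots_(capacity)
        , head_(0)
        , size_(0) {
        assert(capacity > 0);
    }

    void push(int packet_id) {
        const size_t tail = (head_ + size_) % slots_.size();
        slots_[tail] = packet_id;
        if (size_ < slots_.size()) {
            size_++;
        } else {
            // Buffer was full: the oldest entry was just overwritten.
            head_ = (head_ + 1) % slots_.size();
        }
    }

    // Pops the oldest stored packet id; returns false if the buffer is empty.
    bool pop(int& packet_id) {
        if (size_ == 0) {
            return false;
        }
        packet_id = slots_[head_];
        head_ = (head_ + 1) % slots_.size();
        size_--;
        return true;
    }

    size_t size() const {
        return size_;
    }

private:
    std::vector<int> slots_;
    size_t head_;
    size_t size_;
};

// Each endpoint set owns its own buffer for packets without a session.
struct ToyEndpointSet {
    explicit ToyEndpointSet(size_t capacity)
        : pending(capacity) {
    }

    ToyRingBuffer pending;
};
```

A multi-set test would then deliver packets only to setA and check that nothing shows up in the buffer of setB.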

fusuiyi123 commented 4 years ago

Hi @gavv thanks for your detailed explanations! I updated the test based on your suggestions and here is what I got:

(base) fusuiyi@fusuiyi:~/Desktop/opensource/roc% ./bin/x86_64-pc-apple-darwin19.4.0/roc-test-pipeline -v -g receiver_source -n fetch_saved_repair_packets
TEST(receiver_source, fetch_saved_repair_packets)10:29:10.245 [14416740] [dbg] roc_ctl: task queue: starting event loop
10:29:10.246 [14416716] [inf] roc_pipeline: sender sink: adding endpoint set
10:29:10.246 [14416716] [dbg] roc_pipeline: sender endpoint set: adding source endpoint rtp+rs8m
10:29:10.246 [14416716] [dbg] roc_pipeline: sender endpoint set: adding repair endpoint rs8m
10:29:10.246 [14416716] [dbg] roc_fec: openfec encoder: initializing: codec=rs m=8
10:29:10.246 [14416716] [dbg] roc_fec: fec writer: update block size: cur_sbl=0 cur_rbl=0 new_sbl=20 new_rbl=10
10:29:10.246 [14416716] [dbg] roc_audio: packetizer: initializing: n_channels=2 samples_per_packet=100
10:29:10.247 [14416716] [dbg] roc_audio: profiler: avg for last 1.0 sec: 135832 sample/sec (3.08 sec/sec)
10:29:10.247 [14416716] [dbg] roc_pipeline: task pipeline: tasks=7 in_place=1.00 in_frame=0.00 preempts=0 sched=0/0
10:29:10.247 [14416716] [dbg] roc_packet: router: detected new stream: source=1209445379 flags=0x8u
10:29:10.248 [14416716] [dbg] roc_packet: router: detected new stream: source=0 flags=0x10u
10:29:10.248 [14416716] [dbg] roc_audio: mixer: initializing: frame_size=500
10:29:10.249 [14416716] [dbg] roc_pipeline: receiver endpoint set: initializing
10:29:10.249 [14416716] [dbg] roc_pipeline: receiver endpoint set: adding source endpoint rtp+rs8m
10:29:10.249 [14416716] [dbg] roc_pipeline: receiver endpoint set: adding repair endpoint rs8m

src/modules/roc_packet/packet.cpp:22: error: roc_panic()

ERROR: roc_packet: packet: can't add flag more than once

Backtrace:
#1: 0x104a1193d roc::core::print_backtrace()+0xbd
#2: 0x104a12751 roc::core::crash(char const*)+0x71
#3: 0x104a188f9 roc::core::panic(char const*, char const*, int, char const*, ...)+0x419
#4: 0x1049fcad0 roc::packet::Packet::add_flags(unsigned int)+0xe0
#5: 0x10490fae1 _ZN3roc3fec6ParserINS0_14RS8M_PayloadIDELNS0_14PayloadID_TypeE0ELNS0_13PayloadID_PosE1EE5parseERNS_+0x491
#6: 0x10490a599 roc::pipeline::ReceiverEndpoint::flush_packets()+0x669
#7: 0x104914972 roc::pipeline::ReceiverEndpointSet::update(unsigned int)+0x1d2
#8: 0x1049300aa roc::pipeline::ReceiverSource::process_frame_imp(roc::audio::Frame&)+0x3fa
#9: 0x104961348 roc::pipeline::TaskPipeline::process_next_subframe_(roc::audio::Frame&, unsigned long*)+0x858
#10: 0x10495f06d roc::pipeline::TaskPipeline::process_frame_and_tasks_precise_(roc::audio::Frame&)+0x65d
#11: 0x10495e923 roc::pipeline::TaskPipeline::process_frame_and_tasks(roc::audio::Frame&)+0x253
#12: 0x10492f1f3 roc::pipeline::ReceiverSource::read(roc::audio::Frame&)+0x783
#13: 0x1047df96a roc::pipeline::test::FrameReader::read_samples(unsigned long, unsigned long)+0x97a
#14: 0x10481bfbc roc::pipeline::TEST_receiver_source_fetch_saved_repair_packets_Test::testBody()+0x2ffc
#15: 0x104a3ba4a PlatformSpecificSetJmpImplementation(void (*)(void*), void*)+0x3d
#16: 0x104a3b118 Utest::run()+0x32
#17: 0x104a393f7 UtestShell::runOneTestInCurrentProcess(TestPlugin*, TestResult&)+0x97
#18: 0x104a3ba4a PlatformSpecificSetJmpImplementation(void (*)(void*), void*)+0x3d
#19: 0x104a392cf UtestShell::runOneTest(TestPlugin*, TestResult&)+0x45
#20: 0x104a2e28f TestRegistry::runAllTests(TestResult&)+0xa1
#21: 0x104a2f103 CommandLineTestRunner::runAllTests()+0xd1
#22: 0x104a2ed9e CommandLineTestRunner::runAllTestsMain()+0x68
#23: 0x104a2ebf8 CommandLineTestRunner::RunAllTests(int, char const**)+0x9e
#24: 0x1048ebbde main+0x3de

still need some guidance of this debug message...

gavv commented 4 years ago

ERROR: roc_packet: packet: can't add flag more than once

This panic indicates that Packet::add_flags() was called more than once with the same flag. This is a safety check. Setting a flag twice is not harmful by itself, but the current workflow implies that each flag is set in only one place, so this panic indicates that something went wrong.

In our case, it indicates a problem with how packets are passed from sender to receiver. In a real app, a packet produced by the sender is formatted into bytes, sent over the network, and then parsed from bytes. In your test, however, you pass the Packet object produced by the sender directly to the receiver, so some unwanted state, like packet flags, leaks from sender to receiver. That's not very good because it makes the test a bit different from real usage, and fortunately we have a panic that caught this problem.

We already experienced the same problem in roc_fec tests. We solved it by introducing PacketDispatcher::reparsepacket() method. Instead of passing Packet object from sender to receiver, we format packet into bytes, parse it again from bytes to a brand new Packet object, and pass it to receiver. I think we should use the similar approach here.
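The idea can be sketched with a toy packet type. Nothing below is the real roc code: WirePacket, format_packet, parse_repair and reparse are hypothetical names, the two-byte wire format is invented, and an exception stands in for roc_panic().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical flag value for illustration only.
const unsigned WireFlagRepair = 1u << 0;

class WirePacket {
public:
    WirePacket()
        : seqnum(0)
        , flags_(0) {
    }

    // Same kind of safety check as above: each flag may be set only once.
    void add_flags(unsigned flags) {
        if (flags_ & flags) {
            throw std::logic_error("packet: can't add flag more than once");
        }
        flags_ |= flags;
    }

    unsigned flags() const {
        return flags_;
    }

    uint16_t seqnum;

private:
    unsigned flags_;
};

// Serializes only wire data; in-memory flags are deliberately not written.
std::vector<uint8_t> format_packet(const WirePacket& packet) {
    std::vector<uint8_t> bytes(2);
    bytes[0] = uint8_t(packet.seqnum >> 8);
    bytes[1] = uint8_t(packet.seqnum & 0xff);
    return bytes;
}

// Parses bytes into the given packet and marks it as a repair packet,
// the way a receiver-side parser would.
void parse_repair(const std::vector<uint8_t>& bytes, WirePacket& packet) {
    assert(bytes.size() == 2);
    packet.seqnum = uint16_t((bytes[0] << 8) | bytes[1]);
    packet.add_flags(WireFlagRepair);
}

// Round-trips a packet through bytes so that the receiver gets a brand
// new object without any state left over from the sender.
WirePacket reparse(const WirePacket& sender_packet) {
    WirePacket fresh;
    parse_repair(format_packet(sender_packet), fresh);
    return fresh;
}
```

Parsing into the sender's own object trips the check because the flag is already set, while the object returned by reparse() starts with no flags, so the receiver-side parser can set them exactly once.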

github-actions[bot] commented 4 years ago

:umbrella: The latest upstream change (presumably these) made this pull request unmergeable. Please resolve the merge conflicts.

gavv commented 4 years ago

@fusuiyi123 Hi, do you have plans to finish this, maybe partially? (E.g. we can create a new issue for the missing tests.)

fusuiyi123 commented 4 years ago

Hi @gavv very sorry for my late reply. I am actually frustrated at my current job, so I don't feel like coding except for my work(which I have to do) for the time being. Hope you can understand. I will come back to this when I feel better(if the issue is still open). Thanks!!

gavv commented 4 years ago

@fusuiyi123 Thanks for reply! Sounds familiar to me :) No worries.